/* Filename : SynchronousDualQueue2.cvl Author : XXX Created : 2024-12-21 Modified : 2025-01-17 Synchronous dual queue, based on SynchronousDualQueue class, section 10.7 of "The Art of Multiprocessor Programming" 2nd ed, by Herlihy, Luchangco, Shavit, and Spear. This version has been extended with a mechanism to detect and recover when control gets "stuck" (in a non-progress cycle). Hence every call should return. After returning, the client can invoke Queue_stuck() to determine whether the execution got stuck or terminated normally. some Lab some division some institution */ #include "AtomicInteger.h" #include "AtomicReference.cvh" #include "NPDetector.cvh" #include "Queue.h" #include "tid.h" #include "types.h" #include #include // Nodes... typedef enum NodeType {ITEM, RESERVATION} NodeType; typedef struct Node { NodeType type; AtomicInteger item; AtomicReference next; } * Node; static Node Node_create(T myItem, NodeType myType) { Node this = malloc(sizeof(struct Node)); this->item = AtomicInteger_create(myItem); this->next = AtomicReference_create(NULL); this->type = myType; return this; } // not in original code... static void Node_destroy(Node this) { AtomicReference_destroy(this->next); AtomicInteger_destroy(this->item); free(this); } // Queues... struct Queue { AtomicReference head; AtomicReference tail; }; Queue Queue_create() { Queue this = malloc(sizeof(struct Queue)); Node sentinel = Node_create(-1, ITEM); // new Node(null, ITEM) this->head = AtomicReference_create(sentinel); this->tail = AtomicReference_create(sentinel); return this; } // not in original code... void Queue_destroy(Queue queue) { // if node is not null, its next is not null Node curr = AtomicReference_get(queue->head); Node prev = NULL; while (curr != NULL) { prev = curr; curr = AtomicReference_get(curr->next); Node_destroy(prev); } AtomicReference_destroy(queue->head); AtomicReference_destroy(queue->tail); free(queue); } void Queue_initialize_context(void) { npd_initialize(tid_nthread()); } void Queue_finalize_context(void) { npd_finalize(); } void Queue_initialize(Queue queue) {} void Queue_finalize(Queue queue) {} void Queue_terminate(int tid) { npd_terminate(tid); } bool Queue_stuck() { return npd_stuck(); } bool static ar_compareAndSet(AtomicReference ref, void * expect, void * update) { if (AtomicReference_compareAndSet(ref, expect, update)) { npd_signalAll(); return true; } return false; } bool static ai_compareAndSet(AtomicInteger ref, int expect, int update) { if (AtomicInteger_compareAndSet(ref, expect, update)) { npd_signalAll(); return true; } return false; } /* Spin while *p==val, but breakout if livelock occurs. Returns true iff livelock occurred. */ static bool spin_while_eq(int tid, int * p, int val) { npd_loop_enter(tid); while (*p==val) { npd_loop_top(tid); if (npd_loop_bottom(tid)) return true; } npd_loop_exit(tid); return false; } void Queue_enq(Queue this, T e) { int tid = tid_get(), nthread = tid_nthread(); $assert(0<=tid && tidtail), h = AtomicReference_get(this->head); if (h == t || t->type == ITEM) { Node n = AtomicReference_get(t->next); if (t == AtomicReference_get(this->tail)) { if (n != NULL) { ar_compareAndSet(this->tail, t, n); } else if (ar_compareAndSet(t->next, n, offer)) { ar_compareAndSet(this->tail, t, offer); if (spin_while_eq(tid, &offer->item->value, e)) return; // stuck h = AtomicReference_get(this->head); if (offer == AtomicReference_get(h->next)) ar_compareAndSet(this->head, h, offer); npd_loop_exit(tid); return; } } } else { Node n = AtomicReference_get(h->next); if (t != AtomicReference_get(this->tail) || h != AtomicReference_get(this->head) || n == NULL) { if (npd_loop_bottom(tid)) return; continue; } bool success = ai_compareAndSet(n->item, -1, e); ar_compareAndSet(this->head, h, n); if (success) { npd_loop_exit(tid); return; } } if (npd_loop_bottom(tid)) return; } // end of while loop } T Queue_deq(Queue this) { int tid = tid_get(), nthread = tid_nthread(); $assert(0<=tid && tidtail), h = AtomicReference_get(this->head); if (h == t || t->type == RESERVATION) { Node n = AtomicReference_get(t->next); if (t == AtomicReference_get(this->tail)) { if (n != NULL) { ar_compareAndSet(this->tail, t, n); } else if (ar_compareAndSet(t->next, n, offer)) { ar_compareAndSet(this->tail, t, offer); if (spin_while_eq(tid, &offer->item->value, -1)) return -1; // stuck h = AtomicReference_get(this->head); if (offer == AtomicReference_get(h->next)) ar_compareAndSet(this->head, h, offer); npd_loop_exit(tid); return AtomicInteger_get(offer->item); } } } else { Node n = AtomicReference_get(h->next); if (t != AtomicReference_get(this->tail) || h != AtomicReference_get(this->head) || n == NULL) { if (npd_loop_bottom(tid)) return -1; continue; } T item = AtomicInteger_get(n->item); #ifdef _PATCH // added by YYY to correct defect... if (item == -1) { if (npd_loop_bottom(tid)) return -1; continue; } #endif bool success = ai_compareAndSet(n->item, item, -1); ar_compareAndSet(this->head, h, n); if (success) { npd_loop_exit(tid); return item; } } if (npd_loop_bottom(tid)) return -1; } // end of while loop } void Queue_print(Queue this) { Node prev = AtomicReference_get(this->head), curr = AtomicReference_get(prev->next); $print("{ "); while (curr != NULL) { int val = AtomicInteger_get(curr->item); $print(val); if (curr->type == RESERVATION) $print("[R] "); else $print(" "); prev = curr; curr = AtomicReference_get(curr->next); } $print("}"); } // Tests... #ifdef _SYNCHRONOUS_DUAL_QUEUE static void startup(Queue q, int nproc) { tid_init(nproc); Queue_initialize_context(); Queue_initialize(q); } static void shutdown(Queue q) { Queue_finalize(q); Queue_finalize_context(); tid_finalize(); } /* 2N threads: even ones enqueue, odd ones dequeue. Should never get stuck and should always result in an empty queue. */ void testBalanced(Queue q, int N) { $assert(N>=1); int nthread=2*N; startup(q, nthread); $parfor (int i: 0..nthread-1) { tid_register(i); if (i%2 == 0) { Queue_enq(q, i); } else { int result = Queue_deq(q); $assert(result >= 0 && result<=2*nthread-2 && result%2==0); } if (!Queue_stuck()) Queue_terminate(i); tid_unregister(); } $print("Done: "); Queue_print(q); $print("\n"); $assert(AtomicReference_get(q->head) == AtomicReference_get(q->tail)); $assert(!Queue_stuck()); shutdown(q); } /* All threads enqueue. This should always get stuck. */ void testEnqs(Queue q, int nthread) { $assert(nthread >= 1); startup(q, nthread); $parfor (int i: 0..nthread-1) { tid_register(i); Queue_enq(q, i); if (!Queue_stuck()) Queue_terminate(i); tid_unregister(); } $print("Done: "); Queue_print(q); $print("\n"); $assert(Queue_stuck()); shutdown(q); } /* Three threads: one enqueues, the other two dequeue. Should get stuck. */ void testEDD(Queue q) { int nthread=3; startup(q, nthread); int ndeqdone = 0; $parfor (int i: 0..nthread-1) { tid_register(i); if (i==0) { Queue_enq(q, i); $assert(!Queue_stuck()); Queue_terminate(i); } else { int x = Queue_deq(q); if (!Queue_stuck()) { $assert(x>=0); ndeqdone++; $assert(ndeqdone <= 1); Queue_terminate(i); } } tid_unregister(); } $assert(Queue_stuck()); shutdown(q); } void testEED(Queue q) { int nthread=3; startup(q, 3); int nenqdone = 0; $parfor (int i: 0..nthread-1) { tid_register(i); if (i<=1) { Queue_enq(q, i); if (!Queue_stuck()) { nenqdone++; $assert(nenqdone <= 1); Queue_terminate(i); } } else { int x = Queue_deq(q); $assert(!Queue_stuck()); $assert(x >= 0); Queue_terminate(i); } tid_unregister(); } $assert(Queue_stuck()); shutdown(q); } int main() { Queue q = Queue_create(); #ifdef BALANCED1 testBalanced(q, 1); #endif #ifdef BALANCED2 testBalanced(q, 2); #endif #ifdef ENQS1 testEnqs(q, 1); #endif #ifdef ENQS2 testEnqs(q, 2); #endif #ifdef EDD testEDD(q); #endif #ifdef EED testEED(q); #endif Queue_destroy(q); } #endif