/* Filename : SynchronousDualQueue1.cvl Author : XXX Created : 2024-12-21 Modified : 2025-01-17 Synchronous dual queue, based on SynchronousDualQueue class, section 10.7 of "The Art of Multiprocessor Programming" 2nd ed, by Herlihy, Luchangco, Shavit, and Spear. This is the simple model. It will get into a non-terminating cycle for an unbalanced schedule, just as the real code will. some Lab some division some institution */ #include "AtomicInteger.h" #include "AtomicReference.cvh" #include "Queue.h" #include "tid.h" #include "types.h" #include #include // Nodes... typedef enum NodeType {ITEM, RESERVATION} NodeType; typedef struct Node { NodeType type; AtomicInteger item; AtomicReference next; } * Node; static Node Node_create(T myItem, NodeType myType) { Node this = malloc(sizeof(struct Node)); this->item = AtomicInteger_create(myItem); this->next = AtomicReference_create(NULL); this->type = myType; return this; } // not in original code... static void Node_destroy(Node this) { AtomicReference_destroy(this->next); AtomicInteger_destroy(this->item); free(this); } // Queues... struct Queue { AtomicReference head; AtomicReference tail; }; Queue Queue_create() { Queue this = malloc(sizeof(struct Queue)); Node sentinel = Node_create(-1, ITEM); // new Node(null, ITEM) this->head = AtomicReference_create(sentinel); this->tail = AtomicReference_create(sentinel); return this; } // not in original code... void Queue_destroy(Queue queue) { // if node is not null, its next is not null Node curr = AtomicReference_get(queue->head); Node prev = NULL; while (curr != NULL) { prev = curr; curr = AtomicReference_get(curr->next); Node_destroy(prev); } AtomicReference_destroy(queue->head); AtomicReference_destroy(queue->tail); free(queue); } void Queue_initialize_context(void) {} void Queue_finalize_context(void) {} void Queue_initialize(Queue queue) {} void Queue_finalize(Queue queue) {} void Queue_terminate(int tid) {} bool Queue_stuck() { return false; } void Queue_enq(Queue this, T e) { Node offer = Node_create(e, ITEM); while (true) { Node t = AtomicReference_get(this->tail), h = AtomicReference_get(this->head); if (h == t || t->type == ITEM) { Node n = AtomicReference_get(t->next); if (t == AtomicReference_get(this->tail)) { if (n != NULL) { AtomicReference_compareAndSet(this->tail, t, n); } else if (AtomicReference_compareAndSet(t->next, n, offer)) { AtomicReference_compareAndSet(this->tail, t, offer); $when(offer->item->value != e); h = AtomicReference_get(this->head); if (offer == AtomicReference_get(h->next)) AtomicReference_compareAndSet(this->head, h, offer); return; } } } else { Node n = AtomicReference_get(h->next); if (t != AtomicReference_get(this->tail) || h != AtomicReference_get(this->head) || n == NULL) { continue; } bool success = AtomicInteger_compareAndSet(n->item, -1, e); AtomicReference_compareAndSet(this->head, h, n); if (success) return; } } // end of while loop } T Queue_deq(Queue this) { Node offer = Node_create(-1, RESERVATION); while (true) { Node t = AtomicReference_get(this->tail), h = AtomicReference_get(this->head); if (h == t || t->type == RESERVATION) { Node n = AtomicReference_get(t->next); if (t == AtomicReference_get(this->tail)) { if (n != NULL) { AtomicReference_compareAndSet(this->tail, t, n); } else if (AtomicReference_compareAndSet(t->next, n, offer)) { AtomicReference_compareAndSet(this->tail, t, offer); $when(offer->item->value != -1); h = AtomicReference_get(this->head); if (offer == AtomicReference_get(h->next)) AtomicReference_compareAndSet(this->head, h, offer); return AtomicInteger_get(offer->item); } } } else { Node n = AtomicReference_get(h->next); if (t != AtomicReference_get(this->tail) || h != AtomicReference_get(this->head) || n == NULL) { continue; } T item = AtomicInteger_get(n->item); // added by YYY to correct defect... if (item == -1) continue; bool success = AtomicInteger_compareAndSet(n->item, item, -1); AtomicReference_compareAndSet(this->head, h, n); if (success) { return item; } } } // end of while loop } void Queue_print(Queue this) { Node prev = AtomicReference_get(this->head), curr = AtomicReference_get(prev->next); $print("{ "); while (curr != NULL) { int val = AtomicInteger_get(curr->item); $print(val); if (curr->type == RESERVATION) $print("[R] "); else $print(" "); prev = curr; curr = AtomicReference_get(curr->next); } $print("}"); } // Tests... #ifdef _SYNCHRONOUS_DUAL_QUEUE static void startup(Queue q, int nproc) { tid_init(nproc); Queue_initialize_context(); Queue_initialize(q); } static void shutdown(Queue q) { Queue_finalize(q); Queue_finalize_context(); tid_finalize(); } /* 2N threads: even ones enqueue, odd ones dequeue. Should never get stuck and should always result in an empty queue. */ void testBalanced(Queue q, int N) { $assert(N>=1); int nthread=2*N; startup(q, nthread); $parfor (int i: 0..nthread-1) { tid_register(i); if (i%2 == 0) Queue_enq(q, i); else { int result = Queue_deq(q); $assert(result >= 0); $assert(result <= nthread-2 && result%2 == 0); } Queue_terminate(i); tid_unregister(); } $print("Done: "); Queue_print(q); $print("\n"); $assert(AtomicReference_get(q->head) == AtomicReference_get(q->tail)); shutdown(q); } /* All threads enqueue. This should always get stuck. */ void testEnqs(Queue q, int nthread) { $assert(nthread >= 1); startup(q, nthread); $parfor (int i: 0..nthread-1) { tid_register(i); Queue_enq(q, i); Queue_terminate(i); tid_unregister(); } // you should never make it here $assert(false, "ERROR: Queue did not deadlock"); } /* Three threads: one enqueues, the other two dequeue. Should get stuck. */ void testEDD(Queue q) { int nthread=3; startup(q, nthread); int ndeqdone = 0; $parfor (int i: 0..nthread-1) { tid_register(i); if (i==0) Queue_enq(q, i); else { int x = Queue_deq(q); $assert(x >= 0); ndeqdone++; $assert(ndeqdone <= 1); } Queue_terminate(i); tid_unregister(); } $assert(false, "ERROR: Queue did not deadlock"); } void testEED(Queue q) { int nthread=3; startup(q, nthread); int nenqdone = 0; $parfor (int i: 0..nthread-1) { tid_register(i); if (i<=1) { Queue_enq(q, i); nenqdone++; $assert(nenqdone <= 1); } else { int x = Queue_deq(q); $assert(x >= 0); } Queue_terminate(i); tid_unregister(); } $assert(false, "ERROR: Queue did not deadlock"); } int main(void) { Queue q = Queue_create(); #ifdef BALANCED1 testBalanced(q, 1); #endif #ifdef BALANCED2 testBalanced(q, 2); #endif #ifdef ENQS1 testEnqs(q, 1); #endif #ifdef ENQS2 testEnqs(q, 2); #endif #ifdef EDD testEDD(q); #endif #ifdef EED testEED(q); #endif Queue_destroy(q); } #endif