/* Filename : BoundedQueue.cvl
   Authors  : YYY and XXX
   Created  : 2024-06-17
   Modified : 2025-01-17

   CIVL model of BoundedQueue class from "The Art of Multiprocessor
   Programming" 2nd ed, by Herlihy, Luchangco, Shavit, and Spear,
   Sec. 10.3 "A bounded partial queue", and companion code
   ch10/Queue/src/queue/BoundedQueue.java.

   This is a simple bounded queue implementation using two conditions.

   some Lab
   some division
   some institution
 */
#include "AtomicInteger.h"
#include "Condition_dl.h"
#include "Lock.h"
#include "Queue.h"
#include "tid.h"
#include "types.h"
/* NOTE(review): the names of the three system headers were missing from
   the text this block was rebuilt from.  <stdbool.h> (bool/true/false) and
   <stdlib.h> (malloc/free/NULL) are required by the code in this file;
   <stdio.h> is a guess for the third -- confirm against the repository. */
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>

/* Maximum number of elements a queue may hold; override with -DCAPACITY=n. */
#ifndef CAPACITY
#define CAPACITY 2
#endif

/* Singly linked list node holding one queued value. */
typedef struct Node {
  T value;
  struct Node * next;
} * Node;

/* Allocates a node carrying `value` with no successor.
   The caller owns the returned node and releases it with Node_destroy. */
static Node Node_create(T value) {
  Node new_node = malloc(sizeof(struct Node));
  new_node->value = value;
  new_node->next = NULL;
  return new_node;
}

/* Frees a single node; does not follow its `next` link. */
static void Node_destroy(Node node) {
  free(node);
}

struct Queue {
  Lock enqLock;                // paired with notFullCondition
  Lock deqLock;                // paired with notEmptyCondition
  Condition notEmptyCondition; // created on deqLock
  Condition notFullCondition;  // created on enqLock
  Node head;                   // sentinel node; first element is head->next
  Node tail;                   // last node in the list (== head when empty)
  // size is supposedly the number of empty slots, but the reference
  // implementation is inconsistent about it.  In the book it is
  // initialized to 0, so it acts as the number of elements; the
  // companion code treats it as the number of slots.  This model
  // initializes it to 0 (see Queue_create).
  AtomicInteger size;
  int capacity;                // max number of elements
};

/* Creates an empty queue of capacity CAPACITY.
   The list starts with a single sentinel node (value -1) that both head
   and tail point to.  The not-full condition is created on the enqueue
   lock and the not-empty condition on the dequeue lock. */
Queue Queue_create() {
  Queue this = malloc(sizeof(struct Queue));
  this->capacity = CAPACITY;
  this->head = Node_create(-1);
  this->tail = this->head;
  this->size = AtomicInteger_create(0);
  this->enqLock = Lock_create();
  this->notFullCondition = Condition_create(this->enqLock);
  this->deqLock = Lock_create();
  this->notEmptyCondition = Condition_create(this->deqLock);
  return this;
}

// not in original code...
void Queue_destroy(Queue queue) { Node curr = queue->head; while (curr) { Node temp = curr; curr = curr->next; Node_destroy(temp); } AtomicInteger_destroy(queue->size); Lock_destroy(queue->enqLock); Condition_destroy(queue->notFullCondition); Lock_destroy(queue->deqLock); Condition_destroy(queue->notEmptyCondition); free(queue); } void Queue_initialize_context(void) { Condition_init_context(); } void Queue_finalize_context(void) { Condition_finalize_context(); } void Queue_initialize(Queue queue) { Condition_initialize(queue->notEmptyCondition); Condition_initialize(queue->notFullCondition); } void Queue_finalize(Queue queue) { Condition_finalize(queue->notEmptyCondition); Condition_finalize(queue->notFullCondition); } void Queue_terminate(int tid) { Condition_terminate(tid); } bool Queue_stuck() { return Condition_isDeadlocked(); } T Queue_deq(Queue this) { T result; bool mustWakeEnqueuers = false; Lock_acquire(this->deqLock); // try... while (this->head->next == NULL) { Condition_await(this->notEmptyCondition); if (Condition_isDeadlocked()) { Lock_release(this->deqLock); return -1; } } Node old_head = this->head; result = this->head->next->value; this->head = this->head->next; Node_destroy(old_head); if (AtomicInteger_getAndDecrement(this->size) == this->capacity) { mustWakeEnqueuers = true; } // finally... Lock_release(this->deqLock); if (mustWakeEnqueuers) { Lock_acquire(this->enqLock); // try... Condition_signalAll(this->notFullCondition); // finally... //$print("Thread releasing enqlock\n"); Lock_release(this->enqLock); } return result; } void Queue_enq(Queue this, T x) { bool mustWakeDequeuers = false; Node e = Node_create(x); Lock_acquire(this->enqLock); // try... 
while (AtomicInteger_get(this->size) == this->capacity) { Condition_await(this->notFullCondition); if (Condition_isDeadlocked()) { Lock_release(this->enqLock); return; } } this->tail->next = e; this->tail = e; if (AtomicInteger_getAndIncrement(this->size) == 0) mustWakeDequeuers = true; // finally... //$print("Thread releasing enqlock\n"); Lock_release(this->enqLock); if (mustWakeDequeuers) { Lock_acquire(this->deqLock); // try... Condition_signalAll(this->notEmptyCondition); // finally... //$print("Thread releasing deqlock\n"); Lock_release(this->deqLock); } } // not in original code... void Queue_print(Queue this) { Node curr = this->head->next; $print("{ "); while (curr) { $print(curr->value, " "); curr = curr->next; } $print("}"); } #ifdef _BOUNDED_QUEUE_MAIN int main(void) { Queue q = Queue_create(); int nthread = 2; // 3 is OK with time tid_init(nthread); Queue_initialize_context(); Queue_initialize(q); $print("Starting...\n"); $parfor (int i: 0..nthread-1) { tid_register(i); Queue_enq(q, i); if (Queue_stuck()) goto END; int x = Queue_deq(q); if (Queue_stuck()) goto END; $assert(x >= 0); $assert(x < nthread); Queue_terminate(i); END: tid_unregister(); } $assert(!Queue_stuck(), "Stuck execution!\n"); Queue_finalize(q); Queue_finalize_context(); tid_finalize(); $print("Done: "); Queue_print(q); $print("\n"); Queue_destroy(q); } #endif