/* Filename : SkipQueue.cvl
   Authors  : YYY and XXX
   Created  : 2021-03-02
   Modified : 2025-01-16

   CIVL model of SkipQueue class from "The Art of Multiprocessor
   Programming" 2nd ed, by Herlihy, Luchangco, Shavit, and Spear,
   Sec. 15.5 "A skiplist-based unbounded priority queue", and companion
   code ch15/priority/src/priority/SkipQueue.java.

   This is a lock-free priority queue implemented with a
   PrioritySkipList.  This model contains an optional original mode to
   reproduce a bug, obtained using -D_ORIGINAL_SKIPQUEUE.

   some Lab
   some division
   some institution
 */
#include "AtomicBoolean.cvh"
#include "AtomicMarkableReference.cvh"
#include "PQueue.h"
#include "tid.h"
#include "types.h"
/* NOTE(review): the names of the five system headers below were lost
   (only bare "#include" directives survived).  They are reconstructed
   from the identifiers this file uses ($choose_int, INT_MIN/INT_MAX,
   bool, malloc/free) -- confirm against the original file. */
#include <civlc.cvh>
#include <limits.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>

#define ROOT 1
#define NO_ONE $proc_null
#define CAPACITY 8
#define MAX_LEVEL 1

// Node class...

/* One skiplist node.  Node is a pointer to this struct. */
typedef struct Node {
  T item;                        // value carried by the node
  int priority;                  // key compared by find()
  AtomicBoolean marked;          // claimed via CAS in findAndMarkMin()
  AtomicMarkableReference* next; // array of next_length links, one per level
  int next_length;               // number of entries in next
  int topLevel;                  // highest level index used by add/remove
} * Node;

/* Creates a sentinel node with item -1, the given priority, and
   MAX_LEVEL + 1 links, each initialized to (NULL, unmarked). */
Node Node_create_sentinel(int myPriority) {
  Node result = malloc(sizeof(struct Node));
  result->item = -1;
  result->priority = myPriority;
  result->marked = AtomicBoolean_create(false);
  result->next_length = MAX_LEVEL + 1;
  result->next = malloc(sizeof(AtomicMarkableReference) * result->next_length);
  for (int i = 0; i < result->next_length; i++) {
    result->next[i] = AtomicMarkableReference_create(NULL, false);
  }
  result->topLevel = MAX_LEVEL;
  return result;
}

/* Creates a regular node holding item x with priority myPriority.
   The node gets MAX_LEVEL + 1 links, each initialized to
   (NULL, unmarked), and topLevel MAX_LEVEL. */
Node Node_create(T x, int myPriority) {
  Node result = malloc(sizeof(struct Node));
  result->item = x;
  result->priority = myPriority;
  result->marked = AtomicBoolean_create(false);
  // NOTE(review): height is chosen but never used; next_length and
  // topLevel are always derived from MAX_LEVEL.  Confirm whether the
  // fixed height is a deliberate simplification of the model.
  int height = $choose_int(MAX_LEVEL);
  result->next_length = MAX_LEVEL + 1;
  result->next = malloc(sizeof(AtomicMarkableReference) * result->next_length);
  for (int i = 0; i < result->next_length; i++) {
    result->next[i] = AtomicMarkableReference_create(NULL, false);
  }
  result->topLevel = MAX_LEVEL;
  return result;
}

/* Destroys every link of n, then frees the link array and the node.
   NOTE(review): n->marked is not released here; confirm whether
   AtomicBoolean.cvh requires a matching destroy call. */
void Node_destroy(Node n) {
  for (int i = 0; i < n->next_length; i++) {
    AtomicMarkableReference_destroy(n->next[i]);
  }
  free(n->next);
  free(n);
}

// PrioritySkipList class ...

/* A skiplist bracketed by two sentinel nodes. */
typedef struct PrioritySkipList {
  Node head; // sentinel with priority INT_MIN
  Node tail; // sentinel with priority INT_MAX
} * PrioritySkipList;

/* Creates an empty list: every level of head points (unmarked) to
   tail.  The links created by Node_create_sentinel for head are
   destroyed and replaced. */
PrioritySkipList PrioritySkipList_create() {
  PrioritySkipList p = malloc(sizeof(struct PrioritySkipList));
  p->head = Node_create_sentinel(INT_MIN);
  p->tail = Node_create_sentinel(INT_MAX);
  for (int i = 0; i < p->head->next_length; i++) {
    if (p->head->next[i]) {
      AtomicMarkableReference_destroy(p->head->next[i]);
    }
    p->head->next[i] =
      AtomicMarkableReference_create((void*) p->tail, false);
  }
  return p;
}

/* Destroys the nodes reachable from head along level 0, then the two
   sentinels and the list itself.  Nodes that are no longer linked at
   level 0 are not visited. */
void PrioritySkipList_destroy(PrioritySkipList p) {
  Node curr = AtomicMarkableReference_getReference(p->head->next[0]);
  while (curr != p->tail) {
    Node next = AtomicMarkableReference_getReference(curr->next[0]);
    Node_destroy(curr);
    curr = next;
  }
  Node_destroy(p->head);
  Node_destroy(p->tail);
  free(p);
}

/* Prints the (item,priority) pairs found along level 0, followed by a
   newline. */
$atomic_f void PrioritySkipList_print(PrioritySkipList p) {
  $print("{ ");
  Node curr = AtomicMarkableReference_getReference(p->head->next[0]);
  while (curr != p->tail) {
    $print("(", curr->item, ",", curr->priority, ") ");
    curr = AtomicMarkableReference_getReference(curr->next[0]);
  }
  $print("}\n");
}

static bool find(PrioritySkipList p, Node node, Node* preds, Node* succs);

/* [Note: this is the local add method for the PrioritySkipList class,
   NOT PQueue_add.]

   Add at bottomLevel and thus to the set.  Afterwards all links at
   higher levels are added.

   Returns false, without linking node, if find() reports a node with
   the same priority; returns true once node is linked at every level
   up to node->topLevel.  The scratch arrays preds and succs are freed
   on every return path. */
static bool PrioritySkipList_add(PrioritySkipList p, Node node) {
  int bottomLevel = 0;
  Node* preds = malloc(sizeof(Node) * (MAX_LEVEL + 1));
  Node* succs = malloc(sizeof(Node) * (MAX_LEVEL + 1));
  while (true) {
    bool found = find(p, node, preds, succs);
    if (found) { // if found it's not marked
      free(preds);
      free(succs);
      return false;
    } else {
      for (int level = bottomLevel; level <= node->topLevel; level++) {
        Node succ = succs[level];
        AtomicMarkableReference_set(node->next[level], succ, false);
      }
      // try to splice in new node in bottomLevel going up
      Node pred = preds[bottomLevel];
      Node succ = succs[bottomLevel];
      AtomicMarkableReference_set(node->next[bottomLevel], succ, false);
      if (!AtomicMarkableReference_compareAndSet
          (pred->next[bottomLevel], succ, node, false, false)) { // lin point
        continue; // retry from start
      }
      // splice in remaining levels going up
      for (int level = bottomLevel + 1; level <= node->topLevel; level++) {
        while (true) {
          pred = preds[level];
          succ = succs[level];
          if (AtomicMarkableReference_compareAndSet
              (pred->next[level], succ, node, false, false))
            break;
          find(p, node, preds, succs); // find new preds and succs
        }
      }
      free(preds);
      free(succs);
      return true;
    }
  }
}

/* [Note: this is the local remove method for the PrioritySkipList
   class, NOT PQueue_removeMin]

   start at highest level then continue marking down the levels
   if lowest marked successfully node is removed
   other threads could be modifying node's pointers concurrently
   the node could also still be in the process of being added
   so node could end up connected on some levels and disconnected on
   others
   find traversals will eventually physically remove node

   Returns true if this call marked the bottom-level link, false if
   find() did not find the node or another caller marked it first.
   The scratch arrays preds and succs are local to this call and are
   freed on every return path, as in PrioritySkipList_add. */
static bool PrioritySkipList_remove(PrioritySkipList p, Node node) {
  int bottomLevel = 0;
  Node* preds = malloc(sizeof(Node) * (MAX_LEVEL + 1));
  Node* succs = malloc(sizeof(Node) * (MAX_LEVEL + 1));
  Node succ;
  while (true) {
    bool found = find(p, node, preds, succs);
    if (!found) {
      free(preds);
      free(succs);
      return false;
    } else {
      // proceed to mark all levels
      // some levels could still be unthreaded by concurrent
      // add() while being marked
      // other find()s could be modifying node's pointers concurrently
      for (int level = node->topLevel; level >= bottomLevel + 1; level--) {
        bool marked = false;
        succ = AtomicMarkableReference_get(node->next[level], &marked);
        while (!marked) { // until I succeed in marking
          AtomicMarkableReference_attemptMark(node->next[level], succ, true);
          succ = AtomicMarkableReference_get(node->next[level], &marked);
        }
      }
      // proceed to remove from bottom level
      bool marked = false;
      succ = AtomicMarkableReference_get(node->next[bottomLevel], &marked);
      while (true) { // until someone succeeded in marking
        bool iMarkedIt = AtomicMarkableReference_compareAndSet
          (node->next[bottomLevel], succ, succ, false, true);
        succ = AtomicMarkableReference_get
          (succs[bottomLevel]->next[bottomLevel], &marked);
        if (iMarkedIt) {
          // run find to remove links of the logically removed node
          find(p, node, preds, succs);
          free(preds);
          free(succs);
          return true;
        } else if (marked) {
          free(preds);
          free(succs);
          return false; // someone else removed node
        }
        // else only succ changed so repeat
      }
    }
  }
}

/* Walks level 0 from head and returns the first node whose marked
   flag this call flips from false to true, or NULL if tail is reached.

   When _ORIGINAL_SKIPQUEUE is defined the else branch that advances
   past an already-marked node is compiled out, so curr is left
   unchanged in that case and the loop tests the same node again. */
static Node findAndMarkMin(PrioritySkipList p) {
  Node curr = NULL;
  curr = AtomicMarkableReference_getReference(p->head->next[0]);
  while (curr != p->tail) {
    if (!AtomicBoolean_get(curr->marked)) {
      if (AtomicBoolean_compareAndSet(curr->marked, false, true)) {
        return curr;
      } else {
        curr = AtomicMarkableReference_getReference(curr->next[0]);
      }
#ifndef _ORIGINAL_SKIPQUEUE
    } else {
      curr = AtomicMarkableReference_getReference(curr->next[0]);
#endif
    }
  }
  return NULL; // no unmarked nodes
}

/* Finds node preds and succs and cleans up and does not traverse
   marked nodes.  Found means node with equal key reached at bottom
   level.  This differs from lazy list and allow wait-free contains
   since new nodes are always inserted before removed ones and will be
   found at bottom level so if a marked node found at bottom level then
   there is no node with same value in the set.  This means that remove
   cannot start until node is threaded by add() at the bottomLevel.

   On return, preds[level] and succs[level] are filled in for every
   level from MAX_LEVEL down to 0; both arrays must have room for
   MAX_LEVEL + 1 entries.  The result is true iff the bottom-level
   successor has the same priority as node. */
static bool find(PrioritySkipList p, Node node, Node* preds, Node* succs) {
  int bottomLevel = 0;
  bool marked = false;
  bool snip;
  Node pred = NULL;
  Node curr = NULL;
  Node succ = NULL;
 retry:
  while (true) {
    pred = p->head;
    for (int level = MAX_LEVEL; level >= bottomLevel; level--) {
      curr = AtomicMarkableReference_getReference(pred->next[level]);
      while (true) {
        succ = AtomicMarkableReference_get(curr->next[level], &marked);
        while (marked) { // replace curr if marked
          snip = AtomicMarkableReference_compareAndSet
            (pred->next[level], curr, succ, false, false);
          if (!snip)
            goto retry; // pred's link changed: restart from head
          curr = AtomicMarkableReference_getReference(pred->next[level]);
          succ = AtomicMarkableReference_get(curr->next[level], &marked);
        }
        if (curr->priority < node->priority) { // move forward same level
          pred = curr;
          curr = succ;
        } else {
          break; // move to next level
        }
      }
      preds[level] = pred;
      succs[level] = curr;
    }
    return (curr->priority == node->priority); // bottom level curr.key == v
  }
}

// SkipQueue class ...
struct PQueue { PrioritySkipList skiplist; }; PQueue PQueue_create() { PQueue result = malloc(sizeof(struct PQueue)); result->skiplist = PrioritySkipList_create(); return result; } void PQueue_destroy(PQueue pq) { PrioritySkipList_destroy(pq->skiplist); free(pq); } void PQueue_initialize_context(void) {} void PQueue_finalize_context(void) {} void PQueue_initialize(PQueue pq) {} void PQueue_finalize(PQueue pq) {} void PQueue_terminate(int tid) {} bool PQueue_stuck(void) { return false; } void PQueue_add(PQueue q, T item, int priority) { Node node = Node_create(item, priority); PrioritySkipList_add(q->skiplist, node); } T PQueue_removeMin(PQueue q) { Node node = findAndMarkMin(q->skiplist); if (node != NULL) { PrioritySkipList_remove(q->skiplist, node); return node->item; } else { return -1; } } void PQueue_print(PQueue q) { $print("{ "); Node curr = AtomicMarkableReference_getReference(q->skiplist->head->next[0]); while (curr != q->skiplist->tail) { $print("(", curr->item, ",", curr->priority, ") "); curr = AtomicMarkableReference_getReference(curr->next[0]); } $print("}"); } #ifdef _SKIP_QUEUE_MAIN static void startup(PQueue pq, int nproc) { tid_init(nproc); PQueue_initialize_context(); PQueue_initialize(pq); } static void shutdown(PQueue pq) { PQueue_finalize(pq); PQueue_finalize_context(); tid_finalize(); } int main() { PQueue pq = PQueue_create(); int N = 2; startup(pq, N); $parfor(int i : 0 .. N - 1) { tid_register(i); PQueue_add(pq, i, i); PQueue_terminate(i); tid_unregister(); } $print("PQueue after adds: "); PQueue_print(pq); $print("\n"); shutdown(pq); int result[N]; startup(pq, N); $parfor(int i : 0 .. N - 1) { tid_register(i); result[i] = PQueue_removeMin(pq); PQueue_terminate(i); tid_unregister(); } $print("PQueue after removes: "); PQueue_print(pq); $print("\n"); shutdown(pq); // could use forall exists to do this instead of checksum int checkSum = 0; $for(int i : 0 .. 
N - 1) checkSum += result[i]; $assert(checkSum == (N * (N - 1)) / 2); PQueue_destroy(pq); } #endif