לדלג לתוכן

12.2 מבני נתונים ללא נעילה - פתרון

פתרון לתרגיל 1 - מונה אטומי עם CAS

#include <stdio.h>
#include <stdlib.h>
#include <stdatomic.h>
#include <pthread.h>

/* Counter whose single field is meant to be accessed through <stdatomic.h> calls. */
typedef struct {
    atomic_int value;  /* current count */
} atomic_counter_t;

/* Set the counter to `initial` using a sequentially consistent atomic store. */
void counter_init(atomic_counter_t *c, int initial) {
    atomic_store_explicit(&c->value, initial, memory_order_seq_cst);
}

/* Add one to the counter with a compare-and-swap retry loop. */
void counter_increment(atomic_counter_t *c) {
    int seen = atomic_load(&c->value);
    for (;;) {
        /* On failure the CAS writes the current value into `seen`,
         * so the next attempt starts from fresh data. */
        if (atomic_compare_exchange_weak(&c->value, &seen, seen + 1)) {
            return;
        }
    }
}

/* Add `amount` to the counter with a compare-and-swap retry loop. */
void counter_add(atomic_counter_t *c, int amount) {
    int snapshot = atomic_load(&c->value);
    int swapped;
    do {
        /* A failed CAS refreshes `snapshot` with the value it found. */
        swapped = atomic_compare_exchange_weak(&c->value, &snapshot,
                                               snapshot + amount);
    } while (!swapped);
}

/* Return the counter's value as read by one atomic load. */
int counter_get(atomic_counter_t *c) {
    int current = atomic_load(&c->value);
    return current;
}

// Number of threads main creates.
#define NUM_THREADS 8
// Number of counter_increment calls made by each thread.
#define OPS_PER_THREAD 100000

// Counter shared by all increment_thread instances.
atomic_counter_t counter;

/* Thread entry point: bump the shared counter OPS_PER_THREAD times. `arg` is unused. */
void *increment_thread(void *arg) {
    (void)arg;
    for (int remaining = OPS_PER_THREAD; remaining > 0; remaining--) {
        counter_increment(&counter);
    }
    return NULL;
}

/*
 * Start NUM_THREADS incrementing threads, wait for all of them, then print the
 * final counter value next to the expected NUM_THREADS * OPS_PER_THREAD.
 * Returns EXIT_FAILURE if a thread cannot be created, 0 otherwise.
 */
int main(void) {
    counter_init(&counter, 0);

    pthread_t threads[NUM_THREADS];
    for (int i = 0; i < NUM_THREADS; i++) {
        /* pthread_create reports failure through its return value; joining a
         * thread that was never created would use an uninitialized handle. */
        int rc = pthread_create(&threads[i], NULL, increment_thread, NULL);
        if (rc != 0) {
            fprintf(stderr, "pthread_create failed for thread %d: error %d\n", i, rc);
            return EXIT_FAILURE;
        }
    }
    for (int i = 0; i < NUM_THREADS; i++) {
        pthread_join(threads[i], NULL);
    }

    int result = counter_get(&counter);
    printf("expected: %d\n", NUM_THREADS * OPS_PER_THREAD);
    printf("got:      %d\n", result);
    printf("correct:  %s\n", result == NUM_THREADS * OPS_PER_THREAD ? "yes" : "NO!");

    return 0;
}

קומפילציה:

gcc -O2 -o counter counter.c -lpthread -latomic


פתרון לתרגיל 2 - מחסנית Treiber

#include <stdio.h>
#include <stdlib.h>
#include <stdatomic.h>
#include <pthread.h>

/* Singly linked stack node carrying one int. */
typedef struct node node_t;

struct node {
    int data;       /* payload */
    node_t *next;   /* node below this one, or NULL at the bottom */
};

/* Stack handle: a single atomic pointer to the top node (NULL when empty). */
typedef struct {
    node_t *_Atomic top;
} treiber_stack_t;

/* Mark the stack as empty by atomically storing NULL into its top pointer. */
void stack_init(treiber_stack_t *s) {
    atomic_store_explicit(&s->top, NULL, memory_order_seq_cst);
}

/*
 * Push `value` onto the stack.
 * Allocates one node with malloc; ownership passes to the stack and the node
 * is released by stack_pop. The function has no way to report failure, so an
 * allocation failure prints a diagnostic and aborts instead of dereferencing
 * a NULL pointer.
 */
void stack_push(treiber_stack_t *s, int value) {
    node_t *new_node = malloc(sizeof *new_node);
    if (new_node == NULL) {
        perror("stack_push: malloc");
        abort();
    }
    new_node->data = value;

    node_t *old_top = atomic_load(&s->top);
    do {
        /* A failed CAS refreshes old_top, so the link is rebuilt each round. */
        new_node->next = old_top;
    } while (!atomic_compare_exchange_weak(&s->top, &old_top, new_node));
}

// Pops the top node: returns 1 and stores its data in *value on success,
// returns 0 (without writing *value) if the stack is empty.
int stack_pop(treiber_stack_t *s, int *value) {
    node_t *old_top = atomic_load(&s->top);
    do {
        if (old_top == NULL) {
            return 0;
        }
        // A failed CAS refreshes old_top with the current top, so the NULL
        // check above runs again before old_top->next is read.
    } while (!atomic_compare_exchange_weak(&s->top, &old_top, old_top->next));

    *value = old_top->data;
    // Note: in a real program, calling free here is dangerous because of ABA.
    // The exercise accepts it because its pop threads do not run alongside
    // the push threads.
    // NOTE(review): another popper that loaded the same old_top may still read
    // old_top->next after this free; ruling that out needs a safe reclamation
    // scheme (e.g. hazard pointers) - confirm before reusing this code.
    free(old_top);
    return 1;
}

// Number of pusher threads and popper threads created by main.
#define PUSH_THREADS 4
#define POP_THREADS 4
// Number of pushes per push thread and of successful pops per pop thread.
#define OPS_PER_THREAD 10000

// Stack shared by all threads.
treiber_stack_t stack;
// Sum of the per-thread pop counts, added by each pop thread when it finishes.
_Atomic int total_popped = 0;

/*
 * Thread entry point: `arg` points to this thread's int id. Pushes the values
 * id * OPS_PER_THREAD + 0 .. id * OPS_PER_THREAD + OPS_PER_THREAD - 1.
 */
void *push_thread(void *arg) {
    const int id = *(int *)arg;
    int offset = 0;
    while (offset < OPS_PER_THREAD) {
        stack_push(&stack, id * OPS_PER_THREAD + offset);
        offset++;
    }
    printf("push thread %d: pushed %d items\n", id, OPS_PER_THREAD);
    return NULL;
}

/*
 * Thread entry point: `arg` points to this thread's int id. Keeps calling
 * stack_pop until OPS_PER_THREAD pops have succeeded, then adds that count to
 * total_popped.
 */
void *pop_thread(void *arg) {
    const int id = *(int *)arg;
    int successes = 0;
    int discarded;

    /* An empty-stack result does not count; the loop simply tries again. */
    while (successes < OPS_PER_THREAD) {
        successes += stack_pop(&stack, &discarded) ? 1 : 0;
    }

    atomic_fetch_add(&total_popped, successes);
    printf("pop thread %d: popped %d items\n", id, successes);
    return NULL;
}

/*
 * Two-phase driver: run all push threads to completion, then all pop threads,
 * and finally drain and count whatever is left on the stack.
 * Returns EXIT_FAILURE if a thread cannot be created, 0 otherwise.
 */
int main(void) {
    stack_init(&stack);

    pthread_t push_threads[PUSH_THREADS], pop_threads[POP_THREADS];
    int push_ids[PUSH_THREADS], pop_ids[POP_THREADS];

    // Run the push threads first so the poppers have something to read.
    for (int i = 0; i < PUSH_THREADS; i++) {
        push_ids[i] = i;
        // A failed create leaves push_threads[i] unset; joining it would be invalid.
        int rc = pthread_create(&push_threads[i], NULL, push_thread, &push_ids[i]);
        if (rc != 0) {
            fprintf(stderr, "pthread_create failed for push thread %d: error %d\n", i, rc);
            return EXIT_FAILURE;
        }
    }
    for (int i = 0; i < PUSH_THREADS; i++) {
        pthread_join(push_threads[i], NULL);
    }

    printf("total pushed: %d\n", PUSH_THREADS * OPS_PER_THREAD);

    // Now the pop threads.
    for (int i = 0; i < POP_THREADS; i++) {
        pop_ids[i] = i;
        int rc = pthread_create(&pop_threads[i], NULL, pop_thread, &pop_ids[i]);
        if (rc != 0) {
            fprintf(stderr, "pthread_create failed for pop thread %d: error %d\n", i, rc);
            return EXIT_FAILURE;
        }
    }
    for (int i = 0; i < POP_THREADS; i++) {
        pthread_join(pop_threads[i], NULL);
    }

    printf("total popped: %d\n", atomic_load(&total_popped));

    // Count what is left.
    int remaining = 0;
    int value;
    while (stack_pop(&stack, &value)) {
        remaining++;
    }
    printf("remaining in stack: %d\n", remaining);
    printf("total (popped + remaining): %d\n",
           atomic_load(&total_popped) + remaining);

    return 0;
}

פתרון לתרגיל 3 - תור SPSC

#include <stdio.h>
#include <stdlib.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <pthread.h>
#include <time.h>

#define QUEUE_SIZE 4096  // must be a power of two
#define NUM_ITEMS 10000000

// Slot indices are computed as `index & (QUEUE_SIZE - 1)`, which only equals
// `index % QUEUE_SIZE` for powers of two; reject any other size at compile time.
_Static_assert(QUEUE_SIZE > 0 && (QUEUE_SIZE & (QUEUE_SIZE - 1)) == 0,
               "QUEUE_SIZE must be a power of two");

/*
 * Ring buffer with two atomic positions: spsc_push writes `head`,
 * spsc_pop writes `tail`.
 */
typedef struct {
    int buffer[QUEUE_SIZE];
    atomic_int head;  /* count of items pushed so far */
    atomic_int tail;  /* count of items popped so far */
} spsc_queue_t;

/* Reset both positions to zero, which makes the queue empty. */
void spsc_init(spsc_queue_t *q) {
    atomic_store_explicit(&q->head, 0, memory_order_seq_cst);
    atomic_store_explicit(&q->tail, 0, memory_order_seq_cst);
}

/*
 * Try to append `item`. Returns false without writing anything when
 * QUEUE_SIZE items are already waiting, true once the item is published.
 */
bool spsc_push(spsc_queue_t *q, int item) {
    const int write_pos = atomic_load_explicit(&q->head, memory_order_relaxed);
    const int read_pos = atomic_load_explicit(&q->tail, memory_order_acquire);

    if ((write_pos - read_pos) < QUEUE_SIZE) {
        q->buffer[write_pos & (QUEUE_SIZE - 1)] = item;
        /* The release store publishes the slot write above together with the new head. */
        atomic_store_explicit(&q->head, write_pos + 1, memory_order_release);
        return true;
    }
    return false;
}

/*
 * Try to remove the oldest item into *item. Returns false (leaving *item
 * untouched) when tail has caught up with head, true otherwise.
 */
bool spsc_pop(spsc_queue_t *q, int *item) {
    const int read_pos = atomic_load_explicit(&q->tail, memory_order_relaxed);
    const int write_pos = atomic_load_explicit(&q->head, memory_order_acquire);

    if (read_pos < write_pos) {
        *item = q->buffer[read_pos & (QUEUE_SIZE - 1)];
        /* The release store advances tail only after the slot has been read. */
        atomic_store_explicit(&q->tail, read_pos + 1, memory_order_release);
        return true;
    }
    return false;
}

// Queue shared by the producer and consumer threads.
spsc_queue_t queue;

/* Thread entry point: push 0 .. NUM_ITEMS-1 in order, retrying while the queue is full. */
void *producer(void *arg) {
    (void)arg;
    int next = 0;
    while (next < NUM_ITEMS) {
        /* Advance only after a successful push; a full queue means retry the same value. */
        if (spsc_push(&queue, next)) {
            next++;
        }
    }
    return NULL;
}

/*
 * Thread entry point: pop until NUM_ITEMS items have been received, then print
 * how many were consumed and their sum. `arg` is unused.
 */
void *consumer(void *arg) {
    (void)arg;
    int item;
    /* The sum of 0 .. NUM_ITEMS-1 is about 5e13, which does not fit in a 32-bit
     * `long`; `long long` is guaranteed to be at least 64 bits wide. */
    long long sum = 0;
    int count = 0;

    while (count < NUM_ITEMS) {
        if (spsc_pop(&queue, &item)) {
            sum += item;
            count++;
        }
    }

    printf("consumed %d items, sum = %lld\n", count, sum);
    return NULL;
}

/*
 * Run one producer and one consumer over the shared queue and report the
 * elapsed wall-clock time and the resulting items-per-second figure.
 * Returns EXIT_FAILURE if a thread cannot be created, 0 otherwise.
 */
int main(void) {
    spsc_init(&queue);

    struct timespec start, end;
    clock_gettime(CLOCK_MONOTONIC, &start);

    pthread_t prod, cons;
    /* pthread_create returns an error number on failure; joining a handle that
     * was never filled in would be invalid, so stop right away. */
    int rc = pthread_create(&prod, NULL, producer, NULL);
    if (rc != 0) {
        fprintf(stderr, "pthread_create failed for producer: error %d\n", rc);
        return EXIT_FAILURE;
    }
    rc = pthread_create(&cons, NULL, consumer, NULL);
    if (rc != 0) {
        fprintf(stderr, "pthread_create failed for consumer: error %d\n", rc);
        return EXIT_FAILURE;
    }

    pthread_join(prod, NULL);
    pthread_join(cons, NULL);

    clock_gettime(CLOCK_MONOTONIC, &end);

    double elapsed = (end.tv_sec - start.tv_sec) +
                     (end.tv_nsec - start.tv_nsec) / 1e9;
    double ops_per_sec = NUM_ITEMS / elapsed;

    printf("elapsed: %.4f seconds\n", elapsed);
    printf("throughput: %.0f ops/sec\n", ops_per_sec);

    return 0;
}

פתרון לתרגיל 4 - השוואת ביצועים

#include <stdio.h>
#include <stdlib.h>
#include <stdatomic.h>
#include <pthread.h>
#include <time.h>

// Number of worker threads started per benchmark run.
#define NUM_THREADS 4
// Number of pushes, and then of successful pops, each worker performs.
#define OPS_PER_THREAD 100000

// --- גרסת mutex ---

/* Node of the mutex-protected stack. */
typedef struct mutex_node mutex_node_t;

struct mutex_node {
    int data;            /* payload */
    mutex_node_t *next;  /* node below this one, or NULL at the bottom */
};

// Stack whose top pointer is read and written only while `lock` is held.
typedef struct {
    mutex_node_t *top;     // top node, NULL when the stack is empty
    pthread_mutex_t lock;  // guards `top`
} mutex_stack_t;

/* Prepare an empty stack: default-attribute mutex and a NULL top pointer. */
void mutex_stack_init(mutex_stack_t *s) {
    pthread_mutex_init(&s->lock, NULL);
    s->top = NULL;
}

/*
 * Push `val` onto the mutex-protected stack.
 * The node is allocated before the lock is taken, so the critical section is
 * just the two pointer updates. The function cannot report failure, so an
 * allocation failure prints a diagnostic and aborts instead of writing
 * through a NULL pointer.
 */
void mutex_stack_push(mutex_stack_t *s, int val) {
    mutex_node_t *n = malloc(sizeof *n);
    if (n == NULL) {
        perror("mutex_stack_push: malloc");
        abort();
    }
    n->data = val;
    pthread_mutex_lock(&s->lock);
    n->next = s->top;
    s->top = n;
    pthread_mutex_unlock(&s->lock);
}

/*
 * Pop the top value into *val. Returns 1 on success, 0 (leaving *val
 * untouched) when the stack is empty. The node is freed after the lock is
 * released.
 */
int mutex_stack_pop(mutex_stack_t *s, int *val) {
    pthread_mutex_lock(&s->lock);
    mutex_node_t *popped = s->top;
    if (popped != NULL) {
        s->top = popped->next;
    }
    pthread_mutex_unlock(&s->lock);

    if (popped == NULL) {
        return 0;
    }
    *val = popped->data;
    free(popped);
    return 1;
}

// --- גרסת lock-free (Treiber) ---

/* Node of the lock-free stack. */
typedef struct lf_node lf_node_t;

struct lf_node {
    int data;         /* payload */
    lf_node_t *next;  /* node below this one, or NULL at the bottom */
};

/* Lock-free stack handle: one atomic pointer to the top node (NULL when empty). */
typedef struct {
    lf_node_t *_Atomic top;
} lf_stack_t;

/* Make the stack empty by atomically storing NULL into its top pointer. */
void lf_stack_init(lf_stack_t *s) {
    atomic_store_explicit(&s->top, NULL, memory_order_seq_cst);
}

/*
 * Push `val` onto the lock-free stack with a CAS retry loop.
 * The function cannot report failure, so an allocation failure prints a
 * diagnostic and aborts instead of writing through a NULL pointer.
 */
void lf_stack_push(lf_stack_t *s, int val) {
    lf_node_t *n = malloc(sizeof *n);
    if (n == NULL) {
        perror("lf_stack_push: malloc");
        abort();
    }
    n->data = val;
    lf_node_t *old = atomic_load(&s->top);
    do {
        /* A failed CAS refreshes `old`, so the link is rebuilt each round. */
        n->next = old;
    } while (!atomic_compare_exchange_weak(&s->top, &old, n));
}

// Lock-free pop: returns 1 and writes the popped value to *val, or returns 0
// (without writing *val) when the stack is empty.
int lf_stack_pop(lf_stack_t *s, int *val) {
    lf_node_t *old = atomic_load(&s->top);
    do {
        if (old == NULL) return 0;
        // A failed CAS reloads `old`, so the emptiness check repeats before
        // old->next is read again.
    } while (!atomic_compare_exchange_weak(&s->top, &old, old->next));
    *val = old->data;
    // NOTE(review): the node is freed immediately. A thread that loaded the
    // same `old` may still read old->next, and a freed address can come back
    // from malloc in a concurrent push (ABA). Presumably accepted for the
    // benchmark - confirm before reusing this code.
    free(old);
    return 1;
}

// --- benchmark ---

// Stack used by mutex_worker.
mutex_stack_t ms;
// Stack used by lf_worker.
lf_stack_t ls;

/*
 * Benchmark body for the mutex stack: push 0 .. OPS_PER_THREAD-1, then keep
 * popping until OPS_PER_THREAD pops have succeeded. `arg` is unused.
 */
void *mutex_worker(void *arg) {
    (void)arg;
    int pushed = 0;
    while (pushed < OPS_PER_THREAD) {
        mutex_stack_push(&ms, pushed);
        pushed++;
    }

    int sink;
    int popped = 0;
    while (popped < OPS_PER_THREAD) {
        /* An empty-stack result is not counted; just try again. */
        if (mutex_stack_pop(&ms, &sink)) {
            popped++;
        }
    }
    return NULL;
}

/*
 * Benchmark body for the lock-free stack: push 0 .. OPS_PER_THREAD-1, then
 * keep popping until OPS_PER_THREAD pops have succeeded. `arg` is unused.
 */
void *lf_worker(void *arg) {
    (void)arg;
    int pushed = 0;
    while (pushed < OPS_PER_THREAD) {
        lf_stack_push(&ls, pushed);
        pushed++;
    }

    int sink;
    int popped = 0;
    while (popped < OPS_PER_THREAD) {
        /* An empty-stack result is not counted; just try again. */
        if (lf_stack_pop(&ls, &sink)) {
            popped++;
        }
    }
    return NULL;
}

/*
 * Run `func` in NUM_THREADS threads, wait for all of them, print
 * "<name>: <seconds> seconds" and return the elapsed wall-clock seconds.
 * Exits the process with EXIT_FAILURE if a thread cannot be created, because
 * a timing taken with fewer workers would be meaningless and joining a thread
 * that was never created is invalid.
 */
double run_benchmark(void *(*func)(void *), const char *name) {
    struct timespec start, end;
    pthread_t threads[NUM_THREADS];

    clock_gettime(CLOCK_MONOTONIC, &start);

    for (int i = 0; i < NUM_THREADS; i++) {
        int rc = pthread_create(&threads[i], NULL, func, NULL);
        if (rc != 0) {
            fprintf(stderr, "%s: pthread_create failed for thread %d: error %d\n",
                    name, i, rc);
            exit(EXIT_FAILURE);
        }
    }
    for (int i = 0; i < NUM_THREADS; i++) {
        pthread_join(threads[i], NULL);
    }

    clock_gettime(CLOCK_MONOTONIC, &end);

    double elapsed = (end.tv_sec - start.tv_sec) +
                     (end.tv_nsec - start.tv_nsec) / 1e9;
    printf("%s: %.4f seconds\n", name, elapsed);
    return elapsed;
}

/* Time the mutex stack, then the lock-free stack, and print how they compare. */
int main(void) {
    mutex_stack_init(&ms);
    lf_stack_init(&ls);

    const double mutex_time = run_benchmark(mutex_worker, "mutex stack");
    const double lf_time = run_benchmark(lf_worker, "lock-free stack");

    /* Always report a ratio of slower time over faster time. */
    double ratio;
    const char *verdict;
    if (mutex_time > lf_time) {
        ratio = mutex_time / lf_time;
        verdict = "faster";
    } else {
        ratio = lf_time / mutex_time;
        verdict = "slower";
    }
    printf("lock-free is %.2fx %s than mutex\n", ratio, verdict);

    /*
     * Note: the results depend heavily on the chip and the number of cores.
     * With high contention (many threads), lock-free usually wins.
     * With low contention (few threads), the mutex is sometimes faster,
     * because the futex fast path is a single atomic operation in user space.
     * Conclusion: always measure before deciding.
     */

    return 0;
}

פתרון לתרגיל 5 - זיהוי בעיית ABA

#include <stdio.h>
#include <stdlib.h>
#include <stdatomic.h>
#include <pthread.h>
#include <unistd.h>

/* Statically allocated stack node, labelled with one letter for the trace output. */
typedef struct node {
    char name;            /* label used when printing */
    struct node *next;
} node_t;

/*
 * Stack head used by the ABA demonstration.
 * Named aba_stack_t rather than stack_t: POSIX <signal.h> already declares a
 * type called stack_t, and some platforms pull that header in through
 * <stdlib.h>, which turns a local `stack_t` typedef into a redefinition error.
 */
typedef struct {
    _Atomic(node_t *) top;
} aba_stack_t;

/* The three nodes and the stack shared by both demo threads. */
node_t node_A, node_B, node_C;
aba_stack_t stack;

/* Print "[label] stack: X -> Y -> ... NULL" by walking from the current top. */
void print_stack(const char *label) {
    printf("[%s] stack: ", label);
    for (node_t *it = atomic_load(&stack.top); it != NULL; it = it->next) {
        printf("%c -> ", it->name);
    }
    printf("NULL\n");
}

// Thread 1: starts a pop but is paused in the middle of it.
// Reads top and top->next, sleeps, then attempts the CAS with those stale
// values and prints whether it succeeded. `arg` is unused.
void *thread1(void *arg) {
    (void)arg;

    // Step 1: read top and next.
    node_t *old_top = atomic_load(&stack.top);
    node_t *new_top = old_top->next;

    printf("thread 1: read top=%c, next=%c\n", old_top->name, new_top->name);
    printf("thread 1: going to sleep before CAS...\n");

    // Simulate preemption.
    usleep(100000);

    printf("thread 1: woke up, attempting CAS(&top, %c, %c)\n",
           old_top->name, new_top->name);

    // The CAS is expected to succeed because top points to C again (after
    // thread2 pushed it back), even though new_top (= B) is no longer part
    // of the stack. This relies on the sleep timings of the two threads.
    if (atomic_compare_exchange_strong(&stack.top, &old_top, new_top)) {
        printf("thread 1: CAS succeeded! top is now '%c'\n", new_top->name);
        printf("thread 1: BUT '%c' was already removed! THIS IS THE ABA BUG!\n",
               new_top->name);
    } else {
        printf("thread 1: CAS failed (this would happen with tagged pointers)\n");
    }

    print_stack("after thread 1");
    return NULL;
}

// Thread 2: pops C, pops B, then pushes C back - this creates the ABA situation.
// The results of the CAS calls are not checked. `arg` is unused.
void *thread2(void *arg) {
    (void)arg;

    usleep(10000);  // give thread1 time to read top first

    printf("thread 2: popping C\n");
    node_t *old = atomic_load(&stack.top);
    atomic_compare_exchange_strong(&stack.top, &old, old->next);
    print_stack("after pop C");

    printf("thread 2: popping B\n");
    old = atomic_load(&stack.top);
    atomic_compare_exchange_strong(&stack.top, &old, old->next);
    print_stack("after pop B");

    printf("thread 2: pushing C back\n");
    // Relink C above the current top before swinging top to it.
    node_C.next = atomic_load(&stack.top);
    node_t *expected = node_C.next;
    atomic_compare_exchange_strong(&stack.top, &expected, &node_C);
    print_stack("after push C");

    return NULL;
}

/*
 * Build the three-node stack, run the two demo threads and wait for them.
 * Returns EXIT_FAILURE if a thread cannot be created, 0 otherwise.
 */
int main(void) {
    // Build the stack: top -> C -> B -> A -> NULL
    node_A.name = 'A';
    node_A.next = NULL;
    node_B.name = 'B';
    node_B.next = &node_A;
    node_C.name = 'C';
    node_C.next = &node_B;
    atomic_store(&stack.top, &node_C);

    print_stack("initial");

    pthread_t t1, t2;
    // pthread_create returns an error number on failure; joining a handle that
    // was never filled in would be invalid, so stop right away.
    int rc = pthread_create(&t1, NULL, thread1, NULL);
    if (rc != 0) {
        fprintf(stderr, "pthread_create failed for thread 1: error %d\n", rc);
        return EXIT_FAILURE;
    }
    rc = pthread_create(&t2, NULL, thread2, NULL);
    if (rc != 0) {
        fprintf(stderr, "pthread_create failed for thread 2: error %d\n", rc);
        return EXIT_FAILURE;
    }

    pthread_join(t1, NULL);
    pthread_join(t2, NULL);

    /*
     * The solution to the ABA problem: tagged pointers.
     * Every time top is changed, a counter (tag) is incremented.
     * The CAS checks both the pointer and the counter.
     * Even if the pointer returns to the same value (C), the counter
     * differs -> the CAS fails.
     *
     * Implementation:
     * typedef struct {
     *     node_t *ptr;
     *     uint64_t tag;
     * } tagged_ptr_t;
     *
     * With a 128-bit CAS (cmpxchg16b on x86_64), both ptr and tag are
     * compared. If the tag changed, the CAS fails and the problem is avoided.
     */

    return 0;
}