12.2 מבני נתונים ללא נעילה - פתרונות
פתרון לתרגיל 1 - מונה אטומי עם CAS
#include <stdio.h>
#include <stdlib.h>
#include <stdatomic.h>
#include <pthread.h>
// Counter type used by the counter_* functions, which read and modify
// `value` through C11 atomic operations.
typedef struct {
_Atomic int value; // current count
} atomic_counter_t;
/*
 * Put the counter into a known starting state.
 *
 * c:       counter to initialise; dereferenced unconditionally
 * initial: value the counter holds once this returns
 */
void counter_init(atomic_counter_t *c, int initial) {
    /* atomic_store() is the sequentially consistent store, spelled out here. */
    atomic_store_explicit(&c->value, initial, memory_order_seq_cst);
}
/*
 * Add one to the counter using a compare-and-swap retry loop.
 *
 * c: counter to increment; dereferenced unconditionally
 */
void counter_increment(atomic_counter_t *c) {
    int expected = atomic_load(&c->value);
    for (;;) {
        /* A failed CAS writes the value it found into `expected`, so the
         * next attempt is computed from fresh data without reloading. */
        if (atomic_compare_exchange_weak(&c->value, &expected, expected + 1)) {
            return;
        }
    }
}
/*
 * Add `amount` to the counter using a compare-and-swap retry loop.
 *
 * c:      counter to update; dereferenced unconditionally
 * amount: signed delta to apply
 */
void counter_add(atomic_counter_t *c, int amount) {
    int seen = atomic_load(&c->value);
    do {
        /* Nothing to do here: a failed CAS refreshes `seen` with the value
         * currently stored, and the loop condition retries with it. */
    } while (!atomic_compare_exchange_weak(&c->value, &seen, seen + amount));
}
/*
 * Read the counter.
 *
 * c: counter to read; dereferenced unconditionally
 * Returns the value observed by a single atomic load.
 */
int counter_get(atomic_counter_t *c) {
    const int snapshot = atomic_load(&c->value);
    return snapshot;
}
// Number of worker threads that main starts.
#define NUM_THREADS 8
// Number of increments each worker performs.
#define OPS_PER_THREAD 100000
// Counter shared by all worker threads.
atomic_counter_t counter;
/*
 * Thread entry point: increments the shared counter OPS_PER_THREAD times.
 *
 * arg: unused
 * Returns NULL.
 */
void *increment_thread(void *arg) {
    (void)arg;
    int remaining = OPS_PER_THREAD;
    while (remaining-- > 0) {
        counter_increment(&counter);
    }
    return NULL;
}
/*
 * Start NUM_THREADS incrementing threads, wait for them, and report whether
 * the final count equals NUM_THREADS * OPS_PER_THREAD.
 *
 * Returns 0 after printing the report, or EXIT_FAILURE if a thread could not
 * be created or joined.
 */
int main(void) {
    const int expected = NUM_THREADS * OPS_PER_THREAD;
    pthread_t threads[NUM_THREADS];

    counter_init(&counter, 0);

    for (int i = 0; i < NUM_THREADS; i++) {
        /* A failed create leaves threads[i] indeterminate, so it must never
         * reach pthread_join. */
        int rc = pthread_create(&threads[i], NULL, increment_thread, NULL);
        if (rc != 0) {
            fprintf(stderr, "pthread_create failed for thread %d (error %d)\n", i, rc);
            return EXIT_FAILURE;
        }
    }
    for (int i = 0; i < NUM_THREADS; i++) {
        int rc = pthread_join(threads[i], NULL);
        if (rc != 0) {
            fprintf(stderr, "pthread_join failed for thread %d (error %d)\n", i, rc);
            return EXIT_FAILURE;
        }
    }

    int result = counter_get(&counter);
    printf("expected: %d\n", expected);
    printf("got: %d\n", result);
    printf("correct: %s\n", result == expected ? "yes" : "NO!");
    return 0;
}
קומפילציה: gcc -std=c11 -O2 -pthread counter.c -o counter
פתרון לתרגיל 2 - מחסנית Treiber
#include <stdio.h>
#include <stdlib.h>
#include <stdatomic.h>
#include <pthread.h>
// One element of the singly linked stack.
typedef struct node {
int data; // payload stored by stack_push
struct node *next; // node beneath this one; NULL at the bottom of the stack
} node_t;
// Stack handle: a single atomic pointer to the top node (NULL when empty).
typedef struct {
_Atomic(node_t *) top;
} treiber_stack_t;
/*
 * Make the stack empty by storing NULL as its top pointer.
 *
 * s: stack to initialise; dereferenced unconditionally
 */
void stack_init(treiber_stack_t *s) {
    /* atomic_store() is the sequentially consistent store, spelled out here. */
    atomic_store_explicit(&s->top, NULL, memory_order_seq_cst);
}
/*
 * Push `value` onto the stack.
 *
 * s:     stack to push onto; dereferenced unconditionally
 * value: payload for the new node
 *
 * The node is heap-allocated here and released by stack_pop. The function has
 * no way to report failure, so an allocation failure prints a diagnostic and
 * aborts instead of dereferencing NULL.
 */
void stack_push(treiber_stack_t *s, int value) {
    node_t *new_node = malloc(sizeof *new_node);
    if (new_node == NULL) {
        perror("stack_push: malloc");
        abort();
    }
    new_node->data = value;

    node_t *old_top = atomic_load(&s->top);
    do {
        /* Link before publishing. A failed CAS refreshes old_top, so the
         * link is redone against the current top on every attempt. */
        new_node->next = old_top;
    } while (!atomic_compare_exchange_weak(&s->top, &old_top, new_node));
}
/*
 * Pop the top element of the stack.
 *
 * s:     stack to pop from; dereferenced unconditionally
 * value: receives the popped payload on success
 * Returns 1 on success, 0 if the stack was empty.
 */
int stack_pop(treiber_stack_t *s, int *value) {
    node_t *candidate = atomic_load(&s->top);
    for (;;) {
        if (candidate == NULL) {
            return 0;
        }
        /* A failed CAS refreshes `candidate` with the current top. */
        if (atomic_compare_exchange_weak(&s->top, &candidate, candidate->next)) {
            break;
        }
    }
    *value = candidate->data;
    /*
     * NOTE: freeing the node right away is not safe in a general program.
     * A recycled address can make a stale CAS succeed (ABA), and another
     * thread that loaded the same top may still read candidate->next after
     * this free. The exercise driver finishes every push before the first
     * pop starts, which removes the re-push ABA case.
     * NOTE(review): concurrent poppers can still read a node that was just
     * freed; production code needs a safe reclamation scheme.
     */
    free(candidate);
    return 1;
}
// Number of pushing threads started by main.
#define PUSH_THREADS 4
// Number of popping threads started by main.
#define POP_THREADS 4
// Items each push thread pushes and each pop thread pops.
#define OPS_PER_THREAD 10000
// Stack shared by every thread in this program.
treiber_stack_t stack;
// Running total of successful pops; each pop thread adds its count when done.
_Atomic int total_popped = 0;
/*
 * Thread entry point: pushes OPS_PER_THREAD values onto the shared stack.
 *
 * arg: pointer to an int holding this thread's id
 * Returns NULL.
 *
 * The values pushed are id * OPS_PER_THREAD + 0 .. OPS_PER_THREAD - 1.
 */
void *push_thread(void *arg) {
    const int id = *(int *)arg;
    const int base = id * OPS_PER_THREAD;
    for (int offset = 0; offset < OPS_PER_THREAD; offset++) {
        stack_push(&stack, base + offset);
    }
    printf("push thread %d: pushed %d items\n", id, OPS_PER_THREAD);
    return NULL;
}
/*
 * Thread entry point: pops from the shared stack until OPS_PER_THREAD pops
 * have succeeded, then adds that count to total_popped.
 *
 * arg: pointer to an int holding this thread's id
 * Returns NULL.
 *
 * Failed pops (empty stack) are simply retried, so this spins until enough
 * items have been taken.
 */
void *pop_thread(void *arg) {
    const int id = *(int *)arg;
    int popped = 0;
    int discarded; /* popped payloads are not used */

    while (popped < OPS_PER_THREAD) {
        popped += stack_pop(&stack, &discarded) ? 1 : 0;
    }
    atomic_fetch_add(&total_popped, popped);
    printf("pop thread %d: popped %d items\n", id, popped);
    return NULL;
}
/*
 * Run all push threads to completion, then all pop threads, then drain and
 * count whatever is left on the stack and print the totals.
 *
 * Returns 0 after printing the report, or EXIT_FAILURE if a thread could not
 * be created or joined.
 */
int main(void) {
    stack_init(&stack);
    pthread_t push_threads[PUSH_THREADS], pop_threads[POP_THREADS];
    int push_ids[PUSH_THREADS], pop_ids[POP_THREADS];

    // Run the push threads first so there is something to pop.
    for (int i = 0; i < PUSH_THREADS; i++) {
        push_ids[i] = i;
        /* A failed create leaves the handle indeterminate, so stop before it
         * can reach pthread_join. */
        int rc = pthread_create(&push_threads[i], NULL, push_thread, &push_ids[i]);
        if (rc != 0) {
            fprintf(stderr, "pthread_create failed for push thread %d (error %d)\n", i, rc);
            return EXIT_FAILURE;
        }
    }
    for (int i = 0; i < PUSH_THREADS; i++) {
        int rc = pthread_join(push_threads[i], NULL);
        if (rc != 0) {
            fprintf(stderr, "pthread_join failed for push thread %d (error %d)\n", i, rc);
            return EXIT_FAILURE;
        }
    }
    printf("total pushed: %d\n", PUSH_THREADS * OPS_PER_THREAD);

    // Now the pop threads.
    for (int i = 0; i < POP_THREADS; i++) {
        pop_ids[i] = i;
        int rc = pthread_create(&pop_threads[i], NULL, pop_thread, &pop_ids[i]);
        if (rc != 0) {
            fprintf(stderr, "pthread_create failed for pop thread %d (error %d)\n", i, rc);
            return EXIT_FAILURE;
        }
    }
    for (int i = 0; i < POP_THREADS; i++) {
        int rc = pthread_join(pop_threads[i], NULL);
        if (rc != 0) {
            fprintf(stderr, "pthread_join failed for pop thread %d (error %d)\n", i, rc);
            return EXIT_FAILURE;
        }
    }
    printf("total popped: %d\n", atomic_load(&total_popped));

    // Count whatever is still on the stack.
    int remaining = 0;
    int value;
    while (stack_pop(&stack, &value)) {
        remaining++;
    }
    printf("remaining in stack: %d\n", remaining);
    printf("total (popped + remaining): %d\n",
           atomic_load(&total_popped) + remaining);
    return 0;
}
פתרון לתרגיל 3 - תור SPSC
#include <stdio.h>
#include <stdlib.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <pthread.h>
#include <time.h>
// Capacity of the ring buffer in slots; must be a power of two.
#define QUEUE_SIZE 4096
// Number of integers the producer sends through the queue.
#define NUM_ITEMS 10000000
// Slots are selected with `index & (QUEUE_SIZE - 1)`, which only wraps
// correctly for a positive power of two, so enforce it at compile time.
_Static_assert(QUEUE_SIZE > 0 && (QUEUE_SIZE & (QUEUE_SIZE - 1)) == 0,
               "QUEUE_SIZE must be a positive power of two");
// Bounded ring buffer of ints; in this program it is used by one producer
// thread (spsc_push) and one consumer thread (spsc_pop).
typedef struct {
int buffer[QUEUE_SIZE]; // slot for an index is (index & (QUEUE_SIZE - 1))
_Atomic int head; // next index to write; advanced by spsc_push
_Atomic int tail; // next index to read; advanced by spsc_pop
} spsc_queue_t;
/*
 * Reset the queue to empty by zeroing both indices.
 *
 * q: queue to initialise; dereferenced unconditionally
 */
void spsc_init(spsc_queue_t *q) {
    /* atomic_store() is the sequentially consistent store, spelled out here. */
    atomic_store_explicit(&q->head, 0, memory_order_seq_cst);
    atomic_store_explicit(&q->tail, 0, memory_order_seq_cst);
}
/*
 * Try to append `item` to the queue (producer side).
 *
 * q:    queue to write to; dereferenced unconditionally
 * item: value to enqueue
 * Returns true if the item was stored, false if the queue was full.
 */
bool spsc_push(spsc_queue_t *q, int item) {
    /* head is only written by this function, so a relaxed load suffices;
     * tail is written by spsc_pop and is read with acquire. */
    const int write_pos = atomic_load_explicit(&q->head, memory_order_relaxed);
    const int read_pos = atomic_load_explicit(&q->tail, memory_order_acquire);
    const int used = write_pos - read_pos;

    if (used < QUEUE_SIZE) {
        q->buffer[write_pos & (QUEUE_SIZE - 1)] = item;
        /* Release-store publishes the slot write before the new head. */
        atomic_store_explicit(&q->head, write_pos + 1, memory_order_release);
        return true;
    }
    return false;
}
/*
 * Try to remove the oldest item from the queue (consumer side).
 *
 * q:    queue to read from; dereferenced unconditionally
 * item: receives the dequeued value on success
 * Returns true if an item was read, false if the queue was empty.
 */
bool spsc_pop(spsc_queue_t *q, int *item) {
    /* tail is only written by this function, so a relaxed load suffices;
     * head is written by spsc_push and is read with acquire. */
    const int read_pos = atomic_load_explicit(&q->tail, memory_order_relaxed);
    const int write_pos = atomic_load_explicit(&q->head, memory_order_acquire);
    const bool has_item = read_pos < write_pos;

    if (!has_item) {
        return false;
    }
    *item = q->buffer[read_pos & (QUEUE_SIZE - 1)];
    /* Release-store hands the slot back only after it has been read. */
    atomic_store_explicit(&q->tail, read_pos + 1, memory_order_release);
    return true;
}
// Queue shared by the producer and consumer threads.
spsc_queue_t queue;
/*
 * Thread entry point: pushes the integers 0 .. NUM_ITEMS - 1 into the shared
 * queue in order, spinning whenever the queue is full.
 *
 * arg: unused
 * Returns NULL.
 */
void *producer(void *arg) {
    (void)arg;
    int next = 0;
    while (next < NUM_ITEMS) {
        /* Advance only after a successful push; a full queue means retry. */
        if (spsc_push(&queue, next)) {
            next++;
        }
    }
    return NULL;
}
/*
 * Thread entry point: pops NUM_ITEMS values from the shared queue, spinning
 * while it is empty, and prints how many were consumed and their sum.
 *
 * arg: unused
 * Returns NULL.
 */
void *consumer(void *arg) {
    (void)arg;
    int item;
    /* The sum of 0 .. NUM_ITEMS - 1 is about 5e13, which does not fit in a
     * 32-bit long, so accumulate in long long (at least 64 bits). */
    long long sum = 0;
    int count = 0;
    while (count < NUM_ITEMS) {
        if (spsc_pop(&queue, &item)) {
            sum += item;
            count++;
        }
    }
    printf("consumed %d items, sum = %lld\n", count, sum);
    return NULL;
}
/*
 * Run one producer and one consumer over the shared queue and print the
 * elapsed wall-clock time and the resulting throughput.
 *
 * Returns 0 after printing the report, or EXIT_FAILURE if a thread could not
 * be created or joined.
 */
int main(void) {
    spsc_init(&queue);

    struct timespec start, end;
    clock_gettime(CLOCK_MONOTONIC, &start);

    pthread_t prod, cons;
    /* A failed create leaves the handle indeterminate, so stop before it can
     * reach pthread_join. */
    int rc = pthread_create(&prod, NULL, producer, NULL);
    if (rc != 0) {
        fprintf(stderr, "pthread_create failed for producer (error %d)\n", rc);
        return EXIT_FAILURE;
    }
    rc = pthread_create(&cons, NULL, consumer, NULL);
    if (rc != 0) {
        fprintf(stderr, "pthread_create failed for consumer (error %d)\n", rc);
        return EXIT_FAILURE;
    }

    rc = pthread_join(prod, NULL);
    if (rc != 0) {
        fprintf(stderr, "pthread_join failed for producer (error %d)\n", rc);
        return EXIT_FAILURE;
    }
    rc = pthread_join(cons, NULL);
    if (rc != 0) {
        fprintf(stderr, "pthread_join failed for consumer (error %d)\n", rc);
        return EXIT_FAILURE;
    }

    clock_gettime(CLOCK_MONOTONIC, &end);
    double elapsed = (end.tv_sec - start.tv_sec) +
                     (end.tv_nsec - start.tv_nsec) / 1e9;
    double ops_per_sec = NUM_ITEMS / elapsed;
    printf("elapsed: %.4f seconds\n", elapsed);
    printf("throughput: %.0f ops/sec\n", ops_per_sec);
    return 0;
}
פתרון לתרגיל 4 - השוואת ביצועים
#include <stdio.h>
#include <stdlib.h>
#include <stdatomic.h>
#include <pthread.h>
#include <time.h>
// Number of worker threads started for each benchmark run.
#define NUM_THREADS 4
// Pushes, and then pops, performed by each worker.
#define OPS_PER_THREAD 100000
// --- mutex version ---
// Node of the mutex-protected stack.
typedef struct mutex_node {
int data; // payload
struct mutex_node *next; // node beneath this one; NULL at the bottom
} mutex_node_t;
// Linked stack whose top pointer is read and written under `lock`.
typedef struct {
mutex_node_t *top; // top node, or NULL when empty
pthread_mutex_t lock; // held by the mutex_stack_* functions while touching top
} mutex_stack_t;
/*
 * Initialise an empty mutex-protected stack.
 *
 * s: stack to initialise; dereferenced unconditionally
 *
 * The mutex is created with default attributes.
 */
void mutex_stack_init(mutex_stack_t *s) {
    pthread_mutex_init(&s->lock, NULL);
    s->top = NULL;
}
/*
 * Push `val` onto the mutex-protected stack.
 *
 * s:   stack to push onto; dereferenced unconditionally
 * val: payload for the new node
 *
 * The node is heap-allocated here and released by mutex_stack_pop. The
 * function has no way to report failure, so an allocation failure prints a
 * diagnostic and aborts instead of dereferencing NULL.
 */
void mutex_stack_push(mutex_stack_t *s, int val) {
    /* Allocate and fill the node before taking the lock to keep the
     * critical section short. */
    mutex_node_t *n = malloc(sizeof *n);
    if (n == NULL) {
        perror("mutex_stack_push: malloc");
        abort();
    }
    n->data = val;

    pthread_mutex_lock(&s->lock);
    n->next = s->top;
    s->top = n;
    pthread_mutex_unlock(&s->lock);
}
/*
 * Pop the top element of the mutex-protected stack.
 *
 * s:   stack to pop from; dereferenced unconditionally
 * val: receives the popped payload on success
 * Returns 1 on success, 0 if the stack was empty.
 */
int mutex_stack_pop(mutex_stack_t *s, int *val) {
    pthread_mutex_lock(&s->lock);
    mutex_node_t *detached = s->top;
    if (detached != NULL) {
        s->top = detached->next;
    }
    pthread_mutex_unlock(&s->lock);

    if (detached == NULL) {
        return 0;
    }
    /* The node is unlinked, so it can be read and freed outside the lock. */
    *val = detached->data;
    free(detached);
    return 1;
}
// --- lock-free version (Treiber) ---
// Node of the lock-free stack.
typedef struct lf_node {
int data; // payload
struct lf_node *next; // node beneath this one; NULL at the bottom
} lf_node_t;
// Lock-free stack handle: a single atomic pointer to the top node.
typedef struct {
_Atomic(lf_node_t *) top; // NULL when the stack is empty
} lf_stack_t;
/*
 * Make the lock-free stack empty by storing NULL as its top pointer.
 *
 * s: stack to initialise; dereferenced unconditionally
 */
void lf_stack_init(lf_stack_t *s) {
    /* atomic_store() is the sequentially consistent store, spelled out here. */
    atomic_store_explicit(&s->top, NULL, memory_order_seq_cst);
}
/*
 * Push `val` onto the lock-free stack.
 *
 * s:   stack to push onto; dereferenced unconditionally
 * val: payload for the new node
 *
 * The node is heap-allocated here and released by lf_stack_pop. The function
 * has no way to report failure, so an allocation failure prints a diagnostic
 * and aborts instead of dereferencing NULL.
 */
void lf_stack_push(lf_stack_t *s, int val) {
    lf_node_t *n = malloc(sizeof *n);
    if (n == NULL) {
        perror("lf_stack_push: malloc");
        abort();
    }
    n->data = val;

    lf_node_t *old = atomic_load(&s->top);
    do {
        /* Link before publishing. A failed CAS refreshes `old`, so the link
         * is redone against the current top on every attempt. */
        n->next = old;
    } while (!atomic_compare_exchange_weak(&s->top, &old, n));
}
/*
 * Pop the top element of the lock-free stack.
 *
 * s:   stack to pop from; dereferenced unconditionally
 * val: receives the popped payload on success
 * Returns 1 on success, 0 if the stack was empty.
 *
 * NOTE(review): the node is freed immediately. Other threads that already
 * loaded the same top may still read its `next` field, and a recycled
 * address can let a stale CAS succeed (ABA). The benchmark workers in this
 * file push and pop concurrently, so this is only acceptable as benchmark
 * code; production use needs a safe reclamation scheme.
 */
int lf_stack_pop(lf_stack_t *s, int *val) {
    lf_node_t *candidate = atomic_load(&s->top);
    for (;;) {
        if (candidate == NULL) {
            return 0;
        }
        /* A failed CAS refreshes `candidate` with the current top. */
        if (atomic_compare_exchange_weak(&s->top, &candidate, candidate->next)) {
            break;
        }
    }
    *val = candidate->data;
    free(candidate);
    return 1;
}
// --- benchmark ---
// Stack shared by the mutex_worker threads.
mutex_stack_t ms;
// Stack shared by the lf_worker threads.
lf_stack_t ls;
/*
 * Benchmark worker for the mutex stack: pushes OPS_PER_THREAD values
 * (0 .. OPS_PER_THREAD - 1), then performs OPS_PER_THREAD successful pops,
 * retrying whenever the stack is momentarily empty.
 *
 * arg: unused
 * Returns NULL.
 */
void *mutex_worker(void *arg) {
    (void)arg;

    int pushed = 0;
    while (pushed < OPS_PER_THREAD) {
        mutex_stack_push(&ms, pushed);
        pushed++;
    }

    int sink; /* popped payloads are not used */
    int popped = 0;
    while (popped < OPS_PER_THREAD) {
        if (mutex_stack_pop(&ms, &sink)) {
            popped++;
        }
    }
    return NULL;
}
/*
 * Benchmark worker for the lock-free stack: pushes OPS_PER_THREAD values
 * (0 .. OPS_PER_THREAD - 1), then performs OPS_PER_THREAD successful pops,
 * retrying whenever the stack is momentarily empty.
 *
 * arg: unused
 * Returns NULL.
 */
void *lf_worker(void *arg) {
    (void)arg;

    int pushed = 0;
    while (pushed < OPS_PER_THREAD) {
        lf_stack_push(&ls, pushed);
        pushed++;
    }

    int sink; /* popped payloads are not used */
    int popped = 0;
    while (popped < OPS_PER_THREAD) {
        if (lf_stack_pop(&ls, &sink)) {
            popped++;
        }
    }
    return NULL;
}
/*
 * Run `func` on NUM_THREADS threads, wait for all of them, and print the
 * elapsed wall-clock time labelled with `name`.
 *
 * func: thread entry point; it is passed NULL as its argument
 * name: label used in the printed line and in diagnostics
 * Returns the elapsed time in seconds.
 *
 * If a thread cannot be created or joined the measurement is meaningless, so
 * a diagnostic is printed and the program exits with EXIT_FAILURE.
 */
double run_benchmark(void *(*func)(void *), const char *name) {
    struct timespec start, end;
    pthread_t threads[NUM_THREADS];

    clock_gettime(CLOCK_MONOTONIC, &start);
    for (int i = 0; i < NUM_THREADS; i++) {
        /* A failed create leaves threads[i] indeterminate, so it must never
         * reach pthread_join. */
        int rc = pthread_create(&threads[i], NULL, func, NULL);
        if (rc != 0) {
            fprintf(stderr, "%s: pthread_create failed for thread %d (error %d)\n",
                    name, i, rc);
            exit(EXIT_FAILURE);
        }
    }
    for (int i = 0; i < NUM_THREADS; i++) {
        int rc = pthread_join(threads[i], NULL);
        if (rc != 0) {
            fprintf(stderr, "%s: pthread_join failed for thread %d (error %d)\n",
                    name, i, rc);
            exit(EXIT_FAILURE);
        }
    }
    clock_gettime(CLOCK_MONOTONIC, &end);

    double elapsed = (end.tv_sec - start.tv_sec) +
                     (end.tv_nsec - start.tv_nsec) / 1e9;
    printf("%s: %.4f seconds\n", name, elapsed);
    return elapsed;
}
/*
 * Benchmark the mutex stack and then the lock-free stack, and print how the
 * lock-free time compares with the mutex time.
 *
 * Returns 0.
 */
int main(void) {
    mutex_stack_init(&ms);
    lf_stack_init(&ls);

    const double mutex_time = run_benchmark(mutex_worker, "mutex stack");
    const double lf_time = run_benchmark(lf_worker, "lock-free stack");

    /* Always report a ratio >= 1 and let the word carry the direction. */
    double ratio;
    const char *verdict;
    if (mutex_time > lf_time) {
        ratio = mutex_time / lf_time;
        verdict = "faster";
    } else {
        ratio = lf_time / mutex_time;
        verdict = "slower";
    }
    printf("lock-free is %.2fx %s than mutex\n", ratio, verdict);

    /*
     * Note: the results depend heavily on the chip and the number of cores.
     * Under high contention (many threads) lock-free usually wins.
     * Under low contention (few threads) the mutex is sometimes faster,
     * because the futex fast path is a single atomic operation in user space.
     * Conclusion: always measure before deciding.
     */
    return 0;
}
פתרון לתרגיל 5 - זיהוי בעיית ABA
#include <stdio.h>
#include <stdlib.h>
#include <stdatomic.h>
#include <pthread.h>
#include <unistd.h>
// Stack node for the ABA demonstration.
typedef struct node {
char name; // single-character label printed to identify the node
struct node *next; // node beneath this one; NULL at the bottom
} node_t;
// Stack handle: a single atomic pointer to the top node.
// NOTE(review): POSIX <signal.h> also declares a type named stack_t; if that
// header is ever pulled in, this typedef will conflict - consider renaming.
typedef struct {
_Atomic(node_t *) top;
} stack_t;
// The three statically allocated nodes the demo moves around.
node_t node_A, node_B, node_C;
// Stack shared by both demo threads.
stack_t stack;
/*
 * Print the stack from top to bottom as "[label] stack: X -> Y -> NULL".
 *
 * label: text placed inside the square brackets
 *
 * Only the top pointer is loaded atomically; the `next` links are followed
 * with plain reads.
 */
void print_stack(const char *label) {
    printf("[%s] stack: ", label);
    for (node_t *it = atomic_load(&stack.top); it != NULL; it = it->next) {
        printf("%c -> ", it->name);
    }
    printf("NULL\n");
}
// Thread 1: starts a pop but stalls between reading the stack and the CAS.
// It reads top and top->next, sleeps for 100 ms, then tries to swing top from
// the node it read to that node's old successor, and prints the outcome.
// arg is unused; returns NULL.
void *thread1(void *arg) {
(void)arg;
// Step 1: read top and its next pointer
node_t *old_top = atomic_load(&stack.top);
node_t *new_top = old_top->next;
printf("thread 1: read top=%c, next=%c\n", old_top->name, new_top->name);
printf("thread 1: going to sleep before CAS...\n");
// Simulate being preempted
usleep(100000);
printf("thread 1: woke up, attempting CAS(&top, %c, %c)\n",
old_top->name, new_top->name);
// The CAS is expected to succeed because top points at C again (thread 2
// pushed it back during the sleep), even though new_top (= B) is no longer
// part of the stack.
if (atomic_compare_exchange_strong(&stack.top, &old_top, new_top)) {
printf("thread 1: CAS succeeded! top is now '%c'\n", new_top->name);
printf("thread 1: BUT '%c' was already removed! THIS IS THE ABA BUG!\n",
new_top->name);
} else {
printf("thread 1: CAS failed (this would happen with tagged pointers)\n");
}
print_stack("after thread 1");
return NULL;
}
// Thread 2: pops C, pops B, then pushes C back - the sequence that sets up ABA.
// The results of the three CAS calls are not checked.
// arg is unused; returns NULL.
void *thread2(void *arg) {
(void)arg;
usleep(10000); // give thread1 time to read top first
printf("thread 2: popping C\n");
node_t *old = atomic_load(&stack.top);
atomic_compare_exchange_strong(&stack.top, &old, old->next);
print_stack("after pop C");
printf("thread 2: popping B\n");
old = atomic_load(&stack.top);
atomic_compare_exchange_strong(&stack.top, &old, old->next);
print_stack("after pop B");
printf("thread 2: pushing C back\n");
// Re-link C on top of whatever is the current top, then publish it.
node_C.next = atomic_load(&stack.top);
node_t *expected = node_C.next;
atomic_compare_exchange_strong(&stack.top, &expected, &node_C);
print_stack("after push C");
return NULL;
}
/*
 * Build the stack C -> B -> A, then run the two demo threads that reproduce
 * the ABA problem and wait for both.
 *
 * Returns 0 when the demo ran, or EXIT_FAILURE if a thread could not be
 * created or joined.
 */
int main(void) {
    // Build the stack: top -> C -> B -> A -> NULL
    node_A.name = 'A';
    node_A.next = NULL;
    node_B.name = 'B';
    node_B.next = &node_A;
    node_C.name = 'C';
    node_C.next = &node_B;
    atomic_store(&stack.top, &node_C);
    print_stack("initial");

    pthread_t t1, t2;
    /* A failed create leaves the handle indeterminate, so stop before it can
     * reach pthread_join. */
    int rc = pthread_create(&t1, NULL, thread1, NULL);
    if (rc != 0) {
        fprintf(stderr, "pthread_create failed for thread 1 (error %d)\n", rc);
        return EXIT_FAILURE;
    }
    rc = pthread_create(&t2, NULL, thread2, NULL);
    if (rc != 0) {
        fprintf(stderr, "pthread_create failed for thread 2 (error %d)\n", rc);
        return EXIT_FAILURE;
    }

    rc = pthread_join(t1, NULL);
    if (rc != 0) {
        fprintf(stderr, "pthread_join failed for thread 1 (error %d)\n", rc);
        return EXIT_FAILURE;
    }
    rc = pthread_join(t2, NULL);
    if (rc != 0) {
        fprintf(stderr, "pthread_join failed for thread 2 (error %d)\n", rc);
        return EXIT_FAILURE;
    }

    /*
     * The fix for the ABA problem: tagged pointers.
     * Every time top is changed, a counter (tag) is incremented.
     * The CAS checks both the pointer and the counter.
     * Even if the pointer returns to the same value (C), the counter
     * differs -> the CAS fails.
     *
     * Implementation:
     * typedef struct {
     *     node_t *ptr;
     *     uint64_t tag;
     * } tagged_ptr_t;
     *
     * With a 128-bit CAS (cmpxchg16b on x86_64), both ptr and tag are
     * compared. If the tag changed, the CAS fails and the problem is avoided.
     */
    return 0;
}