לדלג לתוכן

10.4 מודל הזכרון פתרון

פתרון - מודל הזכרון - memory model and volatile

פתרון 1 - volatile ואופטימיזציות

#include <stdio.h>
#include <pthread.h>
#include <unistd.h>

// שנו ל-"int flag = 0" כדי לראות את הבעיה
volatile int flag = 0;

void *waiter(void *arg) {
    printf("waiter: waiting for flag...\n");
    while (flag == 0) { }
    printf("waiter: flag changed!\n");
    return NULL;
}

void *setter(void *arg) {
    sleep(1);
    printf("setter: setting flag to 1\n");
    flag = 1;
    return NULL;
}

int main(void) {
    pthread_t t1, t2;
    pthread_create(&t1, NULL, waiter, NULL);
    pthread_create(&t2, NULL, setter, NULL);
    pthread_join(t1, NULL);
    pthread_join(t2, NULL);
    return 0;
}

קומפילציה:

gcc -O2 -lpthread volatile_test.c -o volatile_test

הסבר: ללא volatile, עם -O2, הקומפיילר רואה שבתוך הלולאה while (flag == 0) אף אחד לא משנה את flag (מנקודת המבט של הthread הזה). לכן הוא קורא את flag פעם אחת, שומר את הערך ברגיסטר, ובודק את הרגיסטר בלולאה אינסופית. הthread לעולם לא רואה את השינוי.

עם volatile, הקומפיילר חייב לקרוא מהזיכרון בכל איטרציה, ולכן רואה את השינוי.

שימו לב: בקוד אמיתי, volatile לבד לא מספיק ל-thread safety. עדיף להשתמש ב-atomics. הדוגמה הזו מדגימה את ההבדל של volatile בלבד.


פתרון 2 - מונה אטומי

#include <stdio.h>
#include <pthread.h>
#include <stdatomic.h>

#define NUM_THREADS 4
#define INCREMENTS 1000000

// שנו ל-"int counter = 0" כדי לראות race condition
atomic_int counter = 0;

void *increment(void *arg) {
    for (int i = 0; i < INCREMENTS; i++) {
        atomic_fetch_add_explicit(&counter, 1, memory_order_relaxed);
        // לגרסה ללא atomics: counter++;
    }
    return NULL;
}

int main(void) {
    pthread_t threads[NUM_THREADS];

    for (int i = 0; i < NUM_THREADS; i++)
        pthread_create(&threads[i], NULL, increment, NULL);

    for (int i = 0; i < NUM_THREADS; i++)
        pthread_join(threads[i], NULL);

    printf("counter = %d (expected %d)\n",
           atomic_load(&counter), NUM_THREADS * INCREMENTS);

    return 0;
}

קומפילציה:

gcc -lpthread atomic_counter.c -o atomic_counter

הסבר:

בלי atomics, counter++ הוא שלוש פעולות: load, add, store. שני threads יכולים לקרוא את אותו ערך, לחבר 1, ולכתוב - כך שעדכון אחד הולך לאיבוד. התוצאה תהיה פחות מ-4,000,000.

עם atomics, atomic_fetch_add מבצע את כל שלוש הפעולות כפעולה אטומית אחת (בx86 זה מתורגם ל-lock xadd). התוצאה תמיד 4,000,000.

memory_order_relaxed מספיק כי אנחנו רק מגדילים מונה - אנחנו לא מסתמכים על סדר של המונה ביחס למשתנים אחרים. כל מה שצריך זה שהפעולה תהיה אטומית.


פתרון 3 - producer/consumer עם acquire/release

#include <stdio.h>
#include <pthread.h>
#include <stdatomic.h>
#include <unistd.h>

#define NUM_MESSAGES 10

int data;
atomic_int ready = 0;

void *producer(void *arg) {
    for (int i = 0; i < NUM_MESSAGES; i++) {
        // מחכה שהצרכן יסיים עם ההודעה הקודמת
        while (atomic_load_explicit(&ready, memory_order_acquire) != 0)
            ;

        data = (i + 1) * 100;  // הכנת הנתונים
        printf("producer: sent %d\n", data);

        // מפרסם שהנתונים מוכנים
        atomic_store_explicit(&ready, 1, memory_order_release);
    }
    return NULL;
}

void *consumer(void *arg) {
    for (int i = 0; i < NUM_MESSAGES; i++) {
        // מחכה שהיצרן יפרסם נתונים
        while (atomic_load_explicit(&ready, memory_order_acquire) != 1)
            ;

        printf("consumer: received %d\n", data);

        // מודיע ליצרן שסיים
        atomic_store_explicit(&ready, 0, memory_order_release);
    }
    return NULL;
}

int main(void) {
    pthread_t prod, cons;
    pthread_create(&prod, NULL, producer, NULL);
    pthread_create(&cons, NULL, consumer, NULL);
    pthread_join(prod, NULL);
    pthread_join(cons, NULL);
    return 0;
}

הסבר:

הdפוס עובד כך:
1. היצרן כותב ל-data ואז עושה release store של ready=1
2. הצרכן עושה acquire load של ready. כשהוא רואה 1, מובטח שהוא יראה את הdata שהיצרן כתב
3. הצרכן קורא את data ואז עושה release store של ready=0
4. היצרן עושה acquire load של ready. כשהוא רואה 0, הוא יודע שבטוח לכתוב data חדש

אם נחליף ב-relaxed, תיאורטית זה יכול להיכשל - הצרכן עלול לראות ready=1 אבל לקרוא ערך ישן של data, כי relaxed לא נותן ערבויות סדר בין משתנים שונים. בx86 זה "עובד" בגלל מודל הזכרון החזק של x86 (TSO), אבל בARM זה יכול באמת להיכשל.


פתרון 4 - ניסוי מחסום קומפיילר

int x = 0, y = 0;

// גרסה ללא מחסום
void set_values_no_barrier(void) {
    x = 1;
    y = 2;
}

// גרסה עם מחסום
void set_values_with_barrier(void) {
    x = 1;
    asm volatile("" ::: "memory");
    y = 2;
}

קומפילציה לאסמבלי:

gcc -O2 -S barrier_test.c -o barrier_test.s

ללא מחסום, הקומפיילר עשוי:
- להחליף את סדר הכתיבות
- לבצע את שתי הכתיבות בו זמנית (אם יש הוראות מתאימות)

עם מחסום, הקומפיילר חייב לסיים את הכתיבה ל-x לפני שמתחיל את הכתיבה ל-y.

למה זה לא מספיק ל-threads? מחסום קומפיילר מונע רק סידור מחדש של הקומפיילר. באסמבלי שנוצר, הכתיבות יהיו בסדר הנכון. אבל המעבד עדיין יכול לבצע store buffering - לשמור את הכתיבה ב-store buffer ולא לשטוף אותה לcache מיד. על ARM, thread אחר עלול לראות y=2 לפני x=1 למרות שהקומפיילר שמר על הסדר.

לסינכרון אמיתי בין threads צריך atomics (שכוללים גם מחסומי מעבד) או מחסום מעבד מפורש (mfence).


פתרון 5 - תור lock-free

#include <stdio.h>
#include <pthread.h>
#include <stdatomic.h>

#define QUEUE_SIZE 64
#define NUM_ITEMS 1000

typedef struct {
    int buffer[QUEUE_SIZE];
    atomic_int head;
    atomic_int tail;
} queue_t;

void queue_init(queue_t *q) {
    atomic_store(&q->head, 0);
    atomic_store(&q->tail, 0);
}

int queue_push(queue_t *q, int val) {
    int tail = atomic_load_explicit(&q->tail, memory_order_relaxed);
    int next = (tail + 1) % QUEUE_SIZE;

    // התור מלא?
    if (next == atomic_load_explicit(&q->head, memory_order_acquire))
        return -1;

    q->buffer[tail] = val;
    atomic_store_explicit(&q->tail, next, memory_order_release);
    return 0;
}

int queue_pop(queue_t *q, int *val) {
    int head = atomic_load_explicit(&q->head, memory_order_relaxed);

    // התור ריק?
    if (head == atomic_load_explicit(&q->tail, memory_order_acquire))
        return -1;

    *val = q->buffer[head];
    atomic_store_explicit(&q->head, (head + 1) % QUEUE_SIZE,
                         memory_order_release);
    return 0;
}

queue_t queue;

void *producer(void *arg) {
    for (int i = 0; i < NUM_ITEMS; i++) {
        // אם התור מלא, ממתין
        while (queue_push(&queue, i) != 0)
            ;
    }
    printf("producer: done sending %d items\n", NUM_ITEMS);
    return NULL;
}

void *consumer(void *arg) {
    int val;
    int count = 0;
    int expected = 0;

    while (count < NUM_ITEMS) {
        if (queue_pop(&queue, &val) == 0) {
            if (val != expected) {
                printf("ERROR: expected %d, got %d\n", expected, val);
            }
            expected++;
            count++;
        }
    }
    printf("consumer: received %d items in order\n", count);
    return NULL;
}

int main(void) {
    queue_init(&queue);

    pthread_t prod, cons;
    pthread_create(&prod, NULL, producer, NULL);
    pthread_create(&cons, NULL, consumer, NULL);
    pthread_join(prod, NULL);
    pthread_join(cons, NULL);

    return 0;
}

קומפילציה:

gcc -O2 -lpthread spsc_queue.c -o spsc_queue

הסבר:

התור עובד בצורת מערך מעגלי עם שני אינדקסים: head (מאיפה הצרכן קורא) ו-tail (לאן היצרן כותב). התור ריק כש-head == tail, ומלא כש-(tail + 1) % size == head.

הacquire/release עובדים כך:
- push: קודם כותבים ל-buffer, ואז release store ל-tail. זה מבטיח שהצרכן יראה את הנתונים ב-buffer כשהוא רואה את tail החדש.
- pop: קודם קוראים מ-buffer, ואז release store ל-head. זה מבטיח שהיצרן לא ידרוס את ה-slot הזה לפני שהצרכן סיים לקרוא ממנו.

כשהתור מלא, היצרן פשוט מנסה שוב (busy-wait). בקוד אמיתי, אפשר להוסיף sched_yield() או לישון זמן קצר כדי לא לבזבז CPU.