12.2 מבני נתונים ללא נעילה הרצאה
הקדמה¶
בפרק הקודם (12.1) למדנו דפוסי תכנות מקבילי שמבוססים על נעילות - mutex, rwlock, barrier. הנעילות עובדות מצוין ברוב המקרים, אבל יש להן בעיות:
- מתחרות (contention) - כשהרבה תהליכונים מנסים לנעול את אותו mutex, רובם חוסמים וממתינים. במקום לעבוד, הם ישנים.
- היפוך עדיפויות (priority inversion) - תהליכון בעדיפות נמוכה מחזיק נעילה שתהליכון בעדיפות גבוהה צריך. התוצאה: התהליכון החשוב מחכה לתהליכון הפחות חשוב.
- קיפאון (deadlock) - שני תהליכונים מחכים אחד לשני. אף אחד לא מתקדם. לעולם.
אז האם אפשר לעשות תכנות מקבילי בלי נעילות בכלל? התשובה: כן, אבל זה הרבה יותר קשה. בואו נראה איך.
השוואה-והחלפה - compare-and-swap (CAS)¶
אבן הבניין הבסיסית¶
כמו שלמדנו בפרק 10.4, המעבד תומך בפעולות אטומיות - פעולות שמתבצעות "בבת אחת" מבחינת כל השאר. הפעולה האטומית הכי חשובה לתכנות ללא נעילה היא CAS - השוואה והחלפה (Compare-And-Swap).
הרעיון:
CAS(address, expected, desired):
אם *address == expected:
*address = desired
החזר true
אחרת:
expected = *address // עדכן את expected לערך הנוכחי
החזר false
כל הפעולה הזו מתבצעת באופן אטומי - אף תהליכון אחר לא יכול לגשת ל-address באמצע. ב-x86 זו הוראת cmpxchg אחת.
בC11¶
#include <stdatomic.h>
_Atomic int counter = 0;
// CAS - החזר true אם הצלחנו, false אם מישהו שינה לפנינו
int expected = 5;
int desired = 6;
bool success = atomic_compare_exchange_strong(&counter, &expected, desired);
// אם counter היה 5: עכשיו counter=6, success=true
// אם counter לא היה 5: expected מעודכן לערך הנוכחי, success=false
הדפוס הבסיסי - לולאת CAS¶
הדפוס הנפוץ ביותר הוא לולאת retry:
void atomic_add(_Atomic int *val, int amount) {
int old = atomic_load(val);
while (!atomic_compare_exchange_weak(val, &old, old + amount)) {
// CAS נכשל - old עודכן לערך הנוכחי, ננסה שוב
}
}
למה weak ולא strong? ההבדל: weak יכול להחזיר false גם כשהערך לא השתנה (spurious failure), אבל הוא יותר מהיר בלולאה. strong מובטח שנכשל רק כשהערך באמת השתנה.
מחסנית ללא נעילה - Treiber stack¶
הרעיון¶
מחסנית מקושרת (linked list stack) שמאפשרת push ו-pop מקביליים בלי שום mutex. הרעיון של Treiber (1986):
push: צור צומת חדש, הצבע אותו לראש הנוכחי, ואז החלף את הראש לצומת החדש עם CAS.pop: קרא את הראש, והחלף אותו לnext שלו עם CAS.
אם ה-CAS נכשל (מישהו שינה את הראש בינתיים) - פשוט ננסה שוב.
מימוש¶
#include <stdio.h>
#include <stdlib.h>
#include <stdatomic.h>
#include <pthread.h>
typedef struct node {
int data;
struct node *next;
} node_t;
typedef struct {
_Atomic(node_t *) top;
} treiber_stack_t;
void stack_init(treiber_stack_t *s) {
atomic_store(&s->top, NULL);
}
void stack_push(treiber_stack_t *s, int value) {
node_t *new_node = malloc(sizeof(node_t));
new_node->data = value;
node_t *old_top = atomic_load(&s->top);
do {
new_node->next = old_top;
// נסה להחליף את top ל-new_node
// אם top עדיין == old_top, יצליח
// אם מישהו שינה את top, old_top יתעדכן ונסה שוב
} while (!atomic_compare_exchange_weak(&s->top, &old_top, new_node));
}
// מחזיר -1 אם המחסנית ריקה
int stack_pop(treiber_stack_t *s) {
node_t *old_top = atomic_load(&s->top);
do {
if (old_top == NULL) {
return -1; // ריק
}
// נסה להחליף את top ל-old_top->next
} while (!atomic_compare_exchange_weak(&s->top, &old_top, old_top->next));
int value = old_top->data;
free(old_top);
return value;
}
בדיקה¶
#define NUM_THREADS 4
#define OPS_PER_THREAD 10000
treiber_stack_t stack;
void *push_thread(void *arg) {
int id = *(int *)arg;
for (int i = 0; i < OPS_PER_THREAD; i++) {
stack_push(&stack, id * OPS_PER_THREAD + i);
}
return NULL;
}
void *pop_thread(void *arg) {
int count = 0;
for (int i = 0; i < OPS_PER_THREAD; i++) {
if (stack_pop(&stack) != -1) {
count++;
}
}
printf("popped %d items\n", count);
return NULL;
}
int main(void) {
stack_init(&stack);
pthread_t threads[NUM_THREADS];
int ids[NUM_THREADS];
// חצי push, חצי pop
for (int i = 0; i < NUM_THREADS / 2; i++) {
ids[i] = i;
pthread_create(&threads[i], NULL, push_thread, &ids[i]);
}
for (int i = NUM_THREADS / 2; i < NUM_THREADS; i++) {
ids[i] = i;
pthread_create(&threads[i], NULL, pop_thread, &ids[i]);
}
for (int i = 0; i < NUM_THREADS; i++) {
pthread_join(threads[i], NULL);
}
return 0;
}
קומפילציה:
בעיית ABA¶
הבעיה¶
המחסנית של Treiber סובלת מבעיה מפורסמת בשם ABA. הנה התרחיש:
- תהליכון 1 רוצה לעשות pop. הוא קורא
top = A(שמצביע ל-B). הוא מתכנן לעשותCAS(&top, A, B). - תהליכון 1 נכנס לשינה (preempted) לפני שה-CAS מתבצע.
- תהליכון 2 עושה pop של A. עכשיו
top = B. - תהליכון 2 עושה pop של B. עכשיו
top = C. - תהליכון 2 עושה push של A בחזרה. עכשיו
top = A, ו-A מצביע ל-C. - תהליכון 1 מתעורר. הוא עושה
CAS(&top, A, B)- וזה מצליח כי top אכן שווה A! אבל B כבר שוחרר, וזה גישה לזכרון לא חוקי.
הבעיה: ה-CAS בודק רק אם הערך שווה, לא אם הוא אותו אחד. הערך חזר להיות A, אבל כל מבנה הנתונים השתנה.
פתרון - מצביעים מתויגים (tagged pointers)¶
הרעיון: נוסיף מונה לכל מצביע. כל פעם שמשנים את הראש, מעלים את המונה. גם אם המצביע חוזר לאותו ערך, המונה יהיה שונה וה-CAS ייכשל.
#include <stdatomic.h>
#include <stdint.h>
// נשתמש ב-128 ביט: 64 ביט מצביע + 64 ביט מונה
typedef struct {
node_t *ptr;
uint64_t tag;
} tagged_ptr_t;
// נצטרך CAS על 128 ביט (ב-x86_64 זה cmpxchg16b)
typedef _Atomic __int128 atomic_tagged_ptr_t;
// המרה
static inline __int128 pack(node_t *ptr, uint64_t tag) {
__int128 result = ((__int128)tag << 64) | (uint64_t)ptr;
return result;
}
static inline node_t *unpack_ptr(__int128 val) {
return (node_t *)(uint64_t)val;
}
static inline uint64_t unpack_tag(__int128 val) {
return (uint64_t)(val >> 64);
}
בפועל, הרבה מימושים משתמשים בביטים העליונים של מצביע 64 ביט (שלא בשימוש במרחב כתובות של 48 ביט) כמונה, כדי להימנע מ-128 ביט CAS.
הערה: בעיית ABA היא בעיקר בעיה כשמשחררים ומקצים מחדש זכרון. אם משתמשים בפתרונות כמו hazard pointers או epoch-based reclamation (מנגנונים שמבטיחים שלא משחררים צומת בזמן שמישהו עוד מחזיק מצביע אליו), אפשר להימנע מהבעיה גם בלי תגיות.
תור SPSC ללא נעילה - single-producer single-consumer lock-free queue¶
למה SPSC מיוחד¶
כשיש רק יצרן אחד וצרכן אחד, הדברים הרבה יותר פשוטים:
- רק היצרן כותב ל-head (אינדקס הכתיבה).
- רק הצרכן כותב ל-tail (אינדקס הקריאה).
- לא צריך CAS בכלל! מספיקים loads/stores אטומיים עם סדר זכרון מתאים.
זה הסוג הכי מהיר של תור מקבילי, ומשתמשים בו ב:
- מערכות אודיו (תהליכון אחד מקליט, אחד מנגן)
- חוצצים רשתיים (כרטיס רשת כותב, תוכנה קוראת)
- תקשורת בין תהליכונים בתוכנה (inter-thread communication)
מימוש - ring buffer¶
#include <stdio.h>
#include <stdlib.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <pthread.h>
#define QUEUE_SIZE 1024 // חייב להיות חזקה של 2
typedef struct {
int buffer[QUEUE_SIZE];
// head - הבא לכתיבה (רק היצרן כותב)
// tail - הבא לקריאה (רק הצרכן כותב)
_Atomic int head;
_Atomic int tail;
} spsc_queue_t;
void spsc_init(spsc_queue_t *q) {
atomic_store(&q->head, 0);
atomic_store(&q->tail, 0);
}
bool spsc_push(spsc_queue_t *q, int item) {
int head = atomic_load_explicit(&q->head, memory_order_relaxed);
int tail = atomic_load_explicit(&q->tail, memory_order_acquire);
// בדוק אם מלא
if ((head - tail) >= QUEUE_SIZE) {
return false; // מלא
}
q->buffer[head & (QUEUE_SIZE - 1)] = item; // head % QUEUE_SIZE
// פרסם את הפריט החדש - הצרכן יראה אותו אחרי ה-release
atomic_store_explicit(&q->head, head + 1, memory_order_release);
return true;
}
bool spsc_pop(spsc_queue_t *q, int *item) {
int tail = atomic_load_explicit(&q->tail, memory_order_relaxed);
int head = atomic_load_explicit(&q->head, memory_order_acquire);
// בדוק אם ריק
if (tail >= head) {
return false; // ריק
}
*item = q->buffer[tail & (QUEUE_SIZE - 1)]; // tail % QUEUE_SIZE
// שחרר את המקום - היצרן יראה אותו אחרי ה-release
atomic_store_explicit(&q->tail, tail + 1, memory_order_release);
return true;
}
שימו לב: אין כאן mutex, אין CAS, אין חסימה. רק load ו-store אטומיים עם סדר acquire/release. זה הסוד של הביצועים הגבוהים.
למה acquire/release¶
חזרה קצרה לחומר מפרק 10.4 - סדר זכרון (memory ordering):
- memory_order_release בstore: כל הכתיבות שלפניו מובטחות להיות גלויות לפני שהstore הזה נראה.
- memory_order_acquire בload: כל הקריאות שאחריו מובטחות לראות את מה שהיה לפני הrelease התואם.
בתור שלנו:
- היצרן כותב לbuffer, ואז עושה release store ל-head. כך מובטח שהצרכן יראה את הנתון בbuffer לפני שהוא רואה את הhead החדש.
- הצרכן קורא מהbuffer, ואז עושה release store ל-tail. כך מובטח שהיצרן יראה שהמקום התפנה.
דוגמה שלמה¶
spsc_queue_t queue;
void *producer(void *arg) {
for (int i = 0; i < 1000000; i++) {
while (!spsc_push(&queue, i)) {
// התור מלא, חכה
// בסביבה אמיתית: sched_yield() או pause
}
}
return NULL;
}
void *consumer(void *arg) {
int item;
int count = 0;
long sum = 0;
while (count < 1000000) {
if (spsc_pop(&queue, &item)) {
sum += item;
count++;
}
}
printf("consumed %d items, sum = %ld\n", count, sum);
return NULL;
}
int main(void) {
spsc_init(&queue);
pthread_t prod, cons;
pthread_create(&prod, NULL, producer, NULL);
pthread_create(&cons, NULL, consumer, NULL);
pthread_join(prod, NULL);
pthread_join(cons, NULL);
return 0;
}
תור MPSC - ריבוי יצרנים, צרכן יחיד - multi-producer single-consumer¶
למה MPSC¶
דפוס נפוץ מאוד: כמה תהליכוני עובדים שולחים הודעות לתהליכון מרכזי. למשל:
- מערכת לוגים: כל התהליכונים כותבים, תהליכון אחד כותב לקובץ.
- תור אירועים (event queue): מקורות שונים שולחים אירועים, event loop אחד מעבד אותם.
מימוש עם CAS¶
ההבדל מ-SPSC: עכשיו כמה יצרנים מתחרים על head, אז צריך CAS:
#include <stdio.h>
#include <stdlib.h>
#include <stdatomic.h>
#include <stdbool.h>
typedef struct mpsc_node {
int data;
_Atomic(struct mpsc_node *) next;
} mpsc_node_t;
typedef struct {
_Atomic(mpsc_node_t *) head; // יצרנים מוסיפים כאן
mpsc_node_t *tail; // הצרכן קורא מכאן (לא צריך אטומי)
mpsc_node_t stub; // צומת דמה קבוע
} mpsc_queue_t;
void mpsc_init(mpsc_queue_t *q) {
atomic_store(&q->stub.next, NULL);
atomic_store(&q->head, &q->stub);
q->tail = &q->stub;
}
// יצרן - כמה יצרנים יכולים לקרוא במקביל
void mpsc_push(mpsc_queue_t *q, mpsc_node_t *node) {
atomic_store_explicit(&node->next, NULL, memory_order_relaxed);
// החלף את head לnode החדש, קבל את הhead הישן
mpsc_node_t *prev = atomic_exchange_explicit(&q->head, node,
memory_order_acq_rel);
// חבר את הצומת הקודם לchild החדש
atomic_store_explicit(&prev->next, node, memory_order_release);
}
// צרכן - רק צרכן אחד!
mpsc_node_t *mpsc_pop(mpsc_queue_t *q) {
mpsc_node_t *tail = q->tail;
mpsc_node_t *next = atomic_load_explicit(&tail->next, memory_order_acquire);
if (tail == &q->stub) {
if (next == NULL) {
return NULL; // ריק
}
q->tail = next;
tail = next;
next = atomic_load_explicit(&tail->next, memory_order_acquire);
}
if (next != NULL) {
q->tail = next;
return tail;
}
return NULL; // ריק או יצרן באמצע push
}
רמות התקדמות - progress guarantees¶
הגדרות¶
יש שלוש רמות של ערבויות התקדמות במבני נתונים מקביליים:
ללא נעילה - lock-free:
- לפחות תהליכון אחד מתקדם בכל רגע.
- אם תהליכון אחד נתקע (נכנס לשינה, מת), השאר ממשיכים.
- אין deadlock. אבל תהליכון בודד עלול להירעב (starvation).
- דוגמה: מחסנית Treiber - אם CAS נכשל, זה אומר שתהליכון אחר הצליח.
ללא חסימה - wait-free:
- כל תהליכון מתקדם בזמן חסום (bounded).
- הערבות הכי חזקה: אף תהליכון לא יכול להירעב.
- הרבה יותר קשה לממש. מבני נתונים wait-free מורכבים מאוד.
- דוגמה: תור SPSC - ה-push וה-pop תמיד מסתיימים בזמן חסום.
ללא חסימה חלקי - obstruction-free:
- תהליכון מתקדם אם הוא רץ לבד (בלי מתחרים).
- הערבות הכי חלשה: שני תהליכונים יכולים "לדרוך" אחד על השני שוב ושוב (livelock).
- דוגמה: אלגוריתם שעושה retry אבל בלי backoff - שני תהליכונים עלולים לנסות ולהיכשל שוב ושוב.
סיכום¶
| רמה | ערבות | תהליכון יחיד | ביצועים |
|---|---|---|---|
| ללא חסימה חלקי (obstruction-free) | התקדמות רק בלי מתחרים | כן | נמוכים |
| ללא נעילה (lock-free) | לפחות אחד מתקדם | כן | טובים |
| ללא חסימה (wait-free) | כולם מתקדמים תמיד | כן | תלוי במימוש |
מתי להשתמש בlock-free¶
מתי כן¶
- נתיבים חמים מאוד - קוד שרץ מיליוני פעמים בשנייה ו-mutex contention הוא צוואר בקבוק.
- מערכות זמן אמת - כשאי אפשר להרשות לתהליכון להיחסם לזמן לא ידוע.
- מתחרות גבוהה - כשהרבה תהליכונים מתחרים על אותו משאב ו-mutex גורם לירידת ביצועים.
- חוצצים בין שכבות - תור SPSC בין תהליכון שמקליט אודיו לתהליכון שמעבד אותו.
מתי לא¶
- ברוב המקרים - mutex מודרני (futex בלינוקס) הוא מהיר מאוד כשאין מתחרות. ה-fast path (נעילה בלי מתחרים) הוא פעולה אטומית אחת בuser-space.
- כשהנכונות חשובה יותר מביצועים - מבני נתונים lock-free קשים מאוד לכתיבה נכונה ולדיבוג.
- כשאין מדידה - אל תעברו ל-lock-free בלי למדוד קודם שה-mutex הוא באמת הבעיה.
כלל אצבע: התחילו עם mutex. מדדו. אם יש בעיית contention ספציפית, שקלו lock-free רק לאותו מקום.