12.1 דפוסי תכנות מקבילי הרצאה
הקדמה¶
בפרק 5.8 למדנו על תהליכונים (threads) ועל mutex כדי להגן על משאבים משותפים. בפרק 6.10 ראינו איך הקרנל עצמו מסנכרן גישה למשאבים. עכשיו הגיע הזמן ללמוד את הדפוסים - patterns - שהופכים תכנות מקבילי ממשהו מפחיד למשהו מובנה ובטוח.
הרעיון פשוט: במקום להמציא כל פעם מחדש את הגלגל, יש דפוסים מוכחים שפותרים בעיות נפוצות. בואו נכיר אותם.
דפוס יצרן-צרכן - producer-consumer¶
הרעיון¶
אחד הדפוסים הכי נפוצים בתכנות מקבילי. הרעיון הוא כזה:
- תהליכון אחד (או יותר) מייצר נתונים - זה היצרן (producer).
- תהליכון אחד (או יותר) צורך את הנתונים - זה הצרכן (consumer).
- ביניהם יש חוצץ משותף - תור (queue) בגודל מוגבל.
דוגמאות מהעולם האמיתי:
- שרת וב: תהליכון מקבל חיבורים חדשים (יצרן) ותהליכונים אחרים מטפלים בבקשות (צרכנים).
- עיבוד לוגים: תהליכון קורא שורות מקובץ (יצרן) ותהליכונים אחרים מנתחים אותן (צרכנים).
- צינור עיבוד: כל שלב הוא גם צרכן (של השלב הקודם) וגם יצרן (של השלב הבא).
הבעיה¶
נניח שיש לנו תור משותף. צריך לפתור שתי בעיות:
1. סנכרון גישה - שני תהליכונים לא יכולים לגעת בתור באותו רגע (כבר מוכר לנו - mutex).
2. תיאום - מה קורה כשהתור מלא? היצרן צריך לחכות. מה קורה כשהתור ריק? הצרכן צריך לחכות.
אפשר לפתור את בעיה 2 עם busy-waiting - לולאה שבודקת שוב ושוב אם התור התפנה. אבל זה בזבוז מטורף של CPU. צריך מנגנון שנותן לתהליכון לישון עד שמשהו משתנה.
משתני תנאי - condition variables¶
למה mutex לבד לא מספיק¶
כשתהליכון צריך לחכות עד שתנאי מסוים מתקיים (למשל "התור כבר לא ריק"), אנחנו צריכים שהוא ילך לישון ויתעורר כשהתנאי מתקיים. mutex לבד לא עושה את זה - הוא רק מגן על גישה, לא מאותת על שינוי מצב.
הרעיון¶
משתנה תנאי (condition variable) נותן לנו שתי פעולות:
- המתנה (pthread_cond_wait) - שחרר את הmutex, לך לישון, וכשמישהו יעיר אותך - תחזור להחזיק את הmutex.
- איתות (pthread_cond_signal / pthread_cond_broadcast) - העיר תהליכון אחד (signal) או את כולם (broadcast) שישנים על התנאי הזה.
הAPI¶
#include <pthread.h>
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
// המתנה - חייבים להחזיק את הmutex לפני הקריאה!
pthread_cond_wait(&cond, &mutex);
// העירו תהליכון אחד
pthread_cond_signal(&cond);
// העירו את כל התהליכונים
pthread_cond_broadcast(&cond);
נקודה קריטית - לולאת while¶
חובה לבדוק את התנאי בתוך לולאת while ולא ב-if. למה? כי יכולות לקרות "התעוררויות מדומות" (spurious wakeups) - מערכת ההפעלה יכולה להעיר תהליכון גם בלי שקראו ל-signal. בנוסף, אם יש כמה צרכנים ורק signal אחד, יכול להיות שתהליכון אחר כבר תפס את האלמנט לפני שהתעוררנו.
// נכון:
pthread_mutex_lock(&mutex);
while (queue_is_empty()) { // while - לא if!
pthread_cond_wait(&cond, &mutex);
}
// עכשיו התור בטוח לא ריק
item = dequeue();
pthread_mutex_unlock(&mutex);
// לא נכון:
pthread_mutex_lock(&mutex);
if (queue_is_empty()) { // באג! spurious wakeup יגרום לבעיה
pthread_cond_wait(&cond, &mutex);
}
item = dequeue(); // קריסה אפשרית - התור עלול להיות ריק
pthread_mutex_unlock(&mutex);
מימוש תור מוגבל בטוח לתהליכונים - thread-safe bounded queue¶
בואו נממש תור שמתאים לדפוס יצרן-צרכן. התור מבוסס על מערך מעגלי (ring buffer):
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <stdbool.h>
#define QUEUE_CAPACITY 16
typedef struct {
int items[QUEUE_CAPACITY];
int head; // מאיפה קוראים
int tail; // לאיפה כותבים
int count; // כמה פריטים יש
pthread_mutex_t mutex;
pthread_cond_t not_full; // מאותת כשהתור כבר לא מלא
pthread_cond_t not_empty; // מאותת כשהתור כבר לא ריק
} bounded_queue_t;
void queue_init(bounded_queue_t *q) {
q->head = 0;
q->tail = 0;
q->count = 0;
pthread_mutex_init(&q->mutex, NULL);
pthread_cond_init(&q->not_full, NULL);
pthread_cond_init(&q->not_empty, NULL);
}
void queue_destroy(bounded_queue_t *q) {
pthread_mutex_destroy(&q->mutex);
pthread_cond_destroy(&q->not_full);
pthread_cond_destroy(&q->not_empty);
}
// חוסם אם התור מלא
void queue_push(bounded_queue_t *q, int item) {
pthread_mutex_lock(&q->mutex);
while (q->count == QUEUE_CAPACITY) {
// התור מלא - נחכה עד שמישהו יוציא פריט
pthread_cond_wait(&q->not_full, &q->mutex);
}
q->items[q->tail] = item;
q->tail = (q->tail + 1) % QUEUE_CAPACITY;
q->count++;
// אותת לצרכנים שיש מה לקחת
pthread_cond_signal(&q->not_empty);
pthread_mutex_unlock(&q->mutex);
}
// חוסם אם התור ריק
int queue_pop(bounded_queue_t *q) {
pthread_mutex_lock(&q->mutex);
while (q->count == 0) {
// התור ריק - נחכה עד שמישהו יכניס פריט
pthread_cond_wait(&q->not_empty, &q->mutex);
}
int item = q->items[q->head];
q->head = (q->head + 1) % QUEUE_CAPACITY;
q->count--;
// אותת ליצרנים שיש מקום
pthread_cond_signal(&q->not_full);
pthread_mutex_unlock(&q->mutex);
return item;
}
דוגמה שלמה - יצרנים וצרכנים¶
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
// (הגדרת bounded_queue_t מלמעלה)
bounded_queue_t queue;
void *producer(void *arg) {
int id = *(int *)arg;
for (int i = 0; i < 10; i++) {
int item = id * 100 + i;
queue_push(&queue, item);
printf("producer %d: pushed %d\n", id, item);
}
return NULL;
}
void *consumer(void *arg) {
int id = *(int *)arg;
for (int i = 0; i < 10; i++) {
int item = queue_pop(&queue);
printf("consumer %d: popped %d\n", id, item);
}
return NULL;
}
int main(void) {
queue_init(&queue);
// 2 יצרנים, 2 צרכנים - כל אחד מעבד 10 פריטים
pthread_t producers[2], consumers[2];
int ids[] = {0, 1};
for (int i = 0; i < 2; i++) {
pthread_create(&producers[i], NULL, producer, &ids[i]);
pthread_create(&consumers[i], NULL, consumer, &ids[i]);
}
for (int i = 0; i < 2; i++) {
pthread_join(producers[i], NULL);
pthread_join(consumers[i], NULL);
}
queue_destroy(&queue);
printf("done.\n");
return 0;
}
קומפילציה:
דפוס בריכת תהליכונים - thread pool¶
הרעיון¶
יצירת תהליכון היא פעולה יקרה (syscall, הקצאת מחסנית, ניהול בקרנל). אם יש לנו שרת שמטפל באלפי בקשות בשנייה, לא נרצה ליצור ולהרוס תהליכון לכל בקשה.
הפתרון: ניצור N תהליכונים מראש (worker threads), ונחזיק תור של משימות. כל worker לוקח משימה מהתור, מבצע אותה, וחוזר לקחת את הבאה.
ככה עובדים:
- שרתי וב (Apache, Nginx worker pool)
- מנועי מסדי נתונים (PostgreSQL, MySQL)
- מערכות הפעלה (thread pool של הקרנל)
מימוש¶
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <stdbool.h>
// משימה - מצביע לפונקציה עם ארגומנט
typedef struct task {
void (*function)(void *arg);
void *arg;
struct task *next;
} task_t;
// בריכת תהליכונים
typedef struct {
pthread_t *threads;
int num_threads;
task_t *task_head; // ראש תור המשימות
task_t *task_tail; // זנב תור המשימות
int task_count;
pthread_mutex_t mutex;
pthread_cond_t task_available; // מאותת כשיש משימה חדשה
bool shutdown; // דגל כיבוי
} threadpool_t;
// הפונקציה שכל worker מריץ
static void *worker_func(void *arg) {
threadpool_t *pool = (threadpool_t *)arg;
while (true) {
pthread_mutex_lock(&pool->mutex);
// חכה למשימה או לכיבוי
while (pool->task_count == 0 && !pool->shutdown) {
pthread_cond_wait(&pool->task_available, &pool->mutex);
}
// אם ביקשו כיבוי ואין יותר משימות - צא
if (pool->shutdown && pool->task_count == 0) {
pthread_mutex_unlock(&pool->mutex);
return NULL;
}
// קח משימה מהתור
task_t *task = pool->task_head;
pool->task_head = task->next;
if (pool->task_head == NULL) {
pool->task_tail = NULL;
}
pool->task_count--;
pthread_mutex_unlock(&pool->mutex);
// הרץ את המשימה (בלי להחזיק את הmutex!)
task->function(task->arg);
free(task);
}
return NULL;
}
threadpool_t *threadpool_create(int num_threads) {
threadpool_t *pool = calloc(1, sizeof(threadpool_t));
pool->num_threads = num_threads;
pool->threads = malloc(sizeof(pthread_t) * num_threads);
pool->shutdown = false;
pthread_mutex_init(&pool->mutex, NULL);
pthread_cond_init(&pool->task_available, NULL);
for (int i = 0; i < num_threads; i++) {
pthread_create(&pool->threads[i], NULL, worker_func, pool);
}
return pool;
}
void threadpool_submit(threadpool_t *pool, void (*func)(void *), void *arg) {
task_t *task = malloc(sizeof(task_t));
task->function = func;
task->arg = arg;
task->next = NULL;
pthread_mutex_lock(&pool->mutex);
if (pool->task_tail) {
pool->task_tail->next = task;
} else {
pool->task_head = task;
}
pool->task_tail = task;
pool->task_count++;
pthread_cond_signal(&pool->task_available);
pthread_mutex_unlock(&pool->mutex);
}
void threadpool_destroy(threadpool_t *pool) {
pthread_mutex_lock(&pool->mutex);
pool->shutdown = true;
pthread_cond_broadcast(&pool->task_available); // העירו את כולם
pthread_mutex_unlock(&pool->mutex);
for (int i = 0; i < pool->num_threads; i++) {
pthread_join(pool->threads[i], NULL);
}
free(pool->threads);
pthread_mutex_destroy(&pool->mutex);
pthread_cond_destroy(&pool->task_available);
free(pool);
}
שימוש בthread pool¶
void print_task(void *arg) {
int *num = (int *)arg;
printf("task: processing number %d (thread %lu)\n", *num, pthread_self());
free(num);
}
int main(void) {
threadpool_t *pool = threadpool_create(4);
// שלחו 20 משימות
for (int i = 0; i < 20; i++) {
int *num = malloc(sizeof(int));
*num = i;
threadpool_submit(pool, print_task, num);
}
sleep(1); // תנו זמן למשימות לרוץ
threadpool_destroy(pool);
return 0;
}
שימו לב: ב-threadpool_destroy אנחנו שולחים broadcast (לא signal) כי צריך להעיר את כל הworkers כדי שיראו את דגל הכיבוי.
דפוס נעילת קורא-כותב - reader-writer lock¶
הרעיון¶
לפעמים יש לנו מבנה נתונים שהרבה תהליכונים קוראים ממנו, אבל מעטים כותבים אליו. אם נשתמש ב-mutex רגיל, רק תהליכון אחד יוכל לקרוא בכל רגע - בזבוז! הרי קריאה בלבד לא מסוכנת.
הפתרון - reader-writer lock:
- הרבה קוראים יכולים להחזיק את הנעילה במקביל.
- כותב אחד מחזיק את הנעילה בבלעדיות (אף אחד אחר, לא קורא ולא כותב).
הAPI¶
#include <pthread.h>
pthread_rwlock_t rwlock = PTHREAD_RWLOCK_INITIALIZER;
// נעילה לקריאה - כמה תהליכונים יכולים להחזיק אותה במקביל
pthread_rwlock_rdlock(&rwlock);
// ... קריאה מהמבנה ...
pthread_rwlock_unlock(&rwlock);
// נעילה לכתיבה - בלעדית
pthread_rwlock_wrlock(&rwlock);
// ... כתיבה למבנה ...
pthread_rwlock_unlock(&rwlock);
דוגמה - מטמון (cache) משותף¶
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#define CACHE_SIZE 100
typedef struct {
char key[64];
int value;
} cache_entry_t;
typedef struct {
cache_entry_t entries[CACHE_SIZE];
int count;
pthread_rwlock_t lock;
} cache_t;
void cache_init(cache_t *c) {
c->count = 0;
pthread_rwlock_init(&c->lock, NULL);
}
// קריאה - הרבה תהליכונים יכולים לקרוא במקביל
int cache_get(cache_t *c, const char *key, int *value) {
pthread_rwlock_rdlock(&c->lock);
for (int i = 0; i < c->count; i++) {
if (strcmp(c->entries[i].key, key) == 0) {
*value = c->entries[i].value;
pthread_rwlock_unlock(&c->lock);
return 1; // נמצא
}
}
pthread_rwlock_unlock(&c->lock);
return 0; // לא נמצא
}
// כתיבה - בלעדית
void cache_set(cache_t *c, const char *key, int value) {
pthread_rwlock_wrlock(&c->lock);
// בדוק אם המפתח כבר קיים
for (int i = 0; i < c->count; i++) {
if (strcmp(c->entries[i].key, key) == 0) {
c->entries[i].value = value;
pthread_rwlock_unlock(&c->lock);
return;
}
}
// מפתח חדש
if (c->count < CACHE_SIZE) {
strncpy(c->entries[c->count].key, key, 63);
c->entries[c->count].key[63] = '\0';
c->entries[c->count].value = value;
c->count++;
}
pthread_rwlock_unlock(&c->lock);
}
העדפת קוראים מול העדפת כותבים - fairness¶
יש בעיה עם reader-writer lock: מה קורה כשיש כל הזמן קוראים חדשים? הכותב עלול לא לקבל לעולם את הנעילה! זה נקרא "הרעבה" (starvation).
שתי אסטרטגיות:
- העדפת קוראים (reader-preference) - קוראים חדשים תמיד נכנסים אם יש כבר קוראים. כותבים עלולים להירעב.
- העדפת כותבים (writer-preference) - כשכותב מחכה, קוראים חדשים מחכים גם. מונע הרעבת כותבים.
בלינוקס אפשר להגדיר את ההעדפה עם attributes:
pthread_rwlockattr_t attr;
pthread_rwlockattr_init(&attr);
pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP);
pthread_rwlock_init(&rwlock, &attr);
דפוס המחסום - barrier¶
הרעיון¶
לפעמים יש לנו חישוב מקבילי שמתחלק לשלבים. כל התהליכונים חייבים לסיים את שלב 1 לפני שמישהו מתחיל את שלב 2. הדפוס הזה נקרא barrier - מחסום שכל התהליכונים צריכים להגיע אליו לפני שמישהו ממשיך.
דוגמאות:
- חישוב מטריצות: כל תהליכון מחשב חלק מהשורות, ואז כולם ממשיכים לשלב הבא.
- סימולציות פיזיקה: כל תהליכון מעדכן חלק מהחלקיקים, ואז כולם ממשיכים לצעד הבא.
הAPI¶
#include <pthread.h>
pthread_barrier_t barrier;
// אתחול - count = מספר התהליכונים שצריכים להגיע למחסום
pthread_barrier_init(&barrier, NULL, count);
// המתנה - חוסם עד שכל count התהליכונים מגיעים
pthread_barrier_wait(&barrier);
// שחרור
pthread_barrier_destroy(&barrier);
דוגמה - חישוב מקבילי בשלבים¶
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#define NUM_THREADS 4
#define NUM_PHASES 3
#define ARRAY_SIZE 1000
int data[ARRAY_SIZE];
pthread_barrier_t barrier;
void *worker(void *arg) {
int id = *(int *)arg;
int start = id * (ARRAY_SIZE / NUM_THREADS);
int end = start + (ARRAY_SIZE / NUM_THREADS);
for (int phase = 0; phase < NUM_PHASES; phase++) {
// כל תהליכון מעבד את החלק שלו
for (int i = start; i < end; i++) {
data[i] += phase + 1;
}
printf("thread %d: finished phase %d\n", id, phase);
// כולם חייבים לסיים את השלב לפני שממשיכים
pthread_barrier_wait(&barrier);
}
return NULL;
}
int main(void) {
pthread_t threads[NUM_THREADS];
int ids[NUM_THREADS];
pthread_barrier_init(&barrier, NULL, NUM_THREADS);
for (int i = 0; i < NUM_THREADS; i++) {
ids[i] = i;
pthread_create(&threads[i], NULL, worker, &ids[i]);
}
for (int i = 0; i < NUM_THREADS; i++) {
pthread_join(threads[i], NULL);
}
pthread_barrier_destroy(&barrier);
printf("all phases complete.\n");
return 0;
}
דפוס המוניטור - monitor¶
הרעיון¶
כל הדפוסים שראינו משתמשים ב-mutex ו-condition variables. הדפוס של מוניטור (monitor) אומר: נעטוף את כל הסנכרון בתוך מבנה נתונים, כך שהמשתמש לא צריך לדעת מזה.
במילים אחרות: מוניטור = מבנה נתונים + mutex + condition variables, כאשר כל הפעולות על המבנה כבר כוללות את הנעילה בפנים.
למעשה, התור המוגבל שמימשנו למעלה הוא דוגמה מצוינת למוניטור! המשתמש קורא ל-queue_push ו-queue_pop ולא צריך לדעת מהmutex וה-condition variables.
עקרונות¶
- כל המצב הפנימי פרטי - המשתמש לא ניגש ישירות לשדות.
- כל פעולה ציבורית נועלת ומשחררת - המשתמש לא צריך לנעול בעצמו.
- condition variables לתיאום - כשפעולה לא יכולה להתבצע, התהליכון ישן ומתעורר כשאפשר.
דוגמה נוספת - מונה בטוח (safe counter) שמחכה עד שהמונה מגיע לערך מסוים:
typedef struct {
int value;
pthread_mutex_t mutex;
pthread_cond_t value_changed;
} monitor_counter_t;
void counter_init(monitor_counter_t *mc, int initial) {
mc->value = initial;
pthread_mutex_init(&mc->mutex, NULL);
pthread_cond_init(&mc->value_changed, NULL);
}
void counter_increment(monitor_counter_t *mc) {
pthread_mutex_lock(&mc->mutex);
mc->value++;
pthread_cond_broadcast(&mc->value_changed); // אותת לכל מי שמחכה
pthread_mutex_unlock(&mc->mutex);
}
// חוסם עד שהמונה מגיע לtarget
void counter_wait_until(monitor_counter_t *mc, int target) {
pthread_mutex_lock(&mc->mutex);
while (mc->value < target) {
pthread_cond_wait(&mc->value_changed, &mc->mutex);
}
pthread_mutex_unlock(&mc->mutex);
}
סיכום¶
| דפוס | בעיה שהוא פותר | מנגנונים |
|---|---|---|
| יצרן-צרכן (producer-consumer) | העברת נתונים בין תהליכונים | תור + mutex + condition variables |
| בריכת תהליכונים (thread pool) | עלות יצירת תהליכונים | תהליכונים קבועים + תור משימות |
| קורא-כותב (reader-writer) | קריאות מקביליות, כתיבה בלעדית | rwlock |
| מחסום (barrier) | סנכרון שלבים | barrier |
| מוניטור (monitor) | עטיפת סנכרון במבנה נתונים | mutex + condition variables |
כל הדפוסים האלה בנויים על אותם בלוקים בסיסיים שכבר מכירים: mutex, condition variables, ופעולות אטומיות. מה שמשתנה זה האופן שבו אנחנו מרכיבים אותם כדי לפתור בעיות שונות.