12.5 פרויקט סיכום פרויקט
פרויקט סיכום: בריכת תהליכונים - thread pool library¶
הגיע הזמן לשלב את מה שלמדנו בפרק 12 לתוך ספרייה אמיתית וברת שימוש חוזר.
בפרויקט הזה תממשו ספריית thread pool בC - בריכת תהליכונים עם תור משימות, כיבוי מסודר, ותמיכה בfutures (תוצאות עתידיות).
שלב 1 - הגדרת הAPI¶
צרו קובץ header בשם threadpool.h עם הAPI הבא:
#ifndef THREADPOOL_H
#define THREADPOOL_H
#include <stddef.h>
// סוגים אטומים - פרטי מימוש
typedef struct threadpool threadpool_t;
typedef struct future future_t;
// סוג פונקציית משימה: מקבלת void* ומחזירה void*
typedef void *(*task_func_t)(void *arg);
// צור בריכת תהליכונים עם num_threads תהליכוני worker
threadpool_t *threadpool_create(int num_threads);
// שלח משימה לביצוע. מחזיר future שאפשר לחכות לו
future_t *threadpool_submit(threadpool_t *pool, task_func_t func, void *arg);
// כיבוי מסודר: חכה שכל המשימות הנוכחיות יסתיימו, ואז עצור workers
void threadpool_destroy(threadpool_t *pool);
// חכה לתוצאה של משימה. חוסם עד שהמשימה מסתיימת
void *future_get(future_t *f);
// שחרר future (אחרי שכבר קראנו את התוצאה)
void future_free(future_t *f);
#endif
שלב 2 - מימוש תור המשימות¶
צרו קובץ threadpool.c וממשו את תור המשימות.
הרעיון:
- רשימה מקושרת של משימות (כל משימה = פונקציה + ארגומנט + future).
- הגנה על התור עם mutex.
- כשמוסיפים משימה - pthread_cond_signal כדי להעיר worker.
- כשהתור ריק - הworkers ישנים על condition variable.
שלד:
#include "threadpool.h"
#include <stdlib.h>
#include <pthread.h>
#include <stdbool.h>
// משימה בתור
typedef struct task {
task_func_t func;
void *arg;
future_t *future;
struct task *next;
} task_t;
// future - תוצאה עתידית
struct future {
void *result;
bool done;
pthread_mutex_t mutex;
pthread_cond_t cond;
};
// בריכת תהליכונים
struct threadpool {
pthread_t *threads;
int num_threads;
task_t *queue_head;
task_t *queue_tail;
int queue_size;
pthread_mutex_t queue_mutex;
pthread_cond_t queue_cond;
bool shutdown;
};
static future_t *future_create(void) {
future_t *f = calloc(1, sizeof(future_t));
f->done = false;
f->result = NULL;
pthread_mutex_init(&f->mutex, NULL);
pthread_cond_init(&f->cond, NULL);
return f;
}
static void future_set_result(future_t *f, void *result) {
pthread_mutex_lock(&f->mutex);
f->result = result;
f->done = true;
pthread_cond_broadcast(&f->cond);
pthread_mutex_unlock(&f->mutex);
}
void *future_get(future_t *f) {
pthread_mutex_lock(&f->mutex);
while (!f->done) {
pthread_cond_wait(&f->cond, &f->mutex);
}
void *result = f->result;
pthread_mutex_unlock(&f->mutex);
return result;
}
void future_free(future_t *f) {
pthread_mutex_destroy(&f->mutex);
pthread_cond_destroy(&f->cond);
free(f);
}
// TODO: ממשו את worker_func, threadpool_create, threadpool_submit, threadpool_destroy
שלב 3 - כיבוי מסודר¶
ממשו threadpool_destroy שעושה:
- נעילת הmutex.
- הפעלת דגל
shutdown = true. - שליחת
pthread_cond_broadcastכדי להעיר את כל הworkers. - שחרור הmutex.
- קריאת
pthread_joinלכל worker. - שחרור כל הזכרון.
הworkers צריכים לבדוק: אם shutdown == true ואין יותר משימות בתור - לצאת. אם shutdown == true אבל עדיין יש משימות - לסיים אותן לפני שיוצאים.
שלב 4 - futures¶
הfuture מאפשר למי ששלח את המשימה לחכות לתוצאה שלה:
// דוגמה
void *compute(void *arg) {
int *n = (int *)arg;
int *result = malloc(sizeof(int));
*result = (*n) * (*n);
return result;
}
// שלח משימה
int num = 42;
future_t *f = threadpool_submit(pool, compute, &num);
// ... עשה דברים אחרים ...
// קבל תוצאה (חוסם אם המשימה עדיין רצה)
int *result = (int *)future_get(f);
printf("42^2 = %d\n", *result);
free(result);
future_free(f);
המנגנון פשוט: הfuture מכיל mutex + condition variable + דגל done. הworker שמסיים את המשימה קורא ל-future_set_result, ומי שקורא ל-future_get ישן על הcondition variable עד ש-done == true.
שלב 5 - benchmark¶
כתבו תוכנית בדיקה (test_threadpool.c) שעושה:
- צרו thread pool.
- שלחו 1,000,000 משימות פשוטות (למשל: חשבו ריבוע של מספר).
- חכו לכל התוצאות עם
future_get. - מדדו את הזמן הכולל.
- הריצו עם 1, 2, 4, 8 תהליכונים והשוו.
שלד לtest:
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include "threadpool.h"
#define NUM_TASKS 1000000
void *square(void *arg) {
long n = (long)arg;
long *result = malloc(sizeof(long));
*result = n * n;
return result;
}
void benchmark(int num_threads) {
struct timespec start, end;
threadpool_t *pool = threadpool_create(num_threads);
future_t **futures = malloc(sizeof(future_t *) * NUM_TASKS);
clock_gettime(CLOCK_MONOTONIC, &start);
for (long i = 0; i < NUM_TASKS; i++) {
futures[i] = threadpool_submit(pool, square, (void *)i);
}
long total = 0;
for (int i = 0; i < NUM_TASKS; i++) {
long *result = (long *)future_get(futures[i]);
total += *result;
free(result);
future_free(futures[i]);
}
clock_gettime(CLOCK_MONOTONIC, &end);
double elapsed = (end.tv_sec - start.tv_sec) +
(end.tv_nsec - start.tv_nsec) / 1e9;
printf("%d threads: %.3f seconds (%.0f tasks/sec)\n",
num_threads, elapsed, NUM_TASKS / elapsed);
free(futures);
threadpool_destroy(pool);
}
int main(void) {
printf("thread pool benchmark - %d tasks\n\n", NUM_TASKS);
benchmark(1);
benchmark(2);
benchmark(4);
benchmark(8);
return 0;
}
קומפילציה:
מה צריך להגיש¶
threadpool.h- הAPI (ניתן למעלה).threadpool.c- המימוש המלא.test_threadpool.c- תוכנית הבדיקה והbenchmark.- פלט הbenchmark עם 1, 2, 4, 8 תהליכונים.
- הסבר קצר (בתוך הערה בקוד): למה הspeedup לא ליניארי? (רמז: תקורת סנכרון, contention על התור).
בונוס¶
- הוסיפו
threadpool_submit_batch(pool, tasks[], count)שמוסיף הרבה משימות בבת אחת (נעילה אחת במקום count נעילות). - הוסיפו "גניבת עבודה" (work stealing) - כל worker מחזיק תור מקומי, וכשהוא ריק הוא גונב מworker אחר.
- ממשו
future_then(future, callback)שרושם callback שירוץ אוטומטית כשהfuture מוכן.