לדלג לתוכן

12.5 פרויקט סיכום פרויקט

פרויקט סיכום: בריכת תהליכונים - thread pool library

הגיע הזמן לשלב את מה שלמדנו בפרק 12 לתוך ספרייה אמיתית וברת שימוש חוזר.

בפרויקט הזה תממשו ספריית thread pool בC - בריכת תהליכונים עם תור משימות, כיבוי מסודר, ותמיכה בfutures (תוצאות עתידיות).


שלב 1 - הגדרת הAPI

צרו קובץ header בשם threadpool.h עם הAPI הבא:

#ifndef THREADPOOL_H
#define THREADPOOL_H

#include <stddef.h>

// סוגים אטומים - פרטי מימוש
typedef struct threadpool threadpool_t;
typedef struct future future_t;

// סוג פונקציית משימה: מקבלת void* ומחזירה void*
typedef void *(*task_func_t)(void *arg);

// צור בריכת תהליכונים עם num_threads תהליכוני worker
threadpool_t *threadpool_create(int num_threads);

// שלח משימה לביצוע. מחזיר future שאפשר לחכות לו
future_t *threadpool_submit(threadpool_t *pool, task_func_t func, void *arg);

// כיבוי מסודר: חכה שכל המשימות הנוכחיות יסתיימו, ואז עצור workers
void threadpool_destroy(threadpool_t *pool);

// חכה לתוצאה של משימה. חוסם עד שהמשימה מסתיימת
void *future_get(future_t *f);

// שחרר future (אחרי שכבר קראנו את התוצאה)
void future_free(future_t *f);

#endif

שלב 2 - מימוש תור המשימות

צרו קובץ threadpool.c וממשו את תור המשימות.

הרעיון:
- רשימה מקושרת של משימות (כל משימה = פונקציה + ארגומנט + future).
- הגנה על התור עם mutex.
- כשמוסיפים משימה - pthread_cond_signal כדי להעיר worker.
- כשהתור ריק - הworkers ישנים על condition variable.

שלד:

#include "threadpool.h"
#include <stdlib.h>
#include <pthread.h>
#include <stdbool.h>

// משימה בתור
typedef struct task {
    task_func_t func;
    void *arg;
    future_t *future;
    struct task *next;
} task_t;

// future - תוצאה עתידית
struct future {
    void *result;
    bool done;
    pthread_mutex_t mutex;
    pthread_cond_t cond;
};

// בריכת תהליכונים
struct threadpool {
    pthread_t *threads;
    int num_threads;

    task_t *queue_head;
    task_t *queue_tail;
    int queue_size;

    pthread_mutex_t queue_mutex;
    pthread_cond_t queue_cond;

    bool shutdown;
};

static future_t *future_create(void) {
    future_t *f = calloc(1, sizeof(future_t));
    f->done = false;
    f->result = NULL;
    pthread_mutex_init(&f->mutex, NULL);
    pthread_cond_init(&f->cond, NULL);
    return f;
}

static void future_set_result(future_t *f, void *result) {
    pthread_mutex_lock(&f->mutex);
    f->result = result;
    f->done = true;
    pthread_cond_broadcast(&f->cond);
    pthread_mutex_unlock(&f->mutex);
}

void *future_get(future_t *f) {
    pthread_mutex_lock(&f->mutex);
    while (!f->done) {
        pthread_cond_wait(&f->cond, &f->mutex);
    }
    void *result = f->result;
    pthread_mutex_unlock(&f->mutex);
    return result;
}

void future_free(future_t *f) {
    pthread_mutex_destroy(&f->mutex);
    pthread_cond_destroy(&f->cond);
    free(f);
}

// TODO: ממשו את worker_func, threadpool_create, threadpool_submit, threadpool_destroy

שלב 3 - כיבוי מסודר

ממשו threadpool_destroy שעושה:

  1. נעילת הmutex.
  2. הפעלת דגל shutdown = true.
  3. שליחת pthread_cond_broadcast כדי להעיר את כל הworkers.
  4. שחרור הmutex.
  5. קריאת pthread_join לכל worker.
  6. שחרור כל הזכרון.

הworkers צריכים לבדוק: אם shutdown == true ואין יותר משימות בתור - לצאת. אם shutdown == true אבל עדיין יש משימות - לסיים אותן לפני שיוצאים.


שלב 4 - futures

הfuture מאפשר למי ששלח את המשימה לחכות לתוצאה שלה:

// דוגמה
void *compute(void *arg) {
    int *n = (int *)arg;
    int *result = malloc(sizeof(int));
    *result = (*n) * (*n);
    return result;
}

// שלח משימה
int num = 42;
future_t *f = threadpool_submit(pool, compute, &num);

// ... עשה דברים אחרים ...

// קבל תוצאה (חוסם אם המשימה עדיין רצה)
int *result = (int *)future_get(f);
printf("42^2 = %d\n", *result);

free(result);
future_free(f);

המנגנון פשוט: הfuture מכיל mutex + condition variable + דגל done. הworker שמסיים את המשימה קורא ל-future_set_result, ומי שקורא ל-future_get ישן על הcondition variable עד ש-done == true.


שלב 5 - benchmark

כתבו תוכנית בדיקה (test_threadpool.c) שעושה:

  1. צרו thread pool.
  2. שלחו 1,000,000 משימות פשוטות (למשל: חשבו ריבוע של מספר).
  3. חכו לכל התוצאות עם future_get.
  4. מדדו את הזמן הכולל.
  5. הריצו עם 1, 2, 4, 8 תהליכונים והשוו.

שלד לtest:

#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include "threadpool.h"

#define NUM_TASKS 1000000

void *square(void *arg) {
    long n = (long)arg;
    long *result = malloc(sizeof(long));
    *result = n * n;
    return result;
}

void benchmark(int num_threads) {
    struct timespec start, end;

    threadpool_t *pool = threadpool_create(num_threads);

    future_t **futures = malloc(sizeof(future_t *) * NUM_TASKS);

    clock_gettime(CLOCK_MONOTONIC, &start);

    for (long i = 0; i < NUM_TASKS; i++) {
        futures[i] = threadpool_submit(pool, square, (void *)i);
    }

    long total = 0;
    for (int i = 0; i < NUM_TASKS; i++) {
        long *result = (long *)future_get(futures[i]);
        total += *result;
        free(result);
        future_free(futures[i]);
    }

    clock_gettime(CLOCK_MONOTONIC, &end);

    double elapsed = (end.tv_sec - start.tv_sec) +
                     (end.tv_nsec - start.tv_nsec) / 1e9;

    printf("%d threads: %.3f seconds (%.0f tasks/sec)\n",
           num_threads, elapsed, NUM_TASKS / elapsed);

    free(futures);
    threadpool_destroy(pool);
}

int main(void) {
    printf("thread pool benchmark - %d tasks\n\n", NUM_TASKS);

    benchmark(1);
    benchmark(2);
    benchmark(4);
    benchmark(8);

    return 0;
}

קומפילציה:

gcc -O2 -o test_threadpool threadpool.c test_threadpool.c -lpthread


מה צריך להגיש

  1. threadpool.h - הAPI (ניתן למעלה).
  2. threadpool.c - המימוש המלא.
  3. test_threadpool.c - תוכנית הבדיקה והbenchmark.
  4. פלט הbenchmark עם 1, 2, 4, 8 תהליכונים.
  5. הסבר קצר (בתוך הערה בקוד): למה הspeedup לא ליניארי? (רמז: תקורת סנכרון, contention על התור).

בונוס

  • הוסיפו threadpool_submit_batch(pool, tasks[], count) שמוסיף הרבה משימות בבת אחת (נעילה אחת במקום count נעילות).
  • הוסיפו "גניבת עבודה" (work stealing) - כל worker מחזיק תור מקומי, וכשהוא ריק הוא גונב מworker אחר.
  • ממשו future_then(future, callback) שרושם callback שירוץ אוטומטית כשהfuture מוכן.