לדלג לתוכן

12.1 דפוסי תכנות מקבילי - פתרונות

פתרון לתרגיל 1 - יצרן-צרכן בסיסי

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

/* Compile-time settings for the producer/consumer demo. */
#define QUEUE_CAPACITY 8        /* number of slots in the bounded queue */
#define NUM_PRODUCERS 3         /* producer threads created by main */
#define NUM_CONSUMERS 2         /* consumer threads created by main */
#define ITEMS_PER_PRODUCER 20   /* items pushed by each producer */
/*
 * Sentinel that tells a consumer to stop. Producers push id * 1000 + i,
 * which is never negative, so -1 cannot collide with real data.
 * Parenthesized so the negative literal stays a single operand wherever
 * the macro is expanded.
 */
#define POISON_PILL (-1)

/* Fixed-capacity FIFO of ints kept in a circular array and guarded by one mutex. */
typedef struct {
    int items[QUEUE_CAPACITY];   // circular storage
    int head, tail, count;       // head: next slot to pop, tail: next slot to push, count: items stored
    pthread_mutex_t mutex;       // held by queue_push/queue_pop while they touch the fields above
    pthread_cond_t not_full;     // signaled by queue_pop after it removes an item
    pthread_cond_t not_empty;    // signaled by queue_push after it stores an item
} bounded_queue_t;

/*
 * Put the queue into its empty state and create its synchronization
 * objects (one mutex, two condition variables) with default attributes.
 */
void queue_init(bounded_queue_t *q) {
    q->count = 0;
    q->head = 0;
    q->tail = 0;

    pthread_mutex_init(&q->mutex, NULL);
    pthread_cond_init(&q->not_empty, NULL);
    pthread_cond_init(&q->not_full, NULL);
}

/*
 * Append `item` to the queue, blocking while the queue is full.
 * Signals not_empty once the item is stored.
 */
void queue_push(bounded_queue_t *q, int item) {
    pthread_mutex_lock(&q->mutex);

    // Re-check the predicate after every wakeup.
    for (;;) {
        if (q->count != QUEUE_CAPACITY) {
            break;
        }
        pthread_cond_wait(&q->not_full, &q->mutex);
    }

    int slot = q->tail;
    q->items[slot] = item;
    q->tail = (slot + 1) % QUEUE_CAPACITY;   // wrap around the ring
    q->count += 1;

    pthread_cond_signal(&q->not_empty);
    pthread_mutex_unlock(&q->mutex);
}

/*
 * Remove and return the oldest item, blocking while the queue is empty.
 * Signals not_full once the slot has been released.
 */
int queue_pop(bounded_queue_t *q) {
    pthread_mutex_lock(&q->mutex);

    // Re-check the predicate after every wakeup.
    for (;;) {
        if (q->count != 0) {
            break;
        }
        pthread_cond_wait(&q->not_empty, &q->mutex);
    }

    int slot = q->head;
    int value = q->items[slot];
    q->head = (slot + 1) % QUEUE_CAPACITY;   // wrap around the ring
    q->count -= 1;

    pthread_cond_signal(&q->not_full);
    pthread_mutex_unlock(&q->mutex);

    return value;
}

/* Release the mutex and both condition variables created by queue_init. */
void queue_destroy(bounded_queue_t *q) {
    pthread_cond_t *full_cv = &q->not_full;
    pthread_cond_t *empty_cv = &q->not_empty;

    pthread_mutex_destroy(&q->mutex);
    pthread_cond_destroy(full_cv);
    pthread_cond_destroy(empty_cv);
}

// The single queue used by the producer and consumer threads below.
bounded_queue_t queue;

/*
 * Producer thread entry point.
 * `arg` points to this producer's int id. Pushes ITEMS_PER_PRODUCER values
 * of the form id * 1000 + i onto the global queue, logging each one.
 * Always returns NULL.
 */
void *producer(void *arg) {
    const int id = *(int *)arg;
    const int base = id * 1000;

    int i = 0;
    while (i < ITEMS_PER_PRODUCER) {
        int item = base + i;
        queue_push(&queue, item);
        printf("producer %d: pushed %d\n", id, item);
        i++;
    }

    return NULL;
}

/*
 * Consumer thread entry point.
 * `arg` points to this consumer's int id. Pops items from the global queue
 * until it receives POISON_PILL, then prints how many items it consumed
 * and their sum. Always returns NULL.
 */
void *consumer(void *arg) {
    const int id = *(int *)arg;
    long sum = 0;
    int count = 0;
    int item;

    // The sentinel ends the loop and is not counted.
    while ((item = queue_pop(&queue)) != POISON_PILL) {
        sum += item;
        count += 1;
    }

    printf("consumer %d: consumed %d items, sum = %ld\n", id, count, sum);
    return NULL;
}

/*
 * Producer/consumer demo: start the producers and consumers, wait for the
 * producers, send one POISON_PILL per consumer, then wait for the consumers.
 * Exits with EXIT_FAILURE if any thread cannot be created, because joining
 * a thread that was never started is undefined behavior.
 */
int main(void) {
    queue_init(&queue);

    pthread_t producers[NUM_PRODUCERS];
    pthread_t consumers[NUM_CONSUMERS];
    int producer_ids[NUM_PRODUCERS];
    int consumer_ids[NUM_CONSUMERS];

    for (int i = 0; i < NUM_PRODUCERS; i++) {
        producer_ids[i] = i;
        int rc = pthread_create(&producers[i], NULL, producer, &producer_ids[i]);
        if (rc != 0) {
            fprintf(stderr, "pthread_create (producer %d) failed: error %d\n", i, rc);
            exit(EXIT_FAILURE);
        }
    }

    for (int i = 0; i < NUM_CONSUMERS; i++) {
        consumer_ids[i] = i;
        int rc = pthread_create(&consumers[i], NULL, consumer, &consumer_ids[i]);
        if (rc != 0) {
            fprintf(stderr, "pthread_create (consumer %d) failed: error %d\n", i, rc);
            exit(EXIT_FAILURE);
        }
    }

    // Wait for all producers to finish.
    for (int i = 0; i < NUM_PRODUCERS; i++) {
        pthread_join(producers[i], NULL);
    }

    // Send one poison pill per consumer; each consumer stops at its first pill.
    for (int i = 0; i < NUM_CONSUMERS; i++) {
        queue_push(&queue, POISON_PILL);
    }

    // Wait for all consumers to finish.
    for (int i = 0; i < NUM_CONSUMERS; i++) {
        pthread_join(consumers[i], NULL);
    }

    queue_destroy(&queue);
    printf("done.\n");
    return 0;
}

פתרון לתרגיל 2 - בריכת תהליכונים

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <stdbool.h>
#include <unistd.h>

/* One unit of work in the pool's singly linked task list. */
typedef struct task {
    void (*function)(void *arg);   // callback run by a worker thread
    void *arg;                     // passed unchanged to `function`
    struct task *next;             // next queued task, or NULL at the tail
} task_t;

/* Thread pool state: worker handles plus a mutex-protected FIFO task list. */
typedef struct {
    pthread_t *threads;            // array of num_threads worker handles
    int num_threads;
    task_t *task_head;             // oldest pending task (next to run)
    task_t *task_tail;             // newest pending task (append point)
    int task_count;                // number of tasks currently queued
    pthread_mutex_t mutex;         // held while the task list or `shutdown` is accessed
    pthread_cond_t task_available; // signaled on submit, broadcast on shutdown
    bool shutdown;                 // set by threadpool_destroy to stop the workers
} threadpool_t;

/*
 * Worker thread body. `arg` is the owning threadpool_t.
 * Repeatedly takes the task at the head of the list, runs it outside the
 * lock and frees the task node. Exits (returning NULL) once shutdown is
 * set and no tasks remain.
 */
static void *worker_func(void *arg) {
    threadpool_t *pool = arg;

    for (;;) {
        pthread_mutex_lock(&pool->mutex);

        // Sleep until there is work or the pool is shutting down.
        while (!pool->shutdown && pool->task_count == 0) {
            pthread_cond_wait(&pool->task_available, &pool->mutex);
        }

        // An empty list after the wait loop implies shutdown was requested.
        if (pool->task_count == 0) {
            pthread_mutex_unlock(&pool->mutex);
            break;
        }

        // Detach the head of the list.
        task_t *job = pool->task_head;
        pool->task_head = job->next;
        if (!pool->task_head) {
            pool->task_tail = NULL;
        }
        pool->task_count -= 1;

        pthread_mutex_unlock(&pool->mutex);

        // Run the callback without holding the lock.
        job->function(job->arg);
        free(job);
    }

    return NULL;
}

/*
 * Create a pool with `num_threads` worker threads running worker_func.
 *
 * Returns the new pool, or NULL if num_threads is not positive, memory
 * cannot be allocated, or a synchronization object or thread cannot be
 * created. On failure every resource acquired so far is released,
 * including any workers that were already started.
 * The caller releases a non-NULL pool with threadpool_destroy().
 */
threadpool_t *threadpool_create(int num_threads) {
    if (num_threads <= 0) {
        return NULL;
    }

    // calloc leaves the task list empty and task_count at zero.
    threadpool_t *pool = calloc(1, sizeof *pool);
    if (pool == NULL) {
        return NULL;
    }
    pool->shutdown = false;

    pool->threads = malloc(sizeof *pool->threads * (size_t)num_threads);
    if (pool->threads == NULL) {
        goto fail_pool;
    }
    if (pthread_mutex_init(&pool->mutex, NULL) != 0) {
        goto fail_threads;
    }
    if (pthread_cond_init(&pool->task_available, NULL) != 0) {
        goto fail_mutex;
    }

    for (int i = 0; i < num_threads; i++) {
        if (pthread_create(&pool->threads[i], NULL, worker_func, pool) != 0) {
            // Stop and join the workers that did start before unwinding.
            pthread_mutex_lock(&pool->mutex);
            pool->shutdown = true;
            pthread_cond_broadcast(&pool->task_available);
            pthread_mutex_unlock(&pool->mutex);
            for (int j = 0; j < i; j++) {
                pthread_join(pool->threads[j], NULL);
            }
            goto fail_cond;
        }
    }

    pool->num_threads = num_threads;
    return pool;

fail_cond:
    pthread_cond_destroy(&pool->task_available);
fail_mutex:
    pthread_mutex_destroy(&pool->mutex);
fail_threads:
    free(pool->threads);
fail_pool:
    free(pool);
    return NULL;
}

/*
 * Queue `func(arg)` for execution by one of the pool's workers.
 * Tasks are appended to the tail of the list and one waiting worker is
 * signaled.
 *
 * The function has no way to report failure, so running out of memory for
 * the task node is treated as fatal: a diagnostic is printed and the
 * process aborts instead of dereferencing a NULL pointer.
 */
void threadpool_submit(threadpool_t *pool, void (*func)(void *), void *arg) {
    task_t *task = malloc(sizeof *task);
    if (task == NULL) {
        fprintf(stderr, "threadpool_submit: out of memory\n");
        abort();
    }
    task->function = func;
    task->arg = arg;
    task->next = NULL;

    pthread_mutex_lock(&pool->mutex);
    if (pool->task_tail) {
        pool->task_tail->next = task;
    } else {
        // Empty list: the new task is also the head.
        pool->task_head = task;
    }
    pool->task_tail = task;
    pool->task_count++;
    pthread_cond_signal(&pool->task_available);
    pthread_mutex_unlock(&pool->mutex);
}

/*
 * Shut the pool down and release everything it owns.
 * Sets the shutdown flag, wakes every worker and joins them; workers keep
 * running queued tasks until the list is empty, so pending work completes
 * before this returns. Passing NULL is a no-op.
 */
void threadpool_destroy(threadpool_t *pool) {
    if (pool == NULL) {
        return;
    }

    pthread_mutex_lock(&pool->mutex);
    pool->shutdown = true;
    pthread_cond_broadcast(&pool->task_available);
    pthread_mutex_unlock(&pool->mutex);

    for (int i = 0; i < pool->num_threads; i++) {
        pthread_join(pool->threads[i], NULL);
    }

    // Workers drain the list before exiting, so nodes can only remain here
    // when the pool has no worker threads. Free them so they do not leak;
    // the task arguments are not touched.
    while (pool->task_head != NULL) {
        task_t *next = pool->task_head->next;
        free(pool->task_head);
        pool->task_head = next;
    }

    free(pool->threads);
    pthread_mutex_destroy(&pool->mutex);
    pthread_cond_destroy(&pool->task_available);
    free(pool);
}

/*
 * Example task: `arg` is a heap-allocated int. Prints the calling thread's
 * id together with the value and its square, then frees the int.
 */
void square_task(void *arg) {
    int *boxed = arg;
    const int value = *boxed;
    const int squared = value * value;

    printf("thread %lu: %d^2 = %d\n", (unsigned long)pthread_self(), value, squared);
    free(boxed);
}

/*
 * Thread pool demo: submit 100 squaring tasks to a 4-worker pool.
 *
 * No sleep is needed before threadpool_destroy(): it joins the workers,
 * and a worker only exits once shutdown is set and the task list is empty,
 * so every submitted task has run by the time destroy returns.
 */
int main(void) {
    threadpool_t *pool = threadpool_create(4);
    if (pool == NULL) {
        fprintf(stderr, "failed to create thread pool\n");
        return EXIT_FAILURE;
    }

    int status = 0;
    for (int i = 0; i < 100; i++) {
        // Ownership of `num` passes to square_task, which frees it.
        int *num = malloc(sizeof *num);
        if (num == NULL) {
            fprintf(stderr, "out of memory while creating task %d\n", i);
            status = EXIT_FAILURE;
            break;
        }
        *num = i;
        threadpool_submit(pool, square_task, num);
    }

    threadpool_destroy(pool);
    printf("done.\n");
    return status;
}

פתרון לתרגיל 3 - מטמון עם reader-writer lock

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <time.h>

/* Compile-time settings for the reader/writer cache demo. */
#define CACHE_SIZE 256          /* maximum number of cache entries */
#define NUM_READERS 8           /* reader threads created by main */
#define NUM_WRITERS 2           /* writer threads created by main */
#define READS_PER_THREAD 1000   /* lookups performed by each reader */
#define WRITES_PER_THREAD 100   /* entries written by each writer */

/* One key/value slot of the cache; both strings are NUL-terminated. */
typedef struct {
    char key[32];     // at most 31 characters plus terminator
    char value[64];   // at most 63 characters plus terminator
    int used;         // non-zero once the slot holds an entry
} cache_entry_t;

/* Fixed-size cache searched linearly and protected by a reader-writer lock. */
typedef struct {
    cache_entry_t entries[CACHE_SIZE];
    int count;                 // number of slots filled so far (entries[0..count-1])
    pthread_rwlock_t rwlock;   // read-locked by cache_get, write-locked by cache_set
} cache_t;

/* Reset the cache to empty (all slots zeroed) and create its reader-writer lock. */
void cache_init(cache_t *c) {
    c->count = 0;
    memset(c->entries, 0, sizeof c->entries);
    pthread_rwlock_init(&c->rwlock, NULL);
}

/*
 * Look up `key` under the read lock.
 *
 * On a hit the stored value is copied into `value_out` and 1 is returned;
 * otherwise 0 is returned and `value_out` is left untouched.
 * `value_out` must have room for the full value field of an entry
 * (64 bytes including the terminator).
 *
 * Stored keys are truncated to the key field's capacity, so the comparison
 * only looks at that many characters. A key longer than the field thus
 * matches the entry that was stored for it, instead of never matching.
 */
int cache_get(cache_t *c, const char *key, char *value_out) {
    const size_t key_max = sizeof c->entries[0].key - 1;
    int found = 0;

    pthread_rwlock_rdlock(&c->rwlock);
    for (int i = 0; i < c->count; i++) {
        const cache_entry_t *entry = &c->entries[i];
        if (entry->used && strncmp(entry->key, key, key_max) == 0) {
            strcpy(value_out, entry->value);
            found = 1;
            break;
        }
    }
    pthread_rwlock_unlock(&c->rwlock);

    return found;
}

/*
 * Insert or update `key` -> `value` under the write lock.
 *
 * If the key already exists its value is overwritten; otherwise a new
 * entry is appended while there is room. When the cache is full a new key
 * is silently dropped. Keys and values longer than their fields are
 * truncated.
 *
 * Because stored keys are truncated, the existence check compares only as
 * many characters as the key field can hold. Without this, every update of
 * an over-long key would miss its own entry and append a duplicate.
 */
void cache_set(cache_t *c, const char *key, const char *value) {
    const size_t key_max = sizeof c->entries[0].key - 1;
    const size_t value_max = sizeof c->entries[0].value - 1;

    pthread_rwlock_wrlock(&c->rwlock);

    // Update in place if the key is already present.
    for (int i = 0; i < c->count; i++) {
        cache_entry_t *entry = &c->entries[i];
        if (entry->used && strncmp(entry->key, key, key_max) == 0) {
            strncpy(entry->value, value, value_max);
            entry->value[value_max] = '\0';
            pthread_rwlock_unlock(&c->rwlock);
            return;
        }
    }

    // Otherwise append a new entry if a slot is free.
    if (c->count < CACHE_SIZE) {
        cache_entry_t *entry = &c->entries[c->count];
        strncpy(entry->key, key, key_max);
        entry->key[key_max] = '\0';
        strncpy(entry->value, value, value_max);
        entry->value[value_max] = '\0';
        entry->used = 1;
        c->count++;
    }

    pthread_rwlock_unlock(&c->rwlock);
}

/* Destroy the reader-writer lock created by cache_init. */
void cache_destroy(cache_t *c) {
    pthread_rwlock_t *lock = &c->rwlock;
    pthread_rwlock_destroy(lock);
}

// The single cache instance used by the reader and writer threads below.
cache_t cache;

/*
 * Advance a private linear congruential generator and return a value in
 * [0, 32767]. The state belongs to the caller, so threads never share it.
 */
static unsigned int reader_next_rand(unsigned int *state) {
    *state = *state * 1103515245u + 12345u;
    return (*state >> 16) & 0x7fffu;
}

/*
 * Reader thread entry point.
 * `arg` points to this reader's int id. Performs READS_PER_THREAD lookups
 * of randomly chosen keys "key_<n>", with n in
 * [0, WRITES_PER_THREAD * NUM_WRITERS), and prints the hit count.
 * Always returns NULL.
 *
 * Each reader uses its own generator state instead of rand(): rand() is
 * not required to be thread-safe, and implementations that lock around it
 * would serialize the very readers this program is timing.
 */
void *reader_func(void *arg) {
    int id = *(int *)arg;
    int hits = 0;
    char value[64];
    // Mix the id into the seed so readers started in the same second differ.
    unsigned int seed = (unsigned int)time(NULL) ^ ((unsigned int)id * 2654435761u);

    for (int i = 0; i < READS_PER_THREAD; i++) {
        char key[32];
        int n = (int)(reader_next_rand(&seed) % (unsigned int)(WRITES_PER_THREAD * NUM_WRITERS));
        snprintf(key, sizeof(key), "key_%d", n);
        if (cache_get(&cache, key, value)) {
            hits++;
        }
    }

    printf("reader %d: %d hits out of %d lookups\n", id, hits, READS_PER_THREAD);
    return NULL;
}

/*
 * Writer thread entry point.
 * `arg` points to this writer's int id. Stores WRITES_PER_THREAD entries
 * whose keys are "key_<id * WRITES_PER_THREAD + i>", then prints a summary.
 * Always returns NULL.
 */
void *writer_func(void *arg) {
    const int id = *(int *)arg;
    const int first_key = id * WRITES_PER_THREAD;
    char key[32];
    char value[64];

    int i = 0;
    while (i < WRITES_PER_THREAD) {
        snprintf(key, sizeof(key), "key_%d", first_key + i);
        snprintf(value, sizeof(value), "value_from_writer_%d_%d", id, i);
        cache_set(&cache, key, value);
        i++;
    }

    printf("writer %d: wrote %d entries\n", id, WRITES_PER_THREAD);
    return NULL;
}

/*
 * Reader/writer cache demo: run NUM_WRITERS writers and NUM_READERS readers
 * against the global cache and report the elapsed wall-clock time.
 * Exits with EXIT_FAILURE if any thread cannot be created, because joining
 * a thread that was never started is undefined behavior.
 */
int main(void) {
    srand((unsigned int)time(NULL));
    cache_init(&cache);

    struct timespec start, end;
    clock_gettime(CLOCK_MONOTONIC, &start);

    pthread_t readers[NUM_READERS], writers[NUM_WRITERS];
    int reader_ids[NUM_READERS], writer_ids[NUM_WRITERS];

    for (int i = 0; i < NUM_WRITERS; i++) {
        writer_ids[i] = i;
        int rc = pthread_create(&writers[i], NULL, writer_func, &writer_ids[i]);
        if (rc != 0) {
            fprintf(stderr, "pthread_create (writer %d) failed: error %d\n", i, rc);
            exit(EXIT_FAILURE);
        }
    }

    for (int i = 0; i < NUM_READERS; i++) {
        reader_ids[i] = i;
        int rc = pthread_create(&readers[i], NULL, reader_func, &reader_ids[i]);
        if (rc != 0) {
            fprintf(stderr, "pthread_create (reader %d) failed: error %d\n", i, rc);
            exit(EXIT_FAILURE);
        }
    }

    for (int i = 0; i < NUM_WRITERS; i++) {
        pthread_join(writers[i], NULL);
    }
    for (int i = 0; i < NUM_READERS; i++) {
        pthread_join(readers[i], NULL);
    }

    clock_gettime(CLOCK_MONOTONIC, &end);

    // Combine the seconds and nanoseconds parts into fractional seconds.
    double elapsed = (end.tv_sec - start.tv_sec) +
                     (end.tv_nsec - start.tv_nsec) / 1e9;
    printf("elapsed time with rwlock: %.4f seconds\n", elapsed);

    cache_destroy(&cache);
    return 0;
}

פתרון לתרגיל 4 - חישוב מקבילי עם barrier

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

/* Compile-time settings for the barrier smoothing demo. */
#define ARRAY_SIZE 10000   /* elements in each of the two buffers */
#define NUM_THREADS 4      /* worker threads; also the barrier count */
#define NUM_PHASES 100     /* smoothing passes performed by every worker */

// Two buffers used alternately as source and destination of each pass.
double array_a[ARRAY_SIZE];
double array_b[ARRAY_SIZE];
// Barrier every worker waits on at the end of each pass.
pthread_barrier_t barrier;

/* Per-thread work description: the half-open index range [start, end). */
typedef struct {
    int id;      // worker index assigned by main
    int start;   // first index this worker writes
    int end;     // one past the last index this worker writes
} worker_arg_t;

/*
 * Worker thread entry point. `arg` is a worker_arg_t giving this thread's
 * slice [start, end).
 *
 * Runs NUM_PHASES passes. In each pass every element of the slice becomes
 * the average of itself and its two neighbours in the source buffer; at the
 * array edges the missing neighbour is replaced by the element itself.
 * Even passes read array_a and write array_b, odd passes the reverse.
 * All workers meet at the barrier before the next pass starts.
 * Always returns NULL.
 */
void *worker(void *arg) {
    const worker_arg_t *slice = arg;

    for (int phase = 0; phase < NUM_PHASES; phase++) {
        // Pick the buffers by pass parity (same effect as swapping pointers).
        const int even_pass = (phase % 2 == 0);
        const double *in = even_pass ? array_a : array_b;
        double *out = even_pass ? array_b : array_a;

        for (int i = slice->start; i < slice->end; i++) {
            const int left_idx = (i > 0) ? i - 1 : i;
            const int right_idx = (i < ARRAY_SIZE - 1) ? i + 1 : i;
            out[i] = (in[left_idx] + in[i] + in[right_idx]) / 3.0;
        }

        // Nobody may start the next pass until every slice is written.
        pthread_barrier_wait(&barrier);
    }

    return NULL;
}

/*
 * Barrier demo: fill array_a with pseudo-random values, smooth it with
 * NUM_THREADS workers over NUM_PHASES passes, and print the first values.
 *
 * Exits with EXIT_FAILURE if the barrier or any thread cannot be created.
 * The barrier expects exactly NUM_THREADS participants, so with a worker
 * missing the others would wait at it forever.
 */
int main(void) {
    // Initialization: deterministic pseudo-random input.
    srand(42);
    for (int i = 0; i < ARRAY_SIZE; i++) {
        array_a[i] = (double)(rand() % 1000);
        array_b[i] = 0.0;
    }

    int rc = pthread_barrier_init(&barrier, NULL, NUM_THREADS);
    if (rc != 0) {
        fprintf(stderr, "pthread_barrier_init failed: error %d\n", rc);
        exit(EXIT_FAILURE);
    }

    pthread_t threads[NUM_THREADS];
    worker_arg_t args[NUM_THREADS];
    int chunk = ARRAY_SIZE / NUM_THREADS;

    for (int i = 0; i < NUM_THREADS; i++) {
        args[i].id = i;
        args[i].start = i * chunk;
        // The last worker also takes the remainder of the division.
        args[i].end = (i == NUM_THREADS - 1) ? ARRAY_SIZE : (i + 1) * chunk;
        rc = pthread_create(&threads[i], NULL, worker, &args[i]);
        if (rc != 0) {
            fprintf(stderr, "pthread_create (worker %d) failed: error %d\n", i, rc);
            exit(EXIT_FAILURE);
        }
    }

    for (int i = 0; i < NUM_THREADS; i++) {
        pthread_join(threads[i], NULL);
    }

    pthread_barrier_destroy(&barrier);

    // Print the first few values.
    printf("first 10 values after smoothing:\n");
    // The last pass writes array_a when NUM_PHASES is even, array_b when odd.
    double *result = (NUM_PHASES % 2 == 0) ? array_a : array_b;
    for (int i = 0; i < 10; i++) {
        printf("  [%d] = %.2f\n", i, result[i]);
    }

    return 0;
}

פתרון לתרגיל 5 - מוניטור שומר סף

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>

/* Monitor for a gate that lets a limited number of threads through. */
typedef struct {
    int remaining;   // how many more threads may pass
    pthread_mutex_t mutex;      // held while `remaining` is read or changed
    pthread_cond_t can_pass;    // broadcast by gate_open when passes are added
} gate_monitor_t;

/* Create the gate's mutex and condition variable; the gate starts with no passes available. */
void gate_init(gate_monitor_t *g) {
    pthread_mutex_init(&g->mutex, NULL);
    pthread_cond_init(&g->can_pass, NULL);
    g->remaining = 0;
}

/* Release the mutex and condition variable created by gate_init. */
void gate_destroy(gate_monitor_t *g) {
    pthread_cond_t *cv = &g->can_pass;

    pthread_mutex_destroy(&g->mutex);
    pthread_cond_destroy(cv);
}

/*
 * Block until the gate has a pass available, then consume it.
 * Prints which thread passed and how many passes are left; the message is
 * printed while the mutex is still held.
 */
void gate_pass(gate_monitor_t *g, int thread_id) {
    pthread_mutex_lock(&g->mutex);

    // Re-check the predicate after every wakeup.
    for (;;) {
        if (g->remaining != 0) {
            break;
        }
        pthread_cond_wait(&g->can_pass, &g->mutex);
    }

    const int left = g->remaining - 1;
    g->remaining = left;
    printf("thread %d passed the gate (remaining: %d)\n", thread_id, left);

    pthread_mutex_unlock(&g->mutex);
}

/*
 * Add `n` passes to the gate and wake every waiting thread.
 * Woken threads re-check the pass count themselves in gate_pass.
 */
void gate_open(gate_monitor_t *g, int n) {
    pthread_mutex_lock(&g->mutex);

    g->remaining = g->remaining + n;
    printf("gate opened for %d threads\n", n);
    pthread_cond_broadcast(&g->can_pass);

    pthread_mutex_unlock(&g->mutex);
}

// The single gate used by main and the waiting threads below.
gate_monitor_t gate;

/*
 * Thread entry point. `arg` points to this thread's int id.
 * Announces that it is waiting, passes through the global gate, and
 * announces that it got through. Always returns NULL.
 */
void *thread_func(void *arg) {
    const int *id_ptr = arg;
    const int id = *id_ptr;

    printf("thread %d: waiting at the gate...\n", id);
    gate_pass(&gate, id);
    printf("thread %d: through!\n", id);

    return NULL;
}

/*
 * Gate demo: start 10 threads that wait at the gate, then open it in three
 * batches (3, 3 and 4) one second apart.
 * Exits with EXIT_FAILURE if any thread cannot be created, because joining
 * a thread that was never started is undefined behavior.
 */
int main(void) {
    gate_init(&gate);

    pthread_t threads[10];
    int ids[10];

    // Create 10 threads that wait at the gate.
    for (int i = 0; i < 10; i++) {
        ids[i] = i;
        int rc = pthread_create(&threads[i], NULL, thread_func, &ids[i]);
        if (rc != 0) {
            fprintf(stderr, "pthread_create (thread %d) failed: error %d\n", i, rc);
            exit(EXIT_FAILURE);
        }
    }

    sleep(1);  // give all the threads time to reach the gate

    // Release 3.
    gate_open(&gate, 3);
    sleep(1);

    // Release 3 more.
    gate_open(&gate, 3);
    sleep(1);

    // Release the remaining ones.
    gate_open(&gate, 4);

    for (int i = 0; i < 10; i++) {
        pthread_join(threads[i], NULL);
    }

    gate_destroy(&gate);
    printf("done.\n");
    return 0;
}