12.1 דפוסי תכנות מקבילי - פתרון
פתרון לתרגיל 1 - יצרן-צרכן בסיסי
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
/* Parameters of the bounded-queue producer/consumer demo. */
#define QUEUE_CAPACITY 8      /* number of slots in the ring buffer */
#define NUM_PRODUCERS 3       /* producer threads started by main() */
#define NUM_CONSUMERS 2       /* consumer threads started by main() */
#define ITEMS_PER_PRODUCER 20 /* items pushed by each producer */
/*
 * Sentinel that tells a consumer to stop. Parenthesized so the negative
 * literal stays a single operand wherever the macro is expanded.
 */
#define POISON_PILL (-1)
/*
 * Fixed-capacity FIFO of ints kept in a ring buffer.
 * queue_push() and queue_pop() touch the fields only while holding `mutex`.
 */
typedef struct {
    int items[QUEUE_CAPACITY]; /* ring-buffer storage */
    int head;                  /* index of the next item to pop */
    int tail;                  /* index of the next slot to push into */
    int count;                 /* number of items currently stored */
    pthread_mutex_t mutex;     /* protects the fields above */
    pthread_cond_t not_full;   /* signalled by queue_pop() after it frees a slot */
    pthread_cond_t not_empty;  /* signalled by queue_push() after it adds an item */
} bounded_queue_t;
/*
 * Put the queue into its empty state and create its mutex and both condition
 * variables with default attributes.
 * The return codes of the pthread_*_init calls are not inspected.
 */
void queue_init(bounded_queue_t *q) {
    q->count = 0;
    q->tail = 0;
    q->head = 0;

    pthread_mutex_init(&q->mutex, NULL);
    pthread_cond_init(&q->not_full, NULL);
    pthread_cond_init(&q->not_empty, NULL);
}
/*
 * Append `item` at the tail of the queue, blocking while the queue is full.
 * Wakes one thread waiting on `not_empty` before releasing the mutex.
 */
void queue_push(bounded_queue_t *q, int item) {
    pthread_mutex_lock(&q->mutex);

    /* The predicate is re-tested after every wakeup, so a wakeup that finds
     * the queue still full simply goes back to waiting. */
    while (q->count == QUEUE_CAPACITY) {
        pthread_cond_wait(&q->not_full, &q->mutex);
    }

    const int slot = q->tail;
    q->items[slot] = item;
    q->tail = (slot + 1) % QUEUE_CAPACITY; /* wrap around the ring */
    q->count += 1;

    pthread_cond_signal(&q->not_empty);
    pthread_mutex_unlock(&q->mutex);
}
/*
 * Remove and return the item at the head of the queue, blocking while the
 * queue is empty. Wakes one thread waiting on `not_full` before releasing
 * the mutex.
 */
int queue_pop(bounded_queue_t *q) {
    pthread_mutex_lock(&q->mutex);

    /* Re-test the predicate after every wakeup. */
    while (q->count == 0) {
        pthread_cond_wait(&q->not_empty, &q->mutex);
    }

    const int slot = q->head;
    const int popped = q->items[slot];
    q->head = (slot + 1) % QUEUE_CAPACITY; /* wrap around the ring */
    q->count -= 1;

    pthread_cond_signal(&q->not_full);
    pthread_mutex_unlock(&q->mutex);
    return popped;
}
/*
 * Destroy the queue's condition variables and mutex.
 * The return codes of the pthread_*_destroy calls are not inspected.
 */
void queue_destroy(bounded_queue_t *q) {
    pthread_cond_destroy(&q->not_empty);
    pthread_cond_destroy(&q->not_full);
    pthread_mutex_destroy(&q->mutex);
}
/* Queue instance used by producer(), consumer() and main() of this program. */
bounded_queue_t queue;
/*
 * Producer thread entry point.
 * `arg` points to the producer's int id. Pushes ITEMS_PER_PRODUCER items of
 * the form id * 1000 + sequence number onto the global queue, printing a line
 * after each push. Always returns NULL.
 */
void *producer(void *arg) {
    const int id = *(const int *)arg;
    const int base = id * 1000;

    for (int seq = 0; seq < ITEMS_PER_PRODUCER; seq++) {
        const int item = base + seq;
        queue_push(&queue, item);
        printf("producer %d: pushed %d\n", id, item);
    }
    return NULL;
}
void *consumer(void *arg) {
int id = *(int *)arg;
long sum = 0;
int count = 0;
while (1) {
int item = queue_pop(&queue);
if (item == POISON_PILL) {
break;
}
sum += item;
count++;
}
printf("consumer %d: consumed %d items, sum = %ld\n", id, count, sum);
return NULL;
}
/*
 * Runs the producer/consumer demo: starts all producers and consumers, waits
 * for the producers, sends one POISON_PILL per consumer, then waits for the
 * consumers and tears the queue down.
 */
int main(void) {
    pthread_t producers[NUM_PRODUCERS];
    pthread_t consumers[NUM_CONSUMERS];
    int producer_ids[NUM_PRODUCERS];
    int consumer_ids[NUM_CONSUMERS];
    int n;

    queue_init(&queue);

    for (n = 0; n < NUM_PRODUCERS; n++) {
        producer_ids[n] = n;
        pthread_create(&producers[n], NULL, producer, &producer_ids[n]);
    }
    for (n = 0; n < NUM_CONSUMERS; n++) {
        consumer_ids[n] = n;
        pthread_create(&consumers[n], NULL, consumer, &consumer_ids[n]);
    }

    /* Wait until every producer has finished. */
    for (n = 0; n < NUM_PRODUCERS; n++) {
        pthread_join(producers[n], NULL);
    }

    /* Send one poison pill per consumer. */
    for (n = 0; n < NUM_CONSUMERS; n++) {
        queue_push(&queue, POISON_PILL);
    }

    /* Wait until every consumer has finished. */
    for (n = 0; n < NUM_CONSUMERS; n++) {
        pthread_join(consumers[n], NULL);
    }

    queue_destroy(&queue);
    printf("done.\n");
    return 0;
}
פתרון לתרגיל 2 - בריכת תהליכונים
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <stdbool.h>
#include <unistd.h>
/* One queued unit of work: a callback, its argument, and the queue link. */
typedef struct task {
    void (*function)(void *arg); /* callback invoked by a worker thread */
    void *arg;                   /* passed unchanged to `function` */
    struct task *next;           /* next task in the list, NULL for the last one */
} task_t;
/*
 * Thread pool state: the worker handles plus a singly linked FIFO of pending
 * tasks. The task list, task_count and shutdown are accessed under `mutex`.
 */
typedef struct {
    pthread_t *threads;            /* array of num_threads worker handles */
    int num_threads;               /* number of entries in `threads` */
    task_t *task_head;             /* oldest pending task, NULL when empty */
    task_t *task_tail;             /* newest pending task, NULL when empty */
    int task_count;                /* number of pending tasks */
    pthread_mutex_t mutex;         /* protects the task list and `shutdown` */
    pthread_cond_t task_available; /* signalled on submit, broadcast on shutdown */
    bool shutdown;                 /* set by threadpool_destroy() */
} threadpool_t;
/*
 * Worker thread entry point; `arg` is the owning threadpool_t.
 * Repeatedly takes the oldest pending task, runs it outside the lock and
 * frees the task node. Exits (returning NULL) once shutdown has been
 * requested and no tasks are left.
 */
static void *worker_func(void *arg) {
    threadpool_t *pool = arg;

    for (;;) {
        pthread_mutex_lock(&pool->mutex);

        /* Sleep until there is work or the pool is shutting down. */
        while (!pool->shutdown && pool->task_count == 0) {
            pthread_cond_wait(&pool->task_available, &pool->mutex);
        }

        /* Pending tasks are still executed after shutdown was requested. */
        if (pool->task_count == 0 && pool->shutdown) {
            pthread_mutex_unlock(&pool->mutex);
            break;
        }

        /* Unlink the head of the list. */
        task_t *job = pool->task_head;
        pool->task_head = job->next;
        if (!pool->task_head) {
            pool->task_tail = NULL;
        }
        pool->task_count -= 1;
        pthread_mutex_unlock(&pool->mutex);

        /* Run the task without holding the lock. */
        job->function(job->arg);
        free(job);
    }

    return NULL;
}
/*
 * Create a pool with `num_threads` worker threads running worker_func().
 *
 * Returns the new pool, or NULL when `num_threads` is not positive, when
 * memory cannot be allocated, or when the mutex, the condition variable or
 * any worker thread cannot be created. On failure everything that was set up
 * so far (including workers that were already started) is torn down again.
 * A successfully created pool is released with threadpool_destroy().
 */
threadpool_t *threadpool_create(int num_threads) {
    int started = 0;

    if (num_threads <= 0) {
        return NULL;
    }

    threadpool_t *pool = calloc(1, sizeof *pool);
    if (pool == NULL) {
        return NULL;
    }
    pool->num_threads = num_threads;
    pool->shutdown = false;

    pool->threads = calloc((size_t)num_threads, sizeof *pool->threads);
    if (pool->threads == NULL) {
        goto fail_pool;
    }
    if (pthread_mutex_init(&pool->mutex, NULL) != 0) {
        goto fail_threads;
    }
    if (pthread_cond_init(&pool->task_available, NULL) != 0) {
        goto fail_mutex;
    }

    while (started < num_threads &&
           pthread_create(&pool->threads[started], NULL, worker_func, pool) == 0) {
        started++;
    }
    if (started == num_threads) {
        return pool;
    }

    /* Only some workers could be started: ask them to exit and wait for
     * them, so that no thread keeps using the pool after it is freed. */
    pthread_mutex_lock(&pool->mutex);
    pool->shutdown = true;
    pthread_cond_broadcast(&pool->task_available);
    pthread_mutex_unlock(&pool->mutex);
    for (int i = 0; i < started; i++) {
        pthread_join(pool->threads[i], NULL);
    }

    pthread_cond_destroy(&pool->task_available);
fail_mutex:
    pthread_mutex_destroy(&pool->mutex);
fail_threads:
    free(pool->threads);
fail_pool:
    free(pool);
    return NULL;
}
/*
 * Queue `func(arg)` for execution by one of the pool's worker threads.
 *
 * The task is appended to the tail of the pending list and one waiting
 * worker is woken up. If the task node cannot be allocated, the task is not
 * dropped: it is executed immediately in the calling thread instead, so the
 * callback still runs exactly once and can release `arg` as usual.
 */
void threadpool_submit(threadpool_t *pool, void (*func)(void *), void *arg) {
    task_t *task = malloc(sizeof *task);
    if (task == NULL) {
        /* Out of memory: fall back to running the task in the caller. */
        func(arg);
        return;
    }
    task->function = func;
    task->arg = arg;
    task->next = NULL;

    pthread_mutex_lock(&pool->mutex);
    if (pool->task_tail) {
        pool->task_tail->next = task;
    } else {
        /* List was empty: the new task is also the head. */
        pool->task_head = task;
    }
    pool->task_tail = task;
    pool->task_count++;
    pthread_cond_signal(&pool->task_available);
    pthread_mutex_unlock(&pool->mutex);
}
/*
 * Shut the pool down and release it.
 * Sets the shutdown flag, wakes every waiting worker, joins all worker
 * threads, then destroys the synchronisation objects and frees the pool.
 * `pool` must not be used after this call.
 */
void threadpool_destroy(threadpool_t *pool) {
    /* Publish the shutdown request under the lock and wake all sleepers. */
    pthread_mutex_lock(&pool->mutex);
    pool->shutdown = true;
    pthread_cond_broadcast(&pool->task_available);
    pthread_mutex_unlock(&pool->mutex);

    int joined = 0;
    while (joined < pool->num_threads) {
        pthread_join(pool->threads[joined], NULL);
        joined++;
    }

    free(pool->threads);
    pthread_mutex_destroy(&pool->mutex);
    pthread_cond_destroy(&pool->task_available);
    free(pool);
}
/*
 * Example task: `arg` points to a heap-allocated int. Prints the calling
 * thread's id together with the value and its square, then frees `arg`.
 */
void square_task(void *arg) {
    int *boxed = arg;
    const int value = *boxed;
    const int squared = value * value;

    printf("thread %lu: %d^2 = %d\n", (unsigned long)pthread_self(), value, squared);
    free(boxed); /* the task owns its argument */
}
/*
 * Thread pool demo: submits 100 square_task() jobs to a 4-thread pool and
 * shuts the pool down.
 */
int main(void) {
    threadpool_t *pool = threadpool_create(4);
    if (pool == NULL) {
        fprintf(stderr, "failed to create thread pool\n");
        return EXIT_FAILURE;
    }

    for (int i = 0; i < 100; i++) {
        int *num = malloc(sizeof *num);
        if (num == NULL) {
            fprintf(stderr, "out of memory, skipping task %d\n", i);
            continue;
        }
        *num = i;
        /* Ownership of `num` passes to the task, which frees it. */
        threadpool_submit(pool, square_task, num);
    }

    /* No sleep is needed before shutting down: the workers only exit once
     * the task list is empty, and threadpool_destroy() joins them, so every
     * submitted task has run by the time it returns. */
    threadpool_destroy(pool);
    printf("done.\n");
    return 0;
}
פתרון לתרגיל 3 - מטמון עם reader-writer lock
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <time.h>
/* Number of slots in the cache's entry array. */
#define CACHE_SIZE 256
/* Number of reader threads started by main(). */
#define NUM_READERS 8
/* Number of writer threads started by main(). */
#define NUM_WRITERS 2
/* Lookups performed by each reader thread. */
#define READS_PER_THREAD 1000
/* Entries written by each writer thread. */
#define WRITES_PER_THREAD 100
/* One cache slot: a NUL-terminated key/value pair plus an in-use flag. */
typedef struct {
    char key[32];   /* key text, at most 31 characters plus terminator */
    char value[64]; /* value text, at most 63 characters plus terminator */
    int used;       /* non-zero once the slot holds an entry */
} cache_entry_t;
/*
 * Fixed-size key/value cache. Entries are stored in slots [0, count);
 * cache_get() takes `rwlock` for reading, cache_set() for writing.
 */
typedef struct {
    cache_entry_t entries[CACHE_SIZE]; /* slot storage */
    int count;                         /* number of slots filled so far */
    pthread_rwlock_t rwlock;           /* reader-writer lock over the fields above */
} cache_t;
/*
 * Reset the cache to empty (all slots zeroed, count 0) and create its
 * reader-writer lock with default attributes.
 */
void cache_init(cache_t *c) {
    c->count = 0;
    memset(c->entries, 0, sizeof c->entries);
    pthread_rwlock_init(&c->rwlock, NULL);
}
/*
 * Look `key` up in the cache under the read lock.
 *
 * Keys are stored truncated to the capacity of cache_entry_t.key, so the
 * comparison is limited to that many characters; a lookup key that is longer
 * than the capacity therefore matches the entry stored under its truncated
 * form instead of never matching at all.
 *
 * On a hit the stored value is copied to `value_out` and 1 is returned;
 * otherwise 0 is returned and `value_out` is left untouched.
 * `value_out` must have room for a full cache_entry_t.value (64 bytes).
 */
int cache_get(cache_t *c, const char *key, char *value_out) {
    const size_t key_max = sizeof c->entries[0].key - 1;
    int found = 0;

    pthread_rwlock_rdlock(&c->rwlock);
    for (int i = 0; i < c->count; i++) {
        const cache_entry_t *entry = &c->entries[i];
        if (entry->used && strncmp(entry->key, key, key_max) == 0) {
            /* Stored values are always NUL-terminated within 64 bytes. */
            strcpy(value_out, entry->value);
            found = 1;
            break;
        }
    }
    pthread_rwlock_unlock(&c->rwlock);

    return found;
}
/*
 * Insert or update the entry for `key` under the write lock.
 *
 * Keys and values are truncated to the capacity of the entry's buffers.
 * Existing entries are matched on the truncated key, so setting an over-long
 * key twice updates the same slot instead of appending a duplicate each
 * time. When the key is new and the cache is already full, the call does
 * nothing.
 */
void cache_set(cache_t *c, const char *key, const char *value) {
    const size_t key_max = sizeof c->entries[0].key - 1;
    cache_entry_t *slot = NULL;

    pthread_rwlock_wrlock(&c->rwlock);

    /* Look for an existing entry with this key. */
    for (int i = 0; i < c->count; i++) {
        cache_entry_t *entry = &c->entries[i];
        if (entry->used && strncmp(entry->key, key, key_max) == 0) {
            slot = entry;
            break;
        }
    }

    /* Not present: claim the next free slot, if there is one. */
    if (slot == NULL && c->count < CACHE_SIZE) {
        slot = &c->entries[c->count];
        snprintf(slot->key, sizeof slot->key, "%s", key);
        slot->used = 1;
        c->count++;
    }

    if (slot != NULL) {
        /* snprintf truncates if needed and always NUL-terminates. */
        snprintf(slot->value, sizeof slot->value, "%s", value);
    }

    pthread_rwlock_unlock(&c->rwlock);
}
/* Destroy the cache's reader-writer lock; the entries themselves need no cleanup. */
void cache_destroy(cache_t *c) {
    pthread_rwlock_t *lock = &c->rwlock;
    pthread_rwlock_destroy(lock);
}
/* Cache instance used by reader_func(), writer_func() and main(). */
cache_t cache;
/*
 * Reader thread entry point; `arg` points to the reader's int id.
 *
 * Performs READS_PER_THREAD lookups of pseudo-random keys in the range
 * key_0 .. key_(WRITES_PER_THREAD * NUM_WRITERS - 1), which is the key range
 * the writers produce, and prints how many of them hit. Always returns NULL.
 *
 * The random numbers come from a small generator whose state is private to
 * this thread: rand() keeps hidden shared state and is not required to be
 * thread-safe, so it must not be called concurrently from several readers.
 */
void *reader_func(void *arg) {
    int id = *(int *)arg;
    int hits = 0;
    char value[64];
    const unsigned int key_space = WRITES_PER_THREAD * NUM_WRITERS;
    /* Mix the id into the seed so readers started in the same second still
     * walk different sequences. */
    unsigned int state = (unsigned int)time(NULL) ^ (((unsigned int)id + 1u) * 2654435761u);

    for (int i = 0; i < READS_PER_THREAD; i++) {
        char key[32];
        /* Linear congruential step; the high bits are the better ones. */
        state = state * 1103515245u + 12345u;
        int pick = (int)((state >> 16) % key_space);
        snprintf(key, sizeof(key), "key_%d", pick);
        if (cache_get(&cache, key, value)) {
            hits++;
        }
    }
    printf("reader %d: %d hits out of %d lookups\n", id, hits, READS_PER_THREAD);
    return NULL;
}
/*
 * Writer thread entry point; `arg` points to the writer's int id.
 * Stores WRITES_PER_THREAD entries named key_(id * WRITES_PER_THREAD + n)
 * in the global cache and prints a summary line. Always returns NULL.
 */
void *writer_func(void *arg) {
    const int id = *(const int *)arg;
    const int first_key = id * WRITES_PER_THREAD;
    char key[32];
    char value[64];

    for (int n = 0; n < WRITES_PER_THREAD; n++) {
        snprintf(key, sizeof key, "key_%d", first_key + n);
        snprintf(value, sizeof value, "value_from_writer_%d_%d", id, n);
        cache_set(&cache, key, value);
    }

    printf("writer %d: wrote %d entries\n", id, WRITES_PER_THREAD);
    return NULL;
}
/*
 * Cache demo: runs NUM_WRITERS writers and NUM_READERS readers against the
 * global cache and prints the wall-clock time the whole run took.
 */
int main(void) {
    pthread_t readers[NUM_READERS];
    pthread_t writers[NUM_WRITERS];
    int reader_ids[NUM_READERS];
    int writer_ids[NUM_WRITERS];
    struct timespec start;
    struct timespec end;
    int n;

    srand(time(NULL));
    cache_init(&cache);
    clock_gettime(CLOCK_MONOTONIC, &start);

    /* Writers are started first, then the readers. */
    for (n = 0; n < NUM_WRITERS; n++) {
        writer_ids[n] = n;
        pthread_create(&writers[n], NULL, writer_func, &writer_ids[n]);
    }
    for (n = 0; n < NUM_READERS; n++) {
        reader_ids[n] = n;
        pthread_create(&readers[n], NULL, reader_func, &reader_ids[n]);
    }

    for (n = 0; n < NUM_WRITERS; n++) {
        pthread_join(writers[n], NULL);
    }
    for (n = 0; n < NUM_READERS; n++) {
        pthread_join(readers[n], NULL);
    }

    clock_gettime(CLOCK_MONOTONIC, &end);

    /* Whole seconds plus the nanosecond remainder converted to seconds. */
    const double whole = (double)(end.tv_sec - start.tv_sec);
    const double frac = (double)(end.tv_nsec - start.tv_nsec) / 1e9;
    double elapsed = whole + frac;
    printf("elapsed time with rwlock: %.4f seconds\n", elapsed);

    cache_destroy(&cache);
    return 0;
}
פתרון לתרגיל 4 - חישוב מקבילי עם barrier
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
/* Number of elements in each of the two smoothing buffers. */
#define ARRAY_SIZE 10000
/* Number of worker threads; also the participant count of the barrier. */
#define NUM_THREADS 4
/* Number of smoothing passes every worker performs. */
#define NUM_PHASES 100
/* The two buffers the workers alternate between as source and destination. */
double array_a[ARRAY_SIZE];
double array_b[ARRAY_SIZE];
/* Barrier the workers wait on at the end of every phase; initialised in main(). */
pthread_barrier_t barrier;
/* Per-thread argument: which slice [start, end) of the arrays a worker owns. */
typedef struct {
    int id;    /* worker index assigned by main(); not read by worker() */
    int start; /* first index of the slice (inclusive) */
    int end;   /* one past the last index of the slice */
} worker_arg_t;
/*
 * Smoothing worker; `arg` points to this thread's worker_arg_t.
 *
 * For NUM_PHASES phases, replaces every element of its slice by the average
 * of the element and its two neighbours (an edge element stands in for its
 * missing neighbour). Even phases read array_a and write array_b, odd phases
 * do the opposite. Always returns NULL.
 */
void *worker(void *arg) {
    const worker_arg_t *slice = arg;
    double *const buffers[2] = { array_a, array_b };

    for (int phase = 0; phase < NUM_PHASES; phase++) {
        const double *in = buffers[phase % 2];
        double *out = buffers[(phase + 1) % 2];

        for (int i = slice->start; i < slice->end; i++) {
            const double left = (i <= 0) ? in[i] : in[i - 1];
            const double right = (i >= ARRAY_SIZE - 1) ? in[i] : in[i + 1];
            out[i] = (left + in[i] + right) / 3.0;
        }

        /* Wait until every worker has finished this phase before the roles
         * of the two arrays are exchanged. */
        pthread_barrier_wait(&barrier);
    }
    return NULL;
}
int main(void) {
// אתחול - מספרים אקראיים
srand(42);
for (int i = 0; i < ARRAY_SIZE; i++) {
array_a[i] = (double)(rand() % 1000);
array_b[i] = 0.0;
}
pthread_barrier_init(&barrier, NULL, NUM_THREADS);
pthread_t threads[NUM_THREADS];
worker_arg_t args[NUM_THREADS];
int chunk = ARRAY_SIZE / NUM_THREADS;
for (int i = 0; i < NUM_THREADS; i++) {
args[i].id = i;
args[i].start = i * chunk;
args[i].end = (i == NUM_THREADS - 1) ? ARRAY_SIZE : (i + 1) * chunk;
pthread_create(&threads[i], NULL, worker, &args[i]);
}
for (int i = 0; i < NUM_THREADS; i++) {
pthread_join(threads[i], NULL);
}
pthread_barrier_destroy(&barrier);
// הדפס כמה ערכים ראשונים
printf("first 10 values after smoothing:\n");
// אחרי NUM_PHASES איטרציות, התוצאה תהיה ב-array_a (אם NUM_PHASES זוגי)
double *result = (NUM_PHASES % 2 == 0) ? array_a : array_b;
for (int i = 0; i < 10; i++) {
printf(" [%d] = %.2f\n", i, result[i]);
}
return 0;
}
פתרון לתרגיל 5 - מוניטור שומר סף
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
/* Monitor for a gate that lets a limited number of threads through. */
typedef struct {
    int remaining;           /* how many more threads may pass */
    pthread_mutex_t mutex;   /* protects `remaining` */
    pthread_cond_t can_pass; /* broadcast by gate_open() */
} gate_monitor_t;
/*
 * Initialise a closed gate (no passes available) and create its mutex and
 * condition variable with default attributes.
 */
void gate_init(gate_monitor_t *g) {
    pthread_mutex_init(&g->mutex, NULL);
    pthread_cond_init(&g->can_pass, NULL);
    g->remaining = 0;
}
/* Destroy the gate's condition variable and mutex. */
void gate_destroy(gate_monitor_t *g) {
    pthread_cond_destroy(&g->can_pass);
    pthread_mutex_destroy(&g->mutex);
}
/*
 * Block until the gate is open, then pass through it.
 * Consumes one of the available passes and prints a line with `thread_id`
 * and the number of passes left, all while holding the gate's mutex.
 */
void gate_pass(gate_monitor_t *g, int thread_id) {
    pthread_mutex_lock(&g->mutex);

    /* gate_open() wakes every waiter; those that find no pass left go back
     * to sleep. */
    while (g->remaining == 0) {
        pthread_cond_wait(&g->can_pass, &g->mutex);
    }

    const int left = --g->remaining;
    printf("thread %d passed the gate (remaining: %d)\n", thread_id, left);

    pthread_mutex_unlock(&g->mutex);
}
/*
 * Open the gate for `n` threads: adds `n` passes, prints a message and wakes
 * every thread waiting in gate_pass().
 */
void gate_open(gate_monitor_t *g, int n) {
    pthread_mutex_lock(&g->mutex);

    g->remaining = g->remaining + n;
    printf("gate opened for %d threads\n", n);
    pthread_cond_broadcast(&g->can_pass);

    pthread_mutex_unlock(&g->mutex);
}
/* Gate instance shared by main() and every thread_func() thread. */
gate_monitor_t gate;
/*
 * Thread entry point; `arg` points to the thread's int id.
 * Announces that it is waiting, passes the global gate (blocking until a
 * pass is available) and announces that it got through. Always returns NULL.
 */
void *thread_func(void *arg) {
    const int id = *(const int *)arg;

    printf("thread %d: waiting at the gate...\n", id);
    gate_pass(&gate, id);
    printf("thread %d: through!\n", id);

    return NULL;
}
/*
 * Gate demo: starts 10 threads that wait at the gate and releases them in
 * three batches (3, 3 and 4), pausing one second before each batch.
 */
int main(void) {
    /* Batch sizes, in the order the gate is opened. */
    static const int batches[] = { 3, 3, 4 };
    pthread_t threads[10];
    int ids[10];

    gate_init(&gate);

    /* Create 10 threads that wait at the gate. */
    for (int t = 0; t < 10; t++) {
        ids[t] = t;
        pthread_create(&threads[t], NULL, thread_func, &ids[t]);
    }

    /* The first pause gives all threads time to reach the gate; the later
     * ones separate the batches. */
    for (int b = 0; b < 3; b++) {
        sleep(1);
        gate_open(&gate, batches[b]);
    }

    for (int t = 0; t < 10; t++) {
        pthread_join(threads[t], NULL);
    }

    gate_destroy(&gate);
    printf("done.\n");
    return 0;
}