לדלג לתוכן

12.4 תכנות אסינכרוני עם eventloop הרצאה

הקדמה

איך Node.js מטפל באלפי חיבורים עם תהליכון אחד? איך Redis, אחד ממסדי הנתונים המהירים בעולם, עובד בתהליכון יחיד? איך Nginx מגיש אלפי בקשות בו-זמנית?

התשובה: לולאת אירועים - event loop.

בפרק 11 למדנו על epoll שנותן לנו לנטר הרבה file descriptors. בפרק 12.3 ראינו את io_uring שמבצע I/O אסינכרוני. עכשיו נלמד את הדפוס שמחבר את הכל: ה-event loop.


הרעיון

לולאת אירועים היא הפשטות עצמה:

while (true) {
    events = wait_for_events();    // epoll_wait / io_uring_wait
    for each event:
        handle(event);             // קרא callback מתאים
}

זה הכל. אין threads מרובים, אין mutex, אין deadlock. תהליכון אחד שמחכה לאירועים ומטפל בהם אחד אחרי השני.

הכלל הזהב

שום דבר לא חוסם. כל handler חייב לסיים מהר. אסור:
- sleep() - תקפיא את כל השרת.
- קריאת I/O חוסמת (blocking read) - כל הלקוחות יחכו.
- חישוב כבד - כל הלקוחות יחכו.

אם צריך לעשות משהו שלוקח זמן - תעביר את זה לthread pool נפרד (כמו שלמדנו ב-12.1) ותקבל את התוצאה כאירוע.


מימוש event loop בסיסי

בואו נבנה event loop פשוט אבל שלם מעל epoll:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/epoll.h>
#include <sys/timerfd.h>
#include <sys/signalfd.h>
#include <signal.h>
#include <errno.h>
#include <stdbool.h>

#define MAX_EVENTS 64
#define MAX_FDS 1024

// סוגי callback-ים
typedef void (*event_callback_t)(int fd, void *data);

// רישום אירוע
typedef struct {
    int fd;
    event_callback_t on_read;
    event_callback_t on_write;
    void *data;
    bool active;
} event_source_t;

// לולאת האירועים
typedef struct {
    int epoll_fd;
    event_source_t sources[MAX_FDS];
    bool running;
} event_loop_t;

event_loop_t *event_loop_create(void) {
    event_loop_t *loop = calloc(1, sizeof(event_loop_t));
    loop->epoll_fd = epoll_create1(0);
    loop->running = true;
    return loop;
}

void event_loop_destroy(event_loop_t *loop) {
    close(loop->epoll_fd);
    free(loop);
}

// רשום fd לקריאה
void event_loop_add_reader(event_loop_t *loop, int fd,
                            event_callback_t callback, void *data) {
    struct epoll_event ev = {
        .events = EPOLLIN,
        .data.fd = fd
    };
    epoll_ctl(loop->epoll_fd, EPOLL_CTL_ADD, fd, &ev);

    loop->sources[fd].fd = fd;
    loop->sources[fd].on_read = callback;
    loop->sources[fd].data = data;
    loop->sources[fd].active = true;
}

// רשום fd לכתיבה
void event_loop_add_writer(event_loop_t *loop, int fd,
                            event_callback_t callback, void *data) {
    struct epoll_event ev = {
        .events = EPOLLOUT,
        .data.fd = fd
    };
    epoll_ctl(loop->epoll_fd, EPOLL_CTL_ADD, fd, &ev);

    loop->sources[fd].fd = fd;
    loop->sources[fd].on_write = callback;
    loop->sources[fd].data = data;
    loop->sources[fd].active = true;
}

// הסר fd
void event_loop_remove(event_loop_t *loop, int fd) {
    epoll_ctl(loop->epoll_fd, EPOLL_CTL_DEL, fd, NULL);
    loop->sources[fd].active = false;
}

// עצור את הלולאה
void event_loop_stop(event_loop_t *loop) {
    loop->running = false;
}

// הלולאה עצמה
void event_loop_run(event_loop_t *loop) {
    struct epoll_event events[MAX_EVENTS];

    while (loop->running) {
        int n = epoll_wait(loop->epoll_fd, events, MAX_EVENTS, -1);
        if (n < 0) {
            if (errno == EINTR) continue;
            perror("epoll_wait");
            break;
        }

        for (int i = 0; i < n; i++) {
            int fd = events[i].data.fd;
            event_source_t *src = &loop->sources[fd];

            if (!src->active) continue;

            if ((events[i].events & EPOLLIN) && src->on_read) {
                src->on_read(fd, src->data);
            }
            if ((events[i].events & EPOLLOUT) && src->on_write) {
                src->on_write(fd, src->data);
            }
        }
    }
}

שימוש

// handler לקריאה
void on_stdin_read(int fd, void *data) {
    (void)data;
    char buf[256];
    ssize_t n = read(fd, buf, sizeof(buf) - 1);
    if (n > 0) {
        buf[n] = '\0';
        printf("got: %s", buf);
    }
}

int main(void) {
    event_loop_t *loop = event_loop_create();

    // האזן לstdin
    event_loop_add_reader(loop, STDIN_FILENO, on_stdin_read, NULL);

    printf("type something (Ctrl+C to quit):\n");
    event_loop_run(loop);

    event_loop_destroy(loop);
    return 0;
}

אירועי טיימר - timerfd

לינוקס מספקת timerfd - fd שמתנהג כמו טיימר. כשהטיימר פוקע, הfd הופך ל"מוכן לקריאה", ואפשר לשלב אותו ב-event loop:

#include <sys/timerfd.h>

// צור טיימר שפוקע כל שנייה
int create_timer(int interval_sec) {
    int tfd = timerfd_create(CLOCK_MONOTONIC, 0);

    struct itimerspec spec = {
        .it_interval = { .tv_sec = interval_sec, .tv_nsec = 0 },  // חוזר
        .it_value    = { .tv_sec = interval_sec, .tv_nsec = 0 }   // ראשון
    };
    timerfd_settime(tfd, 0, &spec, NULL);

    return tfd;
}

// handler לטיימר
void on_timer(int fd, void *data) {
    uint64_t expirations;
    read(fd, &expirations, sizeof(expirations));  // חובה לקרוא
    printf("timer fired! (%lu expirations)\n", expirations);
}

// שימוש ב-event loop
int tfd = create_timer(1);  // כל שנייה
event_loop_add_reader(loop, tfd, on_timer, NULL);

אירועי סיגנלים - signalfd

בדומה ל-timerfd, אפשר להפוך סיגנלים לfd-ים עם signalfd:

#include <sys/signalfd.h>
#include <signal.h>

int create_signal_fd(void) {
    sigset_t mask;
    sigemptyset(&mask);
    sigaddset(&mask, SIGINT);
    sigaddset(&mask, SIGTERM);

    // חסום את הסיגנלים כדי שsignalfd יקבל אותם
    sigprocmask(SIG_BLOCK, &mask, NULL);

    return signalfd(-1, &mask, 0);
}

void on_signal(int fd, void *data) {
    event_loop_t *loop = (event_loop_t *)data;
    struct signalfd_siginfo info;
    read(fd, &info, sizeof(info));

    printf("got signal %d\n", info.ssi_signo);
    event_loop_stop(loop);
}

// שימוש
int sfd = create_signal_fd();
event_loop_add_reader(loop, sfd, on_signal, loop);

עכשיו Ctrl+C לא הורג את התוכנית ישר, אלא עובר דרך event loop ומאפשר כיבוי מסודר (graceful shutdown).


דוגמה שלמה: event loop עם טיימר, סיגנלים, ו-stdin

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/epoll.h>
#include <sys/timerfd.h>
#include <sys/signalfd.h>
#include <signal.h>
#include <stdbool.h>
#include <errno.h>
#include <stdint.h>

// (הגדרת event_loop_t וכל הפונקציות מלמעלה)

int tick_count = 0;

void on_timer(int fd, void *data) {
    uint64_t exp;
    read(fd, &exp, sizeof(exp));
    tick_count++;
    printf("[tick %d]\n", tick_count);
}

void on_stdin(int fd, void *data) {
    char buf[256];
    ssize_t n = read(fd, buf, sizeof(buf) - 1);
    if (n > 0) {
        buf[n] = '\0';
        // הסר newline
        if (buf[n-1] == '\n') buf[n-1] = '\0';
        printf("you typed: '%s'\n", buf);

        if (strcmp(buf, "quit") == 0) {
            event_loop_t *loop = (event_loop_t *)data;
            event_loop_stop(loop);
        }
    }
}

void on_signal(int fd, void *data) {
    struct signalfd_siginfo info;
    read(fd, &info, sizeof(info));
    printf("\ncaught signal %d, shutting down...\n", info.ssi_signo);
    event_loop_t *loop = (event_loop_t *)data;
    event_loop_stop(loop);
}

int main(void) {
    event_loop_t *loop = event_loop_create();

    // טיימר - כל שנייה
    int tfd = timerfd_create(CLOCK_MONOTONIC, 0);
    struct itimerspec ts = {
        .it_interval = { .tv_sec = 1 },
        .it_value = { .tv_sec = 1 }
    };
    timerfd_settime(tfd, 0, &ts, NULL);
    event_loop_add_reader(loop, tfd, on_timer, NULL);

    // stdin
    event_loop_add_reader(loop, STDIN_FILENO, on_stdin, loop);

    // סיגנלים
    sigset_t mask;
    sigemptyset(&mask);
    sigaddset(&mask, SIGINT);
    sigprocmask(SIG_BLOCK, &mask, NULL);
    int sfd = signalfd(-1, &mask, 0);
    event_loop_add_reader(loop, sfd, on_signal, loop);

    printf("event loop running. type 'quit' or Ctrl+C to exit.\n");
    event_loop_run(loop);

    close(tfd);
    close(sfd);
    event_loop_destroy(loop);
    printf("bye.\n");
    return 0;
}

קולבקים מול מכונת מצבים - callbacks vs state machines

בעיית הcallbacks

כשיש פרוטוקול מורכב (למשל HTTP), הלוגיקה מתפצלת בין הרבה callbacks, וקשה לעקוב אחרי הזרימה. זה מה שמכנים "callback hell":

// קשה לעקוב: הלוגיקה מפוזרת
void on_request_read(int fd, void *data) {
    // קרא בקשה...
    // כשסיים, רשום callback לעיבוד
}
void on_request_processed(int fd, void *data) {
    // עבד...
    // כשסיים, רשום callback לכתיבה
}
void on_response_written(int fd, void *data) {
    // נכתב...
    // חזור לקרוא בקשה חדשה
}

מכונת מצבים - state machine

גישה חלופית: כל חיבור מחזיק מצב (state), וhandler אחד מטפל בכל המצבים:

typedef enum {
    STATE_READING_REQUEST,
    STATE_PROCESSING,
    STATE_WRITING_RESPONSE,
} conn_state_t;

typedef struct {
    int fd;
    conn_state_t state;
    char read_buf[4096];
    int read_pos;
    char write_buf[4096];
    int write_pos;
    int write_len;
} connection_t;

void handle_connection(int fd, void *data) {
    connection_t *conn = (connection_t *)data;

    switch (conn->state) {
    case STATE_READING_REQUEST: {
        ssize_t n = read(fd, conn->read_buf + conn->read_pos,
                         sizeof(conn->read_buf) - conn->read_pos);
        if (n <= 0) { /* סגור */ return; }
        conn->read_pos += n;

        // בדוק אם קיבלנו בקשה שלמה
        if (strstr(conn->read_buf, "\r\n\r\n")) {
            conn->state = STATE_PROCESSING;
            // עבד את הבקשה (ישר, כי זה מהיר)
            process_request(conn);
            conn->state = STATE_WRITING_RESPONSE;
        }
        break;
    }

    case STATE_WRITING_RESPONSE: {
        ssize_t n = write(fd, conn->write_buf + conn->write_pos,
                          conn->write_len - conn->write_pos);
        if (n <= 0) { /* שגיאה */ return; }
        conn->write_pos += n;

        if (conn->write_pos >= conn->write_len) {
            // סיימנו לכתוב, חזור לקרוא
            conn->state = STATE_READING_REQUEST;
            conn->read_pos = 0;
            conn->write_pos = 0;
        }
        break;
    }

    default:
        break;
    }
}

מכונת מצבים יותר ברורה כשיש פרוטוקול עם שלבים מוגדרים.


קורוטינות ותכנות אסינכרוני - coroutines and async/await

הבעיה

גם callbacks וגם מכונות מצבים מסרבלים: הלוגיקה שבקוד סינכרוני (רגיל) היא שורה אחרי שורה, אבל בקוד אסינכרוני היא מפוצלת.

קוד סינכרוני (פשוט, אבל חוסם):

request = read_full_request(fd);
response = process(request);
write_full_response(fd, response);

קוד אסינכרוני (לא חוסם, אבל מסורבל):

// מפוצל בין 3 callbacks שונים, קשה לעקוב

הפתרון: קורוטינות - coroutines

קורוטינה היא פונקציה שיכולה "להשהות" את עצמה ולחזור מאוחר יותר מאותה נקודה. זה מאפשר לכתוב קוד שנראה סינכרוני אבל עובד אסינכרוני:

// פסאודו-קוד עם coroutines
async function handle_client(fd):
    request = await read(fd)       // "השהה" עד שיש נתונים
    response = process(request)
    await write(fd, response)      // "השהה" עד שהכתיבה הושלמה

בC אין תמיכה מובנית בcoroutines (בניגוד ל-Go, Python, Rust, C#). אבל אפשר לממש אותן עם:
- setjmp/longjmp - שמירה ושחזור של הקונטקסט.
- ucontext - API יותר נוח (הוצא משימוש אבל עדיין עובד בלינוקס).
- תוספות קומפיילר (GCC coroutines).
- ספריות כמו libco, minicoro.

העניין הוא שהקונספט הזה הוא הבסיס לasync/await בכל השפות המודרניות:
- גורוטינות ב-Go - הruntime של Go מנהל אלפי goroutines מעל מספר קטן של threads.
- async/await ב-Python (asyncio) - coroutines שרצות על event loop.
- async/await ב-Rust - coroutines שמתקמפלות למכונת מצבים.
- async/await ב-JavaScript (Node.js) - coroutines מעל libuv event loop.


שרת HTTP פשוט עם event loop

בואו נבנה שרת HTTP מינימלי שמשתמש ב-event loop שלנו:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <netinet/in.h>
#include <errno.h>
#include <stdbool.h>

#define PORT 8080
#define MAX_EVENTS 64
#define BUF_SIZE 4096

typedef struct {
    int fd;
    char read_buf[BUF_SIZE];
    int read_len;
    char write_buf[BUF_SIZE];
    int write_len;
    int write_pos;
    bool writing;
} http_conn_t;

int epfd;

// הפוך fd ל-non-blocking
void set_nonblocking(int fd) {
    int flags = fcntl(fd, F_GETFL, 0);
    fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}

// בנה תשובת HTTP פשוטה
void build_response(http_conn_t *conn) {
    const char *body = "<html><body><h1>Hello from C event loop!</h1></body></html>";
    conn->write_len = snprintf(conn->write_buf, BUF_SIZE,
        "HTTP/1.1 200 OK\r\n"
        "Content-Type: text/html\r\n"
        "Content-Length: %zu\r\n"
        "Connection: close\r\n"
        "\r\n"
        "%s",
        strlen(body), body);
    conn->write_pos = 0;
    conn->writing = true;
}

// טפל בקריאה
void handle_read(http_conn_t *conn) {
    ssize_t n = read(conn->fd, conn->read_buf + conn->read_len,
                     BUF_SIZE - conn->read_len - 1);

    if (n <= 0) {
        if (n == 0 || errno != EAGAIN) {
            close(conn->fd);
            free(conn);
        }
        return;
    }

    conn->read_len += n;
    conn->read_buf[conn->read_len] = '\0';

    // בדוק אם קיבלנו את כל הheaders
    if (strstr(conn->read_buf, "\r\n\r\n")) {
        // בנה תשובה
        build_response(conn);

        // עבור למצב כתיבה
        struct epoll_event ev = {
            .events = EPOLLOUT,
            .data.ptr = conn
        };
        epoll_ctl(epfd, EPOLL_CTL_MOD, conn->fd, &ev);
    }
}

// טפל בכתיבה
void handle_write(http_conn_t *conn) {
    ssize_t n = write(conn->fd, conn->write_buf + conn->write_pos,
                      conn->write_len - conn->write_pos);

    if (n <= 0) {
        if (errno != EAGAIN) {
            close(conn->fd);
            free(conn);
        }
        return;
    }

    conn->write_pos += n;

    if (conn->write_pos >= conn->write_len) {
        // סיימנו - סגור את החיבור
        close(conn->fd);
        free(conn);
    }
}

int main(void) {
    int server_fd = socket(AF_INET, SOCK_STREAM, 0);
    int opt = 1;
    setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
    set_nonblocking(server_fd);

    struct sockaddr_in addr = {
        .sin_family = AF_INET,
        .sin_port = htons(PORT),
        .sin_addr.s_addr = INADDR_ANY
    };
    bind(server_fd, (struct sockaddr *)&addr, sizeof(addr));
    listen(server_fd, 128);

    epfd = epoll_create1(0);
    struct epoll_event ev = {
        .events = EPOLLIN,
        .data.fd = server_fd
    };
    epoll_ctl(epfd, EPOLL_CTL_ADD, server_fd, &ev);

    printf("HTTP server listening on port %d\n", PORT);

    struct epoll_event events[MAX_EVENTS];
    while (true) {
        int n = epoll_wait(epfd, events, MAX_EVENTS, -1);
        if (n < 0) {
            if (errno == EINTR) continue;
            break;
        }

        for (int i = 0; i < n; i++) {
            if (events[i].data.fd == server_fd) {
                // חיבור חדש
                struct sockaddr_in client_addr;
                socklen_t len = sizeof(client_addr);
                int client_fd = accept(server_fd,
                    (struct sockaddr *)&client_addr, &len);

                if (client_fd < 0) continue;
                set_nonblocking(client_fd);

                http_conn_t *conn = calloc(1, sizeof(http_conn_t));
                conn->fd = client_fd;

                struct epoll_event cev = {
                    .events = EPOLLIN,
                    .data.ptr = conn
                };
                epoll_ctl(epfd, EPOLL_CTL_ADD, client_fd, &cev);
            } else {
                http_conn_t *conn = events[i].data.ptr;
                if (events[i].events & EPOLLIN) {
                    handle_read(conn);
                } else if (events[i].events & EPOLLOUT) {
                    handle_write(conn);
                }
            }
        }
    }

    close(server_fd);
    close(epfd);
    return 0;
}

קומפילציה ובדיקה:

gcc -o httpserver httpserver.c
./httpserver

# בטרמינל אחר, או בדפדפן:
curl http://localhost:8080

שימו לב: שרת אחד, תהליכון אחד, אפס mutex-ים, ומטפל בהרבה לקוחות במקביל. זו העוצמה של event loop.


סיכום

גישה תהליכונים מורכבות ביצועים שימושים
תהליכון לכל חיבור רבים נמוכה בינוניים שרתים פשוטים
בריכת תהליכונים (thread pool) N קבועים בינונית טובים שרתי וב, מסדי נתונים
לולאת אירועים (event loop) 1 גבוהה מעולים שרתים עתירי I/O
היברידי (event loop + thread pool) 1 + N גבוהה הכי טובים שרתים מודרניים (Nginx, Node.js)

הגישה המודרנית ברוב המערכות היא היברידית: event loop ראשי שמטפל ב-I/O, ו-thread pool לעבודות חישוב כבדות. ככה מקבלים את הטוב משני העולמות.