12.4 תכנות אסינכרוני עם eventloop הרצאה
הקדמה¶
איך Node.js מטפל באלפי חיבורים עם תהליכון אחד? איך Redis, אחד ממסדי הנתונים המהירים בעולם, עובד בתהליכון יחיד? איך Nginx מגיש אלפי בקשות בו-זמנית?
התשובה: לולאת אירועים - event loop.
בפרק 11 למדנו על epoll שנותן לנו לנטר הרבה file descriptors. בפרק 12.3 ראינו את io_uring שמבצע I/O אסינכרוני. עכשיו נלמד את הדפוס שמחבר את הכל: ה-event loop.
הרעיון¶
לולאת אירועים היא הפשטות עצמה:
while (true) {
events = wait_for_events(); // epoll_wait / io_uring_wait
for each event:
handle(event); // קרא callback מתאים
}
זה הכל. אין threads מרובים, אין mutex, אין deadlock. תהליכון אחד שמחכה לאירועים ומטפל בהם אחד אחרי השני.
הכלל הזהב¶
שום דבר לא חוסם. כל handler חייב לסיים מהר. אסור:
- sleep() - תקפיא את כל השרת.
- קריאת I/O חוסמת (blocking read) - כל הלקוחות יחכו.
- חישוב כבד - כל הלקוחות יחכו.
אם צריך לעשות משהו שלוקח זמן - תעביר את זה לthread pool נפרד (כמו שלמדנו ב-12.1) ותקבל את התוצאה כאירוע.
מימוש event loop בסיסי¶
בואו נבנה event loop פשוט אבל שלם מעל epoll:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/epoll.h>
#include <sys/timerfd.h>
#include <sys/signalfd.h>
#include <signal.h>
#include <errno.h>
#include <stdbool.h>
#define MAX_EVENTS 64
#define MAX_FDS 1024
// סוגי callback-ים
typedef void (*event_callback_t)(int fd, void *data);
// רישום אירוע
typedef struct {
int fd;
event_callback_t on_read;
event_callback_t on_write;
void *data;
bool active;
} event_source_t;
// לולאת האירועים
typedef struct {
int epoll_fd;
event_source_t sources[MAX_FDS];
bool running;
} event_loop_t;
event_loop_t *event_loop_create(void) {
event_loop_t *loop = calloc(1, sizeof(event_loop_t));
loop->epoll_fd = epoll_create1(0);
loop->running = true;
return loop;
}
void event_loop_destroy(event_loop_t *loop) {
close(loop->epoll_fd);
free(loop);
}
// רשום fd לקריאה
void event_loop_add_reader(event_loop_t *loop, int fd,
event_callback_t callback, void *data) {
struct epoll_event ev = {
.events = EPOLLIN,
.data.fd = fd
};
epoll_ctl(loop->epoll_fd, EPOLL_CTL_ADD, fd, &ev);
loop->sources[fd].fd = fd;
loop->sources[fd].on_read = callback;
loop->sources[fd].data = data;
loop->sources[fd].active = true;
}
// רשום fd לכתיבה
void event_loop_add_writer(event_loop_t *loop, int fd,
event_callback_t callback, void *data) {
struct epoll_event ev = {
.events = EPOLLOUT,
.data.fd = fd
};
epoll_ctl(loop->epoll_fd, EPOLL_CTL_ADD, fd, &ev);
loop->sources[fd].fd = fd;
loop->sources[fd].on_write = callback;
loop->sources[fd].data = data;
loop->sources[fd].active = true;
}
// הסר fd
void event_loop_remove(event_loop_t *loop, int fd) {
epoll_ctl(loop->epoll_fd, EPOLL_CTL_DEL, fd, NULL);
loop->sources[fd].active = false;
}
// עצור את הלולאה
void event_loop_stop(event_loop_t *loop) {
loop->running = false;
}
// הלולאה עצמה
void event_loop_run(event_loop_t *loop) {
struct epoll_event events[MAX_EVENTS];
while (loop->running) {
int n = epoll_wait(loop->epoll_fd, events, MAX_EVENTS, -1);
if (n < 0) {
if (errno == EINTR) continue;
perror("epoll_wait");
break;
}
for (int i = 0; i < n; i++) {
int fd = events[i].data.fd;
event_source_t *src = &loop->sources[fd];
if (!src->active) continue;
if ((events[i].events & EPOLLIN) && src->on_read) {
src->on_read(fd, src->data);
}
if ((events[i].events & EPOLLOUT) && src->on_write) {
src->on_write(fd, src->data);
}
}
}
}
שימוש¶
// handler לקריאה
void on_stdin_read(int fd, void *data) {
(void)data;
char buf[256];
ssize_t n = read(fd, buf, sizeof(buf) - 1);
if (n > 0) {
buf[n] = '\0';
printf("got: %s", buf);
}
}
int main(void) {
event_loop_t *loop = event_loop_create();
// האזן לstdin
event_loop_add_reader(loop, STDIN_FILENO, on_stdin_read, NULL);
printf("type something (Ctrl+C to quit):\n");
event_loop_run(loop);
event_loop_destroy(loop);
return 0;
}
אירועי טיימר - timerfd¶
לינוקס מספקת timerfd - fd שמתנהג כמו טיימר. כשהטיימר פוקע, הfd הופך ל"מוכן לקריאה", ואפשר לשלב אותו ב-event loop:
#include <sys/timerfd.h>
// צור טיימר שפוקע כל שנייה
int create_timer(int interval_sec) {
int tfd = timerfd_create(CLOCK_MONOTONIC, 0);
struct itimerspec spec = {
.it_interval = { .tv_sec = interval_sec, .tv_nsec = 0 }, // חוזר
.it_value = { .tv_sec = interval_sec, .tv_nsec = 0 } // ראשון
};
timerfd_settime(tfd, 0, &spec, NULL);
return tfd;
}
// handler לטיימר
void on_timer(int fd, void *data) {
uint64_t expirations;
read(fd, &expirations, sizeof(expirations)); // חובה לקרוא
printf("timer fired! (%lu expirations)\n", expirations);
}
// שימוש ב-event loop
int tfd = create_timer(1); // כל שנייה
event_loop_add_reader(loop, tfd, on_timer, NULL);
אירועי סיגנלים - signalfd¶
בדומה ל-timerfd, אפשר להפוך סיגנלים לfd-ים עם signalfd:
#include <sys/signalfd.h>
#include <signal.h>
int create_signal_fd(void) {
sigset_t mask;
sigemptyset(&mask);
sigaddset(&mask, SIGINT);
sigaddset(&mask, SIGTERM);
// חסום את הסיגנלים כדי שsignalfd יקבל אותם
sigprocmask(SIG_BLOCK, &mask, NULL);
return signalfd(-1, &mask, 0);
}
void on_signal(int fd, void *data) {
event_loop_t *loop = (event_loop_t *)data;
struct signalfd_siginfo info;
read(fd, &info, sizeof(info));
printf("got signal %d\n", info.ssi_signo);
event_loop_stop(loop);
}
// שימוש
int sfd = create_signal_fd();
event_loop_add_reader(loop, sfd, on_signal, loop);
עכשיו Ctrl+C לא הורג את התוכנית ישר, אלא עובר דרך event loop ומאפשר כיבוי מסודר (graceful shutdown).
דוגמה שלמה: event loop עם טיימר, סיגנלים, ו-stdin¶
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/epoll.h>
#include <sys/timerfd.h>
#include <sys/signalfd.h>
#include <signal.h>
#include <stdbool.h>
#include <errno.h>
#include <stdint.h>
// (הגדרת event_loop_t וכל הפונקציות מלמעלה)
int tick_count = 0;
void on_timer(int fd, void *data) {
uint64_t exp;
read(fd, &exp, sizeof(exp));
tick_count++;
printf("[tick %d]\n", tick_count);
}
void on_stdin(int fd, void *data) {
char buf[256];
ssize_t n = read(fd, buf, sizeof(buf) - 1);
if (n > 0) {
buf[n] = '\0';
// הסר newline
if (buf[n-1] == '\n') buf[n-1] = '\0';
printf("you typed: '%s'\n", buf);
if (strcmp(buf, "quit") == 0) {
event_loop_t *loop = (event_loop_t *)data;
event_loop_stop(loop);
}
}
}
void on_signal(int fd, void *data) {
struct signalfd_siginfo info;
read(fd, &info, sizeof(info));
printf("\ncaught signal %d, shutting down...\n", info.ssi_signo);
event_loop_t *loop = (event_loop_t *)data;
event_loop_stop(loop);
}
int main(void) {
event_loop_t *loop = event_loop_create();
// טיימר - כל שנייה
int tfd = timerfd_create(CLOCK_MONOTONIC, 0);
struct itimerspec ts = {
.it_interval = { .tv_sec = 1 },
.it_value = { .tv_sec = 1 }
};
timerfd_settime(tfd, 0, &ts, NULL);
event_loop_add_reader(loop, tfd, on_timer, NULL);
// stdin
event_loop_add_reader(loop, STDIN_FILENO, on_stdin, loop);
// סיגנלים
sigset_t mask;
sigemptyset(&mask);
sigaddset(&mask, SIGINT);
sigprocmask(SIG_BLOCK, &mask, NULL);
int sfd = signalfd(-1, &mask, 0);
event_loop_add_reader(loop, sfd, on_signal, loop);
printf("event loop running. type 'quit' or Ctrl+C to exit.\n");
event_loop_run(loop);
close(tfd);
close(sfd);
event_loop_destroy(loop);
printf("bye.\n");
return 0;
}
קולבקים מול מכונת מצבים - callbacks vs state machines¶
בעיית הcallbacks¶
כשיש פרוטוקול מורכב (למשל HTTP), הלוגיקה מתפצלת בין הרבה callbacks, וקשה לעקוב אחרי הזרימה. זה מה שמכנים "callback hell":
// קשה לעקוב: הלוגיקה מפוזרת
void on_request_read(int fd, void *data) {
// קרא בקשה...
// כשסיים, רשום callback לעיבוד
}
void on_request_processed(int fd, void *data) {
// עבד...
// כשסיים, רשום callback לכתיבה
}
void on_response_written(int fd, void *data) {
// נכתב...
// חזור לקרוא בקשה חדשה
}
מכונת מצבים - state machine¶
גישה חלופית: כל חיבור מחזיק מצב (state), וhandler אחד מטפל בכל המצבים:
typedef enum {
STATE_READING_REQUEST,
STATE_PROCESSING,
STATE_WRITING_RESPONSE,
} conn_state_t;
typedef struct {
int fd;
conn_state_t state;
char read_buf[4096];
int read_pos;
char write_buf[4096];
int write_pos;
int write_len;
} connection_t;
void handle_connection(int fd, void *data) {
connection_t *conn = (connection_t *)data;
switch (conn->state) {
case STATE_READING_REQUEST: {
ssize_t n = read(fd, conn->read_buf + conn->read_pos,
sizeof(conn->read_buf) - conn->read_pos);
if (n <= 0) { /* סגור */ return; }
conn->read_pos += n;
// בדוק אם קיבלנו בקשה שלמה
if (strstr(conn->read_buf, "\r\n\r\n")) {
conn->state = STATE_PROCESSING;
// עבד את הבקשה (ישר, כי זה מהיר)
process_request(conn);
conn->state = STATE_WRITING_RESPONSE;
}
break;
}
case STATE_WRITING_RESPONSE: {
ssize_t n = write(fd, conn->write_buf + conn->write_pos,
conn->write_len - conn->write_pos);
if (n <= 0) { /* שגיאה */ return; }
conn->write_pos += n;
if (conn->write_pos >= conn->write_len) {
// סיימנו לכתוב, חזור לקרוא
conn->state = STATE_READING_REQUEST;
conn->read_pos = 0;
conn->write_pos = 0;
}
break;
}
default:
break;
}
}
מכונת מצבים יותר ברורה כשיש פרוטוקול עם שלבים מוגדרים.
קורוטינות ותכנות אסינכרוני - coroutines and async/await¶
הבעיה¶
גם callbacks וגם מכונות מצבים מסרבלים: הלוגיקה שבקוד סינכרוני (רגיל) היא שורה אחרי שורה, אבל בקוד אסינכרוני היא מפוצלת.
קוד סינכרוני (פשוט, אבל חוסם):
קוד אסינכרוני (לא חוסם, אבל מסורבל):
הפתרון: קורוטינות - coroutines¶
קורוטינה היא פונקציה שיכולה "להשהות" את עצמה ולחזור מאוחר יותר מאותה נקודה. זה מאפשר לכתוב קוד שנראה סינכרוני אבל עובד אסינכרוני:
// פסאודו-קוד עם coroutines
async function handle_client(fd):
request = await read(fd) // "השהה" עד שיש נתונים
response = process(request)
await write(fd, response) // "השהה" עד שהכתיבה הושלמה
בC אין תמיכה מובנית בcoroutines (בניגוד ל-Go, Python, Rust, C#). אבל אפשר לממש אותן עם:
- setjmp/longjmp - שמירה ושחזור של הקונטקסט.
- ucontext - API יותר נוח (הוצא משימוש אבל עדיין עובד בלינוקס).
- תוספות קומפיילר (GCC coroutines).
- ספריות כמו libco, minicoro.
העניין הוא שהקונספט הזה הוא הבסיס לasync/await בכל השפות המודרניות:
- גורוטינות ב-Go - הruntime של Go מנהל אלפי goroutines מעל מספר קטן של threads.
- async/await ב-Python (asyncio) - coroutines שרצות על event loop.
- async/await ב-Rust - coroutines שמתקמפלות למכונת מצבים.
- async/await ב-JavaScript (Node.js) - coroutines מעל libuv event loop.
שרת HTTP פשוט עם event loop¶
בואו נבנה שרת HTTP מינימלי שמשתמש ב-event loop שלנו:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <netinet/in.h>
#include <errno.h>
#include <stdbool.h>
#define PORT 8080
#define MAX_EVENTS 64
#define BUF_SIZE 4096
typedef struct {
int fd;
char read_buf[BUF_SIZE];
int read_len;
char write_buf[BUF_SIZE];
int write_len;
int write_pos;
bool writing;
} http_conn_t;
int epfd;
// הפוך fd ל-non-blocking
void set_nonblocking(int fd) {
int flags = fcntl(fd, F_GETFL, 0);
fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}
// בנה תשובת HTTP פשוטה
void build_response(http_conn_t *conn) {
const char *body = "<html><body><h1>Hello from C event loop!</h1></body></html>";
conn->write_len = snprintf(conn->write_buf, BUF_SIZE,
"HTTP/1.1 200 OK\r\n"
"Content-Type: text/html\r\n"
"Content-Length: %zu\r\n"
"Connection: close\r\n"
"\r\n"
"%s",
strlen(body), body);
conn->write_pos = 0;
conn->writing = true;
}
// טפל בקריאה
void handle_read(http_conn_t *conn) {
ssize_t n = read(conn->fd, conn->read_buf + conn->read_len,
BUF_SIZE - conn->read_len - 1);
if (n <= 0) {
if (n == 0 || errno != EAGAIN) {
close(conn->fd);
free(conn);
}
return;
}
conn->read_len += n;
conn->read_buf[conn->read_len] = '\0';
// בדוק אם קיבלנו את כל הheaders
if (strstr(conn->read_buf, "\r\n\r\n")) {
// בנה תשובה
build_response(conn);
// עבור למצב כתיבה
struct epoll_event ev = {
.events = EPOLLOUT,
.data.ptr = conn
};
epoll_ctl(epfd, EPOLL_CTL_MOD, conn->fd, &ev);
}
}
// טפל בכתיבה
void handle_write(http_conn_t *conn) {
ssize_t n = write(conn->fd, conn->write_buf + conn->write_pos,
conn->write_len - conn->write_pos);
if (n <= 0) {
if (errno != EAGAIN) {
close(conn->fd);
free(conn);
}
return;
}
conn->write_pos += n;
if (conn->write_pos >= conn->write_len) {
// סיימנו - סגור את החיבור
close(conn->fd);
free(conn);
}
}
int main(void) {
int server_fd = socket(AF_INET, SOCK_STREAM, 0);
int opt = 1;
setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
set_nonblocking(server_fd);
struct sockaddr_in addr = {
.sin_family = AF_INET,
.sin_port = htons(PORT),
.sin_addr.s_addr = INADDR_ANY
};
bind(server_fd, (struct sockaddr *)&addr, sizeof(addr));
listen(server_fd, 128);
epfd = epoll_create1(0);
struct epoll_event ev = {
.events = EPOLLIN,
.data.fd = server_fd
};
epoll_ctl(epfd, EPOLL_CTL_ADD, server_fd, &ev);
printf("HTTP server listening on port %d\n", PORT);
struct epoll_event events[MAX_EVENTS];
while (true) {
int n = epoll_wait(epfd, events, MAX_EVENTS, -1);
if (n < 0) {
if (errno == EINTR) continue;
break;
}
for (int i = 0; i < n; i++) {
if (events[i].data.fd == server_fd) {
// חיבור חדש
struct sockaddr_in client_addr;
socklen_t len = sizeof(client_addr);
int client_fd = accept(server_fd,
(struct sockaddr *)&client_addr, &len);
if (client_fd < 0) continue;
set_nonblocking(client_fd);
http_conn_t *conn = calloc(1, sizeof(http_conn_t));
conn->fd = client_fd;
struct epoll_event cev = {
.events = EPOLLIN,
.data.ptr = conn
};
epoll_ctl(epfd, EPOLL_CTL_ADD, client_fd, &cev);
} else {
http_conn_t *conn = events[i].data.ptr;
if (events[i].events & EPOLLIN) {
handle_read(conn);
} else if (events[i].events & EPOLLOUT) {
handle_write(conn);
}
}
}
}
close(server_fd);
close(epfd);
return 0;
}
קומפילציה ובדיקה:
שימו לב: שרת אחד, תהליכון אחד, אפס mutex-ים, ומטפל בהרבה לקוחות במקביל. זו העוצמה של event loop.
סיכום¶
| גישה | תהליכונים | מורכבות | ביצועים | שימושים |
|---|---|---|---|---|
| תהליכון לכל חיבור | רבים | נמוכה | בינוניים | שרתים פשוטים |
| בריכת תהליכונים (thread pool) | N קבועים | בינונית | טובים | שרתי וב, מסדי נתונים |
| לולאת אירועים (event loop) | 1 | גבוהה | מעולים | שרתים עתירי I/O |
| היברידי (event loop + thread pool) | 1 + N | גבוהה | הכי טובים | שרתים מודרניים (Nginx, Node.js) |
הגישה המודרנית ברוב המערכות היא היברידית: event loop ראשי שמטפל ב-I/O, ו-thread pool לעבודות חישוב כבדות. ככה מקבלים את הטוב משני העולמות.