12.4 תכנות אסינכרוני עם eventloop פתרון
פתרון לתרגיל 1 - event loop בסיסי
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/epoll.h>
#include <errno.h>
#include <stdbool.h>
#include <time.h>
#define MAX_EVENTS 64
#define MAX_FDS 1024
typedef void (*event_cb_t)(int fd, void *data);
typedef struct {
event_cb_t on_read;
event_cb_t on_write;
void *data;
bool active;
} source_t;
typedef struct {
int epfd;
source_t sources[MAX_FDS];
bool running;
} event_loop_t;
event_loop_t *event_loop_create(void) {
event_loop_t *loop = calloc(1, sizeof(event_loop_t));
loop->epfd = epoll_create1(0);
loop->running = true;
return loop;
}
void event_loop_destroy(event_loop_t *loop) {
close(loop->epfd);
free(loop);
}
void event_loop_add_reader(event_loop_t *loop, int fd,
event_cb_t cb, void *data) {
struct epoll_event ev = { .events = EPOLLIN, .data.fd = fd };
epoll_ctl(loop->epfd, EPOLL_CTL_ADD, fd, &ev);
loop->sources[fd].on_read = cb;
loop->sources[fd].data = data;
loop->sources[fd].active = true;
}
void event_loop_add_writer(event_loop_t *loop, int fd,
event_cb_t cb, void *data) {
struct epoll_event ev = { .events = EPOLLOUT, .data.fd = fd };
epoll_ctl(loop->epfd, EPOLL_CTL_ADD, fd, &ev);
loop->sources[fd].on_write = cb;
loop->sources[fd].data = data;
loop->sources[fd].active = true;
}
void event_loop_remove(event_loop_t *loop, int fd) {
epoll_ctl(loop->epfd, EPOLL_CTL_DEL, fd, NULL);
loop->sources[fd].active = false;
}
void event_loop_stop(event_loop_t *loop) {
loop->running = false;
}
void event_loop_run(event_loop_t *loop) {
struct epoll_event events[MAX_EVENTS];
while (loop->running) {
int n = epoll_wait(loop->epfd, events, MAX_EVENTS, -1);
if (n < 0) {
if (errno == EINTR) continue;
perror("epoll_wait");
break;
}
for (int i = 0; i < n; i++) {
int fd = events[i].data.fd;
source_t *s = &loop->sources[fd];
if (!s->active) continue;
if ((events[i].events & EPOLLIN) && s->on_read)
s->on_read(fd, s->data);
if ((events[i].events & EPOLLOUT) && s->on_write)
s->on_write(fd, s->data);
}
}
}
// --- שימוש ---
void on_stdin(int fd, void *data) {
(void)data;
char buf[256];
ssize_t n = read(fd, buf, sizeof(buf) - 1);
if (n <= 0) return;
buf[n] = '\0';
struct timespec ts;
clock_gettime(CLOCK_MONOTONIC, &ts);
printf("[%ld.%03ld] %s", ts.tv_sec, ts.tv_nsec / 1000000, buf);
}
int main(void) {
event_loop_t *loop = event_loop_create();
event_loop_add_reader(loop, STDIN_FILENO, on_stdin, NULL);
printf("type something:\n");
event_loop_run(loop);
event_loop_destroy(loop);
return 0;
}
פתרון לתרגיל 2 - טיימרים ב-event loop
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/epoll.h>
#include <sys/timerfd.h>
#include <errno.h>
#include <stdbool.h>
#include <stdint.h>
#include <time.h>
#define MAX_EVENTS 64
#define MAX_FDS 1024
typedef void (*event_cb_t)(int fd, void *data);
typedef struct {
event_cb_t on_read;
void *data;
bool active;
} source_t;
typedef struct {
int epfd;
source_t sources[MAX_FDS];
bool running;
} event_loop_t;
event_loop_t *event_loop_create(void) {
event_loop_t *loop = calloc(1, sizeof(event_loop_t));
loop->epfd = epoll_create1(0);
loop->running = true;
return loop;
}
void event_loop_destroy(event_loop_t *loop) {
close(loop->epfd);
free(loop);
}
void event_loop_add_reader(event_loop_t *loop, int fd,
event_cb_t cb, void *data) {
struct epoll_event ev = { .events = EPOLLIN, .data.fd = fd };
epoll_ctl(loop->epfd, EPOLL_CTL_ADD, fd, &ev);
loop->sources[fd].on_read = cb;
loop->sources[fd].data = data;
loop->sources[fd].active = true;
}
void event_loop_remove(event_loop_t *loop, int fd) {
epoll_ctl(loop->epfd, EPOLL_CTL_DEL, fd, NULL);
loop->sources[fd].active = false;
}
void event_loop_stop(event_loop_t *loop) {
loop->running = false;
}
void event_loop_run(event_loop_t *loop) {
struct epoll_event events[MAX_EVENTS];
while (loop->running) {
int n = epoll_wait(loop->epfd, events, MAX_EVENTS, -1);
if (n < 0) {
if (errno == EINTR) continue;
break;
}
for (int i = 0; i < n; i++) {
int fd = events[i].data.fd;
source_t *s = &loop->sources[fd];
if (s->active && (events[i].events & EPOLLIN) && s->on_read)
s->on_read(fd, s->data);
}
}
}
// הוסף טיימר - מחזיר את הfd של הטיימר
int event_loop_add_timer(event_loop_t *loop, int interval_ms,
bool repeating, event_cb_t cb, void *data) {
int tfd = timerfd_create(CLOCK_MONOTONIC, 0);
struct itimerspec ts = {0};
ts.it_value.tv_sec = interval_ms / 1000;
ts.it_value.tv_nsec = (interval_ms % 1000) * 1000000L;
if (repeating) {
ts.it_interval = ts.it_value;
}
timerfd_settime(tfd, 0, &ts, NULL);
event_loop_add_reader(loop, tfd, cb, data);
return tfd;
}
// --- שימוש ---
int tick_count = 0;
struct timespec start_time;
void print_time(void) {
struct timespec now;
clock_gettime(CLOCK_MONOTONIC, &now);
double elapsed = (now.tv_sec - start_time.tv_sec) +
(now.tv_nsec - start_time.tv_nsec) / 1e9;
printf("[%.1fs] ", elapsed);
}
void on_tick(int fd, void *data) {
(void)data;
uint64_t exp;
read(fd, &exp, sizeof(exp));
tick_count++;
print_time();
printf("tick\n");
}
void on_stats(int fd, void *data) {
(void)data;
uint64_t exp;
read(fd, &exp, sizeof(exp));
print_time();
printf("stats: %d ticks so far\n", tick_count);
}
void on_shutdown(int fd, void *data) {
uint64_t exp;
read(fd, &exp, sizeof(exp));
print_time();
printf("10 seconds passed, shutting down.\n");
event_loop_t *loop = (event_loop_t *)data;
event_loop_stop(loop);
}
int main(void) {
clock_gettime(CLOCK_MONOTONIC, &start_time);
event_loop_t *loop = event_loop_create();
event_loop_add_timer(loop, 1000, true, on_tick, NULL);
event_loop_add_timer(loop, 3000, true, on_stats, NULL);
event_loop_add_timer(loop, 10000, false, on_shutdown, loop);
printf("starting event loop with 3 timers...\n");
event_loop_run(loop);
event_loop_destroy(loop);
printf("bye.\n");
return 0;
}
פתרון לתרגיל 3 - שרת echo עם event loop
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <sys/timerfd.h>
#include <netinet/in.h>
#include <errno.h>
#include <stdbool.h>
#include <stdint.h>
#define PORT 9090
#define MAX_EVENTS 64
#define MAX_FDS 1024
#define BUF_SIZE 1024
typedef void (*event_cb_t)(int fd, void *data);
typedef struct {
event_cb_t on_read;
void *data;
bool active;
} source_t;
typedef struct {
int epfd;
source_t sources[MAX_FDS];
bool running;
} event_loop_t;
int active_connections = 0;
event_loop_t *loop;
event_loop_t *event_loop_create(void) {
event_loop_t *l = calloc(1, sizeof(event_loop_t));
l->epfd = epoll_create1(0);
l->running = true;
return l;
}
void event_loop_add_reader(event_loop_t *l, int fd,
event_cb_t cb, void *data) {
struct epoll_event ev = { .events = EPOLLIN, .data.fd = fd };
epoll_ctl(l->epfd, EPOLL_CTL_ADD, fd, &ev);
l->sources[fd].on_read = cb;
l->sources[fd].data = data;
l->sources[fd].active = true;
}
void event_loop_remove(event_loop_t *l, int fd) {
epoll_ctl(l->epfd, EPOLL_CTL_DEL, fd, NULL);
l->sources[fd].active = false;
}
void event_loop_run(event_loop_t *l) {
struct epoll_event events[MAX_EVENTS];
while (l->running) {
int n = epoll_wait(l->epfd, events, MAX_EVENTS, -1);
if (n < 0) {
if (errno == EINTR) continue;
break;
}
for (int i = 0; i < n; i++) {
int fd = events[i].data.fd;
source_t *s = &l->sources[fd];
if (s->active && s->on_read)
s->on_read(fd, s->data);
}
}
}
void set_nonblocking(int fd) {
int flags = fcntl(fd, F_GETFL, 0);
fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}
void on_client_data(int fd, void *data) {
(void)data;
char buf[BUF_SIZE];
ssize_t n = read(fd, buf, sizeof(buf));
if (n <= 0) {
printf("client fd=%d disconnected\n", fd);
event_loop_remove(loop, fd);
close(fd);
active_connections--;
return;
}
// echo
write(fd, buf, n);
}
void on_new_connection(int fd, void *data) {
(void)data;
struct sockaddr_in addr;
socklen_t len = sizeof(addr);
int client_fd = accept(fd, (struct sockaddr *)&addr, &len);
if (client_fd < 0) return;
set_nonblocking(client_fd);
event_loop_add_reader(loop, client_fd, on_client_data, NULL);
active_connections++;
printf("new connection: fd=%d (total: %d)\n", client_fd, active_connections);
}
void on_stats_timer(int fd, void *data) {
(void)data;
uint64_t exp;
read(fd, &exp, sizeof(exp));
printf("--- active connections: %d ---\n", active_connections);
}
int main(void) {
loop = event_loop_create();
// יצירת socket
int server_fd = socket(AF_INET, SOCK_STREAM, 0);
int opt = 1;
setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
set_nonblocking(server_fd);
struct sockaddr_in addr = {
.sin_family = AF_INET,
.sin_port = htons(PORT),
.sin_addr.s_addr = INADDR_ANY
};
bind(server_fd, (struct sockaddr *)&addr, sizeof(addr));
listen(server_fd, 128);
event_loop_add_reader(loop, server_fd, on_new_connection, NULL);
// טיימר סטטיסטיקה כל 5 שניות
int tfd = timerfd_create(CLOCK_MONOTONIC, 0);
struct itimerspec ts = {
.it_interval = { .tv_sec = 5 },
.it_value = { .tv_sec = 5 }
};
timerfd_settime(tfd, 0, &ts, NULL);
event_loop_add_reader(loop, tfd, on_stats_timer, NULL);
printf("echo server on port %d\n", PORT);
event_loop_run(loop);
close(server_fd);
close(tfd);
free(loop);
return 0;
}
פתרון לתרגיל 4 - שרת HTTP מינימלי
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <netinet/in.h>
#include <errno.h>
#include <stdbool.h>
#define PORT 8080
#define MAX_EVENTS 64
#define BUF_SIZE 4096
int epfd;
typedef struct {
int fd;
char read_buf[BUF_SIZE];
int read_len;
char write_buf[BUF_SIZE];
int write_len;
int write_pos;
bool writing;
} http_conn_t;
void set_nonblocking(int fd) {
fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK);
}
void close_conn(http_conn_t *conn) {
epoll_ctl(epfd, EPOLL_CTL_DEL, conn->fd, NULL);
close(conn->fd);
free(conn);
}
void build_response(http_conn_t *conn) {
const char *body =
"<html>\r\n"
"<head><title>C Event Loop Server</title></head>\r\n"
"<body>\r\n"
"<h1>Hello from C!</h1>\r\n"
"<p>This page was served by a single-threaded event loop.</p>\r\n"
"</body>\r\n"
"</html>\r\n";
conn->write_len = snprintf(conn->write_buf, BUF_SIZE,
"HTTP/1.1 200 OK\r\n"
"Content-Type: text/html; charset=utf-8\r\n"
"Content-Length: %zu\r\n"
"Connection: close\r\n"
"\r\n"
"%s",
strlen(body), body);
conn->write_pos = 0;
conn->writing = true;
}
void handle_conn(http_conn_t *conn, uint32_t events) {
if (!conn->writing && (events & EPOLLIN)) {
ssize_t n = read(conn->fd, conn->read_buf + conn->read_len,
BUF_SIZE - conn->read_len - 1);
if (n <= 0) {
close_conn(conn);
return;
}
conn->read_len += n;
conn->read_buf[conn->read_len] = '\0';
if (strstr(conn->read_buf, "\r\n\r\n")) {
build_response(conn);
struct epoll_event ev = { .events = EPOLLOUT, .data.ptr = conn };
epoll_ctl(epfd, EPOLL_CTL_MOD, conn->fd, &ev);
}
}
if (conn->writing && (events & EPOLLOUT)) {
ssize_t n = write(conn->fd, conn->write_buf + conn->write_pos,
conn->write_len - conn->write_pos);
if (n <= 0) {
close_conn(conn);
return;
}
conn->write_pos += n;
if (conn->write_pos >= conn->write_len) {
close_conn(conn);
}
}
}
int main(void) {
int server_fd = socket(AF_INET, SOCK_STREAM, 0);
int opt = 1;
setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
set_nonblocking(server_fd);
struct sockaddr_in addr = {
.sin_family = AF_INET,
.sin_port = htons(PORT),
.sin_addr.s_addr = INADDR_ANY
};
bind(server_fd, (struct sockaddr *)&addr, sizeof(addr));
listen(server_fd, 128);
epfd = epoll_create1(0);
struct epoll_event ev = { .events = EPOLLIN, .data.fd = server_fd };
epoll_ctl(epfd, EPOLL_CTL_ADD, server_fd, &ev);
printf("HTTP server on http://localhost:%d\n", PORT);
struct epoll_event events[MAX_EVENTS];
while (true) {
int n = epoll_wait(epfd, events, MAX_EVENTS, -1);
if (n < 0 && errno == EINTR) continue;
for (int i = 0; i < n; i++) {
if (events[i].data.fd == server_fd) {
struct sockaddr_in ca;
socklen_t cl = sizeof(ca);
int cfd = accept(server_fd, (struct sockaddr *)&ca, &cl);
if (cfd < 0) continue;
set_nonblocking(cfd);
http_conn_t *conn = calloc(1, sizeof(http_conn_t));
conn->fd = cfd;
struct epoll_event cev = { .events = EPOLLIN, .data.ptr = conn };
epoll_ctl(epfd, EPOLL_CTL_ADD, cfd, &cev);
} else {
http_conn_t *conn = events[i].data.ptr;
handle_conn(conn, events[i].events);
}
}
}
close(server_fd);
close(epfd);
return 0;
}
פתרון לתרגיל 5 - מכונת מצבים לפרוטוקול
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <netinet/in.h>
#include <errno.h>
#include <stdbool.h>
#define PORT 9090
#define MAX_EVENTS 64
#define BUF_SIZE 1024
int epfd;
typedef enum {
STATE_WAIT_LOGIN,
STATE_LOGGED_IN,
STATE_DONE
} conn_state_t;
typedef struct {
int fd;
conn_state_t state;
char username[64];
char buf[BUF_SIZE];
int buf_len;
} proto_conn_t;
void set_nonblocking(int fd) {
fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK);
}
void close_conn(proto_conn_t *conn) {
printf("[fd=%d] connection closed\n", conn->fd);
epoll_ctl(epfd, EPOLL_CTL_DEL, conn->fd, NULL);
close(conn->fd);
free(conn);
}
void send_str(int fd, const char *msg) {
write(fd, msg, strlen(msg));
}
// עבד שורה שלמה
void process_line(proto_conn_t *conn, char *line) {
printf("[fd=%d] recv: %s\n", conn->fd, line);
switch (conn->state) {
case STATE_WAIT_LOGIN:
if (strncmp(line, "LOGIN ", 6) == 0 && strlen(line) > 6) {
strncpy(conn->username, line + 6, 63);
conn->username[63] = '\0';
conn->state = STATE_LOGGED_IN;
send_str(conn->fd, "OK\n");
printf("[fd=%d] user '%s' logged in\n", conn->fd, conn->username);
} else {
send_str(conn->fd, "ERROR\n");
}
break;
case STATE_LOGGED_IN:
if (strncmp(line, "MSG ", 4) == 0) {
char response[BUF_SIZE];
snprintf(response, sizeof(response), "ECHO: %s: %s\n",
conn->username, line + 4);
send_str(conn->fd, response);
} else if (strcmp(line, "QUIT") == 0) {
conn->state = STATE_DONE;
send_str(conn->fd, "BYE\n");
close_conn(conn);
return;
} else {
send_str(conn->fd, "ERROR\n");
}
break;
default:
break;
}
}
void handle_client(proto_conn_t *conn) {
ssize_t n = read(conn->fd, conn->buf + conn->buf_len,
BUF_SIZE - conn->buf_len - 1);
if (n <= 0) {
close_conn(conn);
return;
}
conn->buf_len += n;
conn->buf[conn->buf_len] = '\0';
// עבד שורות שלמות
char *start = conn->buf;
char *newline;
while ((newline = strchr(start, '\n')) != NULL) {
*newline = '\0';
// הסר \r אם יש
if (newline > start && *(newline - 1) == '\r') {
*(newline - 1) = '\0';
}
process_line(conn, start);
if (conn->state == STATE_DONE) return;
start = newline + 1;
}
// העבר שארית לתחילת החוצץ
int remaining = conn->buf_len - (start - conn->buf);
if (remaining > 0 && start != conn->buf) {
memmove(conn->buf, start, remaining);
}
conn->buf_len = remaining;
}
int main(void) {
int server_fd = socket(AF_INET, SOCK_STREAM, 0);
int opt = 1;
setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
set_nonblocking(server_fd);
struct sockaddr_in addr = {
.sin_family = AF_INET,
.sin_port = htons(PORT),
.sin_addr.s_addr = INADDR_ANY
};
bind(server_fd, (struct sockaddr *)&addr, sizeof(addr));
listen(server_fd, 128);
epfd = epoll_create1(0);
struct epoll_event ev = { .events = EPOLLIN, .data.fd = server_fd };
epoll_ctl(epfd, EPOLL_CTL_ADD, server_fd, &ev);
printf("protocol server on port %d\n", PORT);
printf("commands: LOGIN <name>, MSG <text>, QUIT\n");
struct epoll_event events[MAX_EVENTS];
while (true) {
int n = epoll_wait(epfd, events, MAX_EVENTS, -1);
if (n < 0 && errno == EINTR) continue;
for (int i = 0; i < n; i++) {
if (events[i].data.fd == server_fd) {
struct sockaddr_in ca;
socklen_t cl = sizeof(ca);
int cfd = accept(server_fd, (struct sockaddr *)&ca, &cl);
if (cfd < 0) continue;
set_nonblocking(cfd);
proto_conn_t *conn = calloc(1, sizeof(proto_conn_t));
conn->fd = cfd;
conn->state = STATE_WAIT_LOGIN;
struct epoll_event cev = { .events = EPOLLIN, .data.ptr = conn };
epoll_ctl(epfd, EPOLL_CTL_ADD, cfd, &cev);
printf("[fd=%d] new connection\n", cfd);
send_str(cfd, "WELCOME. Please LOGIN <username>\n");
} else {
proto_conn_t *conn = events[i].data.ptr;
handle_client(conn);
}
}
}
close(server_fd);
close(epfd);
return 0;
}