לדלג לתוכן

12.4 תכנות אסינכרוני עם event loop - פתרון

פתרון לתרגיל 1 - event loop בסיסי

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/epoll.h>
#include <errno.h>
#include <stdbool.h>
#include <time.h>

// Capacity of the events array handed to epoll_wait() in event_loop_run().
#define MAX_EVENTS 64
// Number of slots in event_loop_t.sources; slots are indexed directly by fd.
#define MAX_FDS 1024

// Callback invoked by the loop with the ready fd and the user pointer that was
// supplied when the fd was registered.
typedef void (*event_cb_t)(int fd, void *data);

// Per-fd registration record.
typedef struct {
    event_cb_t on_read;   // called when the fd is reported readable (EPOLLIN)
    event_cb_t on_write;  // called when the fd is reported writable (EPOLLOUT)
    void *data;           // user pointer passed to both callbacks
    bool active;          // set by event_loop_add_*(), cleared by event_loop_remove()
} source_t;

// The event loop itself: one epoll instance plus a table of registered fds.
typedef struct {
    int epfd;                   // epoll instance created by epoll_create1()
    source_t sources[MAX_FDS];  // registration table, indexed by fd
    bool running;               // event_loop_run() keeps iterating while this is true
} event_loop_t;

// Allocate a zero-initialised event loop backed by a fresh epoll instance.
//
// Returns the new loop with `running` set to true, or NULL when either the
// allocation or epoll_create1() fails (errno describes the failure).
// Release the result with event_loop_destroy().
event_loop_t *event_loop_create(void) {
    event_loop_t *loop = calloc(1, sizeof *loop);
    if (loop == NULL) {
        return NULL;
    }

    loop->epfd = epoll_create1(0);
    if (loop->epfd < 0) {
        // free() may clobber errno; keep the epoll_create1() error for the caller
        int saved_errno = errno;
        free(loop);
        errno = saved_errno;
        return NULL;
    }

    loop->running = true;
    return loop;
}

// Tear down a loop obtained from event_loop_create(): close its epoll
// descriptor and release the structure. File descriptors that were registered
// with the loop are not closed here.
void event_loop_destroy(event_loop_t *loop) {
    int epoll_fd = loop->epfd;

    close(epoll_fd);
    free(loop);
}

// Register `cb` to be called whenever `fd` is readable.
//
// loop - loop to register with
// fd   - descriptor to watch; must be in [0, MAX_FDS) because it indexes the
//        source table
// cb   - read callback
// data - user pointer handed to the callbacks; there is a single `data` slot
//        per fd, so it replaces whatever a previous registration stored
//
// If the fd is already registered (for example as a writer) the existing
// epoll registration is widened instead of added again, since a second
// EPOLL_CTL_ADD on the same fd fails with EEXIST. On any failure a diagnostic
// is printed and the source table is left untouched.
void event_loop_add_reader(event_loop_t *loop, int fd,
                            event_cb_t cb, void *data) {
    if (fd < 0 || fd >= MAX_FDS) {
        fprintf(stderr, "event_loop_add_reader: fd %d out of range\n", fd);
        return;
    }

    source_t *src = &loop->sources[fd];
    bool merge = src->active;
    struct epoll_event ev = { .events = EPOLLIN, .data.fd = fd };
    if (merge && src->on_write) {
        ev.events |= EPOLLOUT;  // keep the writer interest that is already there
    }

    int rc = epoll_ctl(loop->epfd, merge ? EPOLL_CTL_MOD : EPOLL_CTL_ADD, fd, &ev);
    if (rc < 0 && merge && errno == ENOENT) {
        // The slot was stale: the fd was closed without event_loop_remove()
        // and the number has been reused. Start from a clean registration.
        merge = false;
        ev.events = EPOLLIN;
        rc = epoll_ctl(loop->epfd, EPOLL_CTL_ADD, fd, &ev);
    }
    if (rc < 0) {
        perror("epoll_ctl");
        return;
    }

    if (!merge) {
        src->on_write = NULL;  // never inherit a callback from an earlier use of this fd number
    }
    src->on_read = cb;
    src->data = data;
    src->active = true;
}

// Register `cb` to be called whenever `fd` is writable.
//
// loop - loop to register with
// fd   - descriptor to watch; must be in [0, MAX_FDS) because it indexes the
//        source table
// cb   - write callback
// data - user pointer handed to the callbacks; there is a single `data` slot
//        per fd, so it replaces whatever a previous registration stored
//
// If the fd is already registered (for example as a reader) the existing
// epoll registration is widened instead of added again, since a second
// EPOLL_CTL_ADD on the same fd fails with EEXIST. On any failure a diagnostic
// is printed and the source table is left untouched.
void event_loop_add_writer(event_loop_t *loop, int fd,
                            event_cb_t cb, void *data) {
    if (fd < 0 || fd >= MAX_FDS) {
        fprintf(stderr, "event_loop_add_writer: fd %d out of range\n", fd);
        return;
    }

    source_t *src = &loop->sources[fd];
    bool merge = src->active;
    struct epoll_event ev = { .events = EPOLLOUT, .data.fd = fd };
    if (merge && src->on_read) {
        ev.events |= EPOLLIN;  // keep the reader interest that is already there
    }

    int rc = epoll_ctl(loop->epfd, merge ? EPOLL_CTL_MOD : EPOLL_CTL_ADD, fd, &ev);
    if (rc < 0 && merge && errno == ENOENT) {
        // The slot was stale: the fd was closed without event_loop_remove()
        // and the number has been reused. Start from a clean registration.
        merge = false;
        ev.events = EPOLLOUT;
        rc = epoll_ctl(loop->epfd, EPOLL_CTL_ADD, fd, &ev);
    }
    if (rc < 0) {
        perror("epoll_ctl");
        return;
    }

    if (!merge) {
        src->on_read = NULL;  // never inherit a callback from an earlier use of this fd number
    }
    src->on_write = cb;
    src->data = data;
    src->active = true;
}

// Stop watching `fd` and forget everything registered for it.
//
// Out-of-range descriptors are ignored (they can never have been registered,
// and would index outside the source table). The descriptor itself is not
// closed; that remains the caller's job.
void event_loop_remove(event_loop_t *loop, int fd) {
    if (fd < 0 || fd >= MAX_FDS) {
        return;
    }

    // Result deliberately ignored: the call fails harmlessly when the fd was
    // never added or has already been closed.
    epoll_ctl(loop->epfd, EPOLL_CTL_DEL, fd, NULL);

    // Wipe the whole slot so callbacks and data cannot leak into a later
    // registration that happens to reuse the same fd number.
    loop->sources[fd] = (source_t){0};
}

// Ask the loop to finish: clears the `running` flag that event_loop_run()
// tests at the top of each iteration.
void event_loop_stop(event_loop_t *loop) {
    loop->running = false;
}

// Run the loop until event_loop_stop() is called or epoll_wait() fails.
//
// Each iteration blocks in epoll_wait() (no timeout) and dispatches the ready
// descriptors to their callbacks:
//   - EPOLLIN  -> on_read
//   - EPOLLOUT -> on_write
//   - EPOLLERR / EPOLLHUP -> every callback registered for the fd, so that the
//     handler observes the condition through its own read()/write() call.
//     epoll reports these two conditions even when they were not requested;
//     if nobody handled them a level-triggered loop would spin forever.
// EINTR is retried; any other epoll_wait() error is reported and ends the loop.
void event_loop_run(event_loop_t *loop) {
    struct epoll_event events[MAX_EVENTS];

    while (loop->running) {
        int n = epoll_wait(loop->epfd, events, MAX_EVENTS, -1);
        if (n < 0) {
            if (errno == EINTR) continue;
            perror("epoll_wait");
            break;
        }

        // Stop dispatching the rest of the batch once a callback stopped the loop.
        for (int i = 0; i < n && loop->running; i++) {
            int fd = events[i].data.fd;
            if (fd < 0 || fd >= MAX_FDS) continue;

            source_t *s = &loop->sources[fd];
            unsigned int ready = events[i].events;
            bool failed = (ready & (EPOLLERR | EPOLLHUP)) != 0;

            if (s->active && s->on_read && ((ready & EPOLLIN) || failed))
                s->on_read(fd, s->data);

            // Re-test `active`: on_read may have removed (and closed) the fd.
            if (s->active && s->on_write && ((ready & EPOLLOUT) || failed))
                s->on_write(fd, s->data);
        }
    }
}

// --- שימוש ---

void on_stdin(int fd, void *data) {
    (void)data;
    char buf[256];
    ssize_t n = read(fd, buf, sizeof(buf) - 1);
    if (n <= 0) return;
    buf[n] = '\0';

    struct timespec ts;
    clock_gettime(CLOCK_MONOTONIC, &ts);
    printf("[%ld.%03ld] %s", ts.tv_sec, ts.tv_nsec / 1000000, buf);
}

int main(void) {
    event_loop_t *loop = event_loop_create();
    event_loop_add_reader(loop, STDIN_FILENO, on_stdin, NULL);
    printf("type something:\n");
    event_loop_run(loop);
    event_loop_destroy(loop);
    return 0;
}

פתרון לתרגיל 2 - טיימרים ב-event loop

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/epoll.h>
#include <sys/timerfd.h>
#include <errno.h>
#include <stdbool.h>
#include <stdint.h>
#include <time.h>

// Capacity of the events array handed to epoll_wait() in event_loop_run().
#define MAX_EVENTS 64
// Number of slots in event_loop_t.sources; slots are indexed directly by fd.
#define MAX_FDS 1024

// Callback invoked by the loop with the ready fd and the user pointer that was
// supplied when the fd was registered.
typedef void (*event_cb_t)(int fd, void *data);

// Per-fd registration record (read interest only in this exercise).
typedef struct {
    event_cb_t on_read;  // called when the fd is reported readable
    void *data;          // user pointer passed to on_read
    bool active;         // set by event_loop_add_reader(), cleared by event_loop_remove()
} source_t;

// The event loop itself: one epoll instance plus a table of registered fds.
typedef struct {
    int epfd;                   // epoll instance created by epoll_create1()
    source_t sources[MAX_FDS];  // registration table, indexed by fd
    bool running;               // event_loop_run() keeps iterating while this is true
} event_loop_t;

// Create an event loop: a zeroed source table plus a new epoll instance.
//
// Returns the loop (with `running` already true), or NULL if memory or the
// epoll instance could not be obtained; errno then holds the cause.
// The caller releases the loop with event_loop_destroy().
event_loop_t *event_loop_create(void) {
    event_loop_t *loop = calloc(1, sizeof *loop);
    if (!loop) {
        return NULL;
    }

    int epfd = epoll_create1(0);
    if (epfd < 0) {
        int saved_errno = errno;  // free() is allowed to change errno
        free(loop);
        errno = saved_errno;
        return NULL;
    }

    loop->epfd = epfd;
    loop->running = true;
    return loop;
}

// Release a loop: closes the epoll descriptor, then frees the structure.
// Descriptors that were registered with the loop stay open.
void event_loop_destroy(event_loop_t *loop) {
    const int epoll_fd = loop->epfd;

    close(epoll_fd);
    free(loop);
}

// Register `cb` to be called with `data` whenever `fd` is readable.
//
// `fd` must lie in [0, MAX_FDS) because it is used as an index into the
// source table. Registering an fd that epoll already tracks simply replaces
// its callback and data. On any other failure a diagnostic is printed and the
// fd is not recorded as active.
void event_loop_add_reader(event_loop_t *loop, int fd,
                            event_cb_t cb, void *data) {
    if (fd < 0 || fd >= MAX_FDS) {
        fprintf(stderr, "event_loop_add_reader: fd %d out of range\n", fd);
        return;
    }

    struct epoll_event ev = { .events = EPOLLIN, .data.fd = fd };
    if (epoll_ctl(loop->epfd, EPOLL_CTL_ADD, fd, &ev) < 0 && errno != EEXIST) {
        perror("epoll_ctl");
        return;
    }

    source_t *src = &loop->sources[fd];
    src->on_read = cb;
    src->data = data;
    src->active = true;
}

// Stop watching `fd` and clear its slot in the source table.
//
// Descriptors outside [0, MAX_FDS) are ignored because they would index
// outside the table. The descriptor is not closed.
void event_loop_remove(event_loop_t *loop, int fd) {
    if (fd < 0 || fd >= MAX_FDS) {
        return;
    }

    // Failure is harmless here (fd never added, or already closed).
    epoll_ctl(loop->epfd, EPOLL_CTL_DEL, fd, NULL);

    // Reset the whole slot so no stale callback or data pointer is left behind.
    loop->sources[fd] = (source_t){0};
}

// Ask the loop to finish: clears the `running` flag that event_loop_run()
// tests at the top of each iteration.
void event_loop_stop(event_loop_t *loop) {
    loop->running = false;
}

// Run the loop until event_loop_stop() is called or epoll_wait() fails.
//
// Blocks in epoll_wait() without a timeout and calls on_read for every active
// descriptor that is readable. EPOLLERR and EPOLLHUP are delivered to on_read
// as well: epoll always reports them, and the handler sees the condition
// through its own read() call instead of the loop spinning on it.
// EINTR is retried; any other epoll_wait() error is reported and ends the loop.
void event_loop_run(event_loop_t *loop) {
    struct epoll_event events[MAX_EVENTS];

    while (loop->running) {
        int n = epoll_wait(loop->epfd, events, MAX_EVENTS, -1);
        if (n < 0) {
            if (errno == EINTR) continue;
            perror("epoll_wait");
            break;
        }

        // Once a callback stops the loop, skip the rest of this batch.
        for (int i = 0; i < n && loop->running; i++) {
            int fd = events[i].data.fd;
            if (fd < 0 || fd >= MAX_FDS) continue;

            source_t *s = &loop->sources[fd];
            bool wants_read =
                (events[i].events & (EPOLLIN | EPOLLERR | EPOLLHUP)) != 0;
            if (s->active && wants_read && s->on_read)
                s->on_read(fd, s->data);
        }
    }
}

// Add a timer to the loop - returns the timer's fd, or -1 on failure.
//
// loop        - loop that will dispatch the timer
// interval_ms - delay before the first expiry, in milliseconds; must be > 0
// repeating   - when true the timer re-arms itself with the same interval
// cb, data    - callback (and its user pointer) run on every expiry; the
//               callback must read() the 8-byte expiration counter from the fd
//
// The caller owns the returned descriptor and is responsible for removing it
// from the loop and closing it. On failure errno describes the cause.
int event_loop_add_timer(event_loop_t *loop, int interval_ms,
                          bool repeating, event_cb_t cb, void *data) {
    // An all-zero it_value disarms a timerfd instead of firing it immediately,
    // and negative values are invalid.
    if (interval_ms <= 0) {
        errno = EINVAL;
        return -1;
    }

    int tfd = timerfd_create(CLOCK_MONOTONIC, 0);
    if (tfd < 0) {
        return -1;
    }
    if (tfd >= MAX_FDS) {
        // The loop's source table is indexed by fd and cannot hold this one.
        close(tfd);
        errno = EMFILE;
        return -1;
    }

    struct itimerspec ts = {0};
    ts.it_value.tv_sec = interval_ms / 1000;
    ts.it_value.tv_nsec = (interval_ms % 1000) * 1000000L;
    if (repeating) {
        ts.it_interval = ts.it_value;
    }

    if (timerfd_settime(tfd, 0, &ts, NULL) < 0) {
        int saved_errno = errno;  // close() may overwrite errno
        close(tfd);
        errno = saved_errno;
        return -1;
    }

    event_loop_add_reader(loop, tfd, cb, data);
    return tfd;
}

// --- Usage ---

// Number of times on_tick() has run; reported by on_stats().
int tick_count = 0;
// CLOCK_MONOTONIC reading taken at the start of main(); print_time() reports
// time elapsed since this instant.
struct timespec start_time;

// Print the time elapsed since start_time as a "[S.Ss] " prefix (one decimal,
// no newline), measured on CLOCK_MONOTONIC.
void print_time(void) {
    struct timespec now;
    clock_gettime(CLOCK_MONOTONIC, &now);

    time_t whole_sec = now.tv_sec - start_time.tv_sec;
    long frac_nsec = now.tv_nsec - start_time.tv_nsec;  // may be negative; the sum is still right
    double elapsed = whole_sec + frac_nsec / 1e9;

    printf("[%.1fs] ", elapsed);
}

// Timer callback: drains the timerfd's expiration counter, bumps tick_count
// and prints a timestamped "tick" line. `data` is unused.
void on_tick(int fd, void *data) {
    uint64_t expirations;
    (void)data;

    // Reading the counter is what clears the timerfd's readable state.
    ssize_t got = read(fd, &expirations, sizeof expirations);
    (void)got;

    ++tick_count;
    print_time();
    printf("tick\n");
}

// Timer callback: drains the timerfd's expiration counter and prints how many
// ticks have been counted so far. `data` is unused.
void on_stats(int fd, void *data) {
    uint64_t expirations;
    (void)data;

    // Reading the counter is what clears the timerfd's readable state.
    ssize_t got = read(fd, &expirations, sizeof expirations);
    (void)got;

    print_time();
    printf("stats: %d ticks so far\n", tick_count);
}

// One-shot timer callback: drains the timerfd, announces the shutdown and
// stops the event loop passed in through `data` (an event_loop_t *).
void on_shutdown(int fd, void *data) {
    event_loop_t *loop = data;
    uint64_t expirations;

    ssize_t got = read(fd, &expirations, sizeof expirations);
    (void)got;

    print_time();
    printf("10 seconds passed, shutting down.\n");
    event_loop_stop(loop);
}

// Demo: a 1 s repeating tick, a 3 s repeating stats report and a one-shot
// 10 s timer that stops the loop.
int main(void) {
    clock_gettime(CLOCK_MONOTONIC, &start_time);

    event_loop_t *loop = event_loop_create();
    if (loop == NULL) {
        perror("event_loop_create");
        return EXIT_FAILURE;
    }

    // Keep the descriptors: the timers belong to us and must be closed.
    int timers[3];
    timers[0] = event_loop_add_timer(loop, 1000, true, on_tick, NULL);
    timers[1] = event_loop_add_timer(loop, 3000, true, on_stats, NULL);
    timers[2] = event_loop_add_timer(loop, 10000, false, on_shutdown, loop);

    bool ok = timers[0] >= 0 && timers[1] >= 0 && timers[2] >= 0;
    if (ok) {
        printf("starting event loop with 3 timers...\n");
        event_loop_run(loop);
    } else {
        // Without the shutdown timer the loop would never return.
        fprintf(stderr, "failed to create timers\n");
    }

    for (int i = 0; i < 3; i++) {
        if (timers[i] >= 0) {
            event_loop_remove(loop, timers[i]);
            close(timers[i]);
        }
    }
    event_loop_destroy(loop);

    if (!ok) {
        return EXIT_FAILURE;
    }
    printf("bye.\n");
    return 0;
}

פתרון לתרגיל 3 - שרת echo עם event loop

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <sys/timerfd.h>
#include <netinet/in.h>
#include <errno.h>
#include <stdbool.h>
#include <stdint.h>

// TCP port the echo server listens on.
#define PORT 9090
// Capacity of the events array handed to epoll_wait() in event_loop_run().
#define MAX_EVENTS 64
// Number of slots in event_loop_t.sources; slots are indexed directly by fd.
#define MAX_FDS 1024
// Size of the per-read buffer used when echoing client data.
#define BUF_SIZE 1024

// Callback invoked by the loop with the ready fd and the user pointer that was
// supplied when the fd was registered.
typedef void (*event_cb_t)(int fd, void *data);

// Per-fd registration record.
typedef struct {
    event_cb_t on_read;  // called when epoll reports an event for the fd
    void *data;          // user pointer passed to on_read
    bool active;         // set by event_loop_add_reader(), cleared by event_loop_remove()
} source_t;

// The event loop itself: one epoll instance plus a table of registered fds.
typedef struct {
    int epfd;                   // epoll instance created by epoll_create1()
    source_t sources[MAX_FDS];  // registration table, indexed by fd
    bool running;               // event_loop_run() keeps iterating while this is true
} event_loop_t;

// Number of currently connected clients; maintained by on_new_connection()
// and on_client_data(), reported by on_stats_timer().
int active_connections = 0;
// The single loop of this program; the callbacks reach it through this global.
event_loop_t *loop;

// Allocate a zeroed loop and give it a new epoll instance.
//
// Returns the loop with `running` set, or NULL when allocation or
// epoll_create1() fails (errno holds the reason). The owner must close
// `epfd` and free the structure when done.
event_loop_t *event_loop_create(void) {
    event_loop_t *l = calloc(1, sizeof *l);
    if (l == NULL) {
        return NULL;
    }

    l->epfd = epoll_create1(0);
    if (l->epfd < 0) {
        int saved_errno = errno;  // preserve the epoll_create1() error across free()
        free(l);
        errno = saved_errno;
        return NULL;
    }

    l->running = true;
    return l;
}

// Register `cb` to be called with `data` whenever epoll reports `fd`.
//
// `fd` must lie in [0, MAX_FDS): it is used directly as an index into the
// source table. If epoll already tracks the fd, only the callback and data
// are replaced. On any other failure a diagnostic is printed and the fd is
// not marked active.
void event_loop_add_reader(event_loop_t *l, int fd,
                            event_cb_t cb, void *data) {
    if (fd < 0 || fd >= MAX_FDS) {
        fprintf(stderr, "event_loop_add_reader: fd %d out of range\n", fd);
        return;
    }

    struct epoll_event ev = { .events = EPOLLIN, .data.fd = fd };
    if (epoll_ctl(l->epfd, EPOLL_CTL_ADD, fd, &ev) < 0 && errno != EEXIST) {
        perror("epoll_ctl");
        return;
    }

    source_t *slot = &l->sources[fd];
    slot->on_read = cb;
    slot->data = data;
    slot->active = true;
}

// Stop watching `fd` and clear its slot in the source table.
//
// Descriptors outside [0, MAX_FDS) are ignored because they would index
// outside the table. The descriptor itself is left open for the caller.
void event_loop_remove(event_loop_t *l, int fd) {
    if (fd < 0 || fd >= MAX_FDS) {
        return;
    }

    // A failure here (fd never added / already closed) needs no handling.
    epoll_ctl(l->epfd, EPOLL_CTL_DEL, fd, NULL);

    // Clear callback and data too, so a reused fd number starts clean.
    l->sources[fd] = (source_t){0};
}

// Run the loop while `running` is set.
//
// Blocks in epoll_wait() without a timeout and calls on_read for every active
// descriptor that epoll reports, whatever the event bits are - hang-ups and
// errors therefore reach the handler, which notices them through read().
// EINTR is retried; any other epoll_wait() error is reported and ends the loop.
void event_loop_run(event_loop_t *l) {
    struct epoll_event events[MAX_EVENTS];

    while (l->running) {
        int n = epoll_wait(l->epfd, events, MAX_EVENTS, -1);
        if (n < 0) {
            if (errno == EINTR) continue;
            perror("epoll_wait");
            break;
        }

        for (int i = 0; i < n; i++) {
            int fd = events[i].data.fd;
            // The fd doubles as a table index; never trust it blindly.
            if (fd < 0 || fd >= MAX_FDS) continue;

            source_t *s = &l->sources[fd];
            if (s->active && s->on_read)
                s->on_read(fd, s->data);
        }
    }
}

// Switch `fd` to non-blocking mode, keeping its other file status flags.
//
// If the flags cannot be read or written (for example `fd` is not open) a
// diagnostic is printed and the descriptor is left unchanged.
void set_nonblocking(int fd) {
    int flags = fcntl(fd, F_GETFL, 0);
    if (flags < 0) {
        // -1 must not be OR-ed into the new flag word
        perror("fcntl(F_GETFL)");
        return;
    }
    if (flags & O_NONBLOCK) {
        return;  // already non-blocking
    }
    if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0) {
        perror("fcntl(F_SETFL)");
    }
}

// Unregister, close and un-count a client connection.
static void drop_client(int fd) {
    printf("client fd=%d disconnected\n", fd);
    event_loop_remove(loop, fd);
    close(fd);
    active_connections--;
}

// Read callback for a client socket: echoes back whatever was received.
//
// A read of 0 bytes (peer closed) or a hard error drops the client. EAGAIN
// and EINTR only mean "nothing to read right now" and are not a disconnect.
// `data` is unused.
void on_client_data(int fd, void *data) {
    (void)data;
    char buf[BUF_SIZE];
    ssize_t n = read(fd, buf, sizeof(buf));

    if (n < 0 && (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)) {
        return;
    }
    if (n <= 0) {
        drop_client(fd);
        return;
    }

    // echo - send() with MSG_NOSIGNAL so a vanished peer yields EPIPE instead
    // of a process-killing SIGPIPE; loop because a send may be partial.
    size_t total = (size_t)n;
    size_t sent = 0;
    while (sent < total) {
        ssize_t w = send(fd, buf + sent, total - sent, MSG_NOSIGNAL);
        if (w > 0) {
            sent += (size_t)w;
            continue;
        }
        if (w < 0 && errno == EINTR) {
            continue;
        }
        if (w < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
            // The peer is not draining its socket. There is no per-connection
            // output buffer in this server, so the remainder is dropped.
            return;
        }
        drop_client(fd);
        return;
    }
}

void on_new_connection(int fd, void *data) {
    (void)data;
    struct sockaddr_in addr;
    socklen_t len = sizeof(addr);
    int client_fd = accept(fd, (struct sockaddr *)&addr, &len);

    if (client_fd < 0) return;
    set_nonblocking(client_fd);

    event_loop_add_reader(loop, client_fd, on_client_data, NULL);
    active_connections++;
    printf("new connection: fd=%d (total: %d)\n", client_fd, active_connections);
}

// Timer callback: drains the timerfd's expiration counter and prints the
// current number of connected clients. `data` is unused.
void on_stats_timer(int fd, void *data) {
    uint64_t expirations;
    (void)data;

    // Reading the counter is what clears the timerfd's readable state.
    ssize_t got = read(fd, &expirations, sizeof expirations);
    (void)got;

    printf("--- active connections: %d ---\n", active_connections);
}

// Echo server: listens on PORT, echoes client data and prints the number of
// active connections every 5 seconds. Every setup step is checked so that a
// failure (for example the port being in use) is reported instead of leaving
// a server that silently does nothing.
int main(void) {
    int rc = EXIT_FAILURE;
    int server_fd = -1;
    int tfd = -1;

    loop = event_loop_create();
    if (loop == NULL) {
        perror("event_loop_create");
        return EXIT_FAILURE;
    }

    // create the socket
    server_fd = socket(AF_INET, SOCK_STREAM, 0);
    if (server_fd < 0) {
        perror("socket");
        goto out;
    }
    {
        int opt = 1;
        setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
        set_nonblocking(server_fd);

        struct sockaddr_in addr = {
            .sin_family = AF_INET,
            .sin_port = htons(PORT),
            .sin_addr.s_addr = INADDR_ANY
        };
        if (bind(server_fd, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
            perror("bind");
            goto out;
        }
        if (listen(server_fd, 128) < 0) {
            perror("listen");
            goto out;
        }
    }

    event_loop_add_reader(loop, server_fd, on_new_connection, NULL);

    // statistics timer every 5 seconds
    tfd = timerfd_create(CLOCK_MONOTONIC, 0);
    if (tfd < 0) {
        perror("timerfd_create");
        goto out;
    }
    {
        struct itimerspec ts = {
            .it_interval = { .tv_sec = 5 },
            .it_value = { .tv_sec = 5 }
        };
        if (timerfd_settime(tfd, 0, &ts, NULL) < 0) {
            perror("timerfd_settime");
            goto out;
        }
    }
    event_loop_add_reader(loop, tfd, on_stats_timer, NULL);

    printf("echo server on port %d\n", PORT);
    event_loop_run(loop);
    rc = 0;

out:
    if (server_fd >= 0) close(server_fd);
    if (tfd >= 0) close(tfd);
    close(loop->epfd);  // the epoll instance is owned by the loop
    free(loop);
    return rc;
}

פתרון לתרגיל 4 - שרת HTTP מינימלי

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <netinet/in.h>
#include <errno.h>
#include <stdbool.h>

// TCP port the HTTP server listens on.
#define PORT 8080
// Capacity of the events array handed to epoll_wait() in main().
#define MAX_EVENTS 64
// Size of each connection's request and response buffers.
#define BUF_SIZE 4096

// The program's single epoll instance; created in main().
int epfd;

// State of one HTTP connection. A connection first reads a request and then,
// once `writing` is set, only writes the response.
typedef struct {
    int fd;                     // client socket
    char read_buf[BUF_SIZE];    // request bytes received so far (kept NUL-terminated)
    int read_len;               // number of bytes in read_buf
    char write_buf[BUF_SIZE];   // response produced by build_response()
    int write_len;              // total number of response bytes
    int write_pos;              // number of response bytes already written
    bool writing;               // false while reading the request, true while responding
} http_conn_t;

// Put `fd` into non-blocking mode without disturbing its other status flags.
//
// When the current flags cannot be read (or the new ones cannot be set) the
// error is reported and the descriptor is left as it was.
void set_nonblocking(int fd) {
    int flags = fcntl(fd, F_GETFL, 0);
    if (flags == -1) {
        // Do not feed the -1 error value into F_SETFL as if it were a flag word.
        perror("fcntl(F_GETFL)");
        return;
    }
    if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
        perror("fcntl(F_SETFL)");
    }
}

// Dispose of a connection: unregister its socket from epoll, close the socket
// and free the connection object. `conn` must not be used afterwards.
void close_conn(http_conn_t *conn) {
    const int fd = conn->fd;

    epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
    close(fd);
    free(conn);
}

// Fill conn->write_buf with a fixed "200 OK" HTML response and switch the
// connection into writing mode (write_pos reset to 0, writing set to true).
//
// write_len is the number of bytes actually stored in write_buf. snprintf()
// returns the length the output would have had, which can exceed the buffer
// on truncation (or be negative on error), so it is clamped before use.
void build_response(http_conn_t *conn) {
    static const char body[] =
        "<html>\r\n"
        "<head><title>C Event Loop Server</title></head>\r\n"
        "<body>\r\n"
        "<h1>Hello from C!</h1>\r\n"
        "<p>This page was served by a single-threaded event loop.</p>\r\n"
        "</body>\r\n"
        "</html>\r\n";

    int len = snprintf(conn->write_buf, BUF_SIZE,
        "HTTP/1.1 200 OK\r\n"
        "Content-Type: text/html; charset=utf-8\r\n"
        "Content-Length: %zu\r\n"
        "Connection: close\r\n"
        "\r\n"
        "%s",
        sizeof(body) - 1, body);

    if (len < 0) {
        len = 0;                // formatting failed: nothing to send
    } else if (len >= BUF_SIZE) {
        len = BUF_SIZE - 1;     // truncated: only this much is in the buffer
    }

    conn->write_len = len;
    conn->write_pos = 0;
    conn->writing = true;
}

// Advance one HTTP connection according to the epoll `events` reported for it.
//
// Reading phase (writing == false): append incoming bytes to read_buf; once
// the blank line ending the request headers has arrived, build the response
// and switch the epoll interest to EPOLLOUT.
// Writing phase (writing == true): send as much of the response as the socket
// accepts; close the connection when everything has been sent.
//
// The connection is closed (and `conn` freed) on EOF, on hard errors, on
// hang-up, and when the request does not fit into read_buf.
void handle_conn(http_conn_t *conn, uint32_t events) {
    // epoll reports these even though they were never requested. Nothing more
    // can be exchanged on such a socket, and ignoring the event would make the
    // level-triggered loop spin.
    if (events & (EPOLLERR | EPOLLHUP)) {
        close_conn(conn);
        return;
    }

    if (!conn->writing && (events & EPOLLIN)) {
        int room = BUF_SIZE - conn->read_len - 1;
        if (room <= 0) {
            // Headers larger than the buffer: give up on this request.
            close_conn(conn);
            return;
        }

        ssize_t n = read(conn->fd, conn->read_buf + conn->read_len,
                         (size_t)room);
        if (n < 0 && (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)) {
            return;  // nothing available after all; wait for the next event
        }
        if (n <= 0) {
            close_conn(conn);
            return;
        }
        conn->read_len += (int)n;
        conn->read_buf[conn->read_len] = '\0';

        if (strstr(conn->read_buf, "\r\n\r\n")) {
            build_response(conn);
            struct epoll_event ev = { .events = EPOLLOUT, .data.ptr = conn };
            if (epoll_ctl(epfd, EPOLL_CTL_MOD, conn->fd, &ev) < 0) {
                // Without write notifications the response could never be sent.
                close_conn(conn);
                return;
            }
        }
    }

    if (conn->writing && (events & EPOLLOUT)) {
        // MSG_NOSIGNAL: a client that already went away must produce EPIPE,
        // not a SIGPIPE that terminates the whole server.
        ssize_t n = send(conn->fd, conn->write_buf + conn->write_pos,
                         (size_t)(conn->write_len - conn->write_pos),
                         MSG_NOSIGNAL);
        if (n < 0 && (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)) {
            return;  // try again on the next EPOLLOUT
        }
        if (n <= 0) {
            close_conn(conn);
            return;
        }
        conn->write_pos += (int)n;
        if (conn->write_pos >= conn->write_len) {
            close_conn(conn);
        }
    }
}

// Minimal single-threaded HTTP server on PORT.
//
// The listening socket is registered with a NULL data pointer and every
// client with a pointer to its http_conn_t, so one field of the epoll data
// union (`ptr`) is enough to tell them apart.
int main(void) {
    int server_fd = socket(AF_INET, SOCK_STREAM, 0);
    if (server_fd < 0) {
        perror("socket");
        return EXIT_FAILURE;
    }
    int opt = 1;
    setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
    set_nonblocking(server_fd);

    struct sockaddr_in addr = {
        .sin_family = AF_INET,
        .sin_port = htons(PORT),
        .sin_addr.s_addr = INADDR_ANY
    };
    if (bind(server_fd, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
        perror("bind");
        close(server_fd);
        return EXIT_FAILURE;
    }
    if (listen(server_fd, 128) < 0) {
        perror("listen");
        close(server_fd);
        return EXIT_FAILURE;
    }

    epfd = epoll_create1(0);
    if (epfd < 0) {
        perror("epoll_create1");
        close(server_fd);
        return EXIT_FAILURE;
    }
    struct epoll_event ev = { .events = EPOLLIN, .data.ptr = NULL };
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, server_fd, &ev) < 0) {
        perror("epoll_ctl");
        close(server_fd);
        close(epfd);
        return EXIT_FAILURE;
    }

    printf("HTTP server on http://localhost:%d\n", PORT);

    struct epoll_event events[MAX_EVENTS];
    while (true) {
        int n = epoll_wait(epfd, events, MAX_EVENTS, -1);
        if (n < 0) {
            if (errno == EINTR) continue;
            // Any other error is permanent; retrying would spin forever.
            perror("epoll_wait");
            break;
        }

        for (int i = 0; i < n; i++) {
            if (events[i].data.ptr == NULL) {
                // listening socket: one new connection per event
                struct sockaddr_in ca;
                socklen_t cl = sizeof(ca);
                int cfd = accept(server_fd, (struct sockaddr *)&ca, &cl);
                if (cfd < 0) continue;
                set_nonblocking(cfd);

                http_conn_t *conn = calloc(1, sizeof *conn);
                if (conn == NULL) {
                    close(cfd);
                    continue;
                }
                conn->fd = cfd;

                struct epoll_event cev = { .events = EPOLLIN, .data.ptr = conn };
                if (epoll_ctl(epfd, EPOLL_CTL_ADD, cfd, &cev) < 0) {
                    perror("epoll_ctl");
                    close(cfd);
                    free(conn);
                }
            } else {
                http_conn_t *conn = events[i].data.ptr;
                handle_conn(conn, events[i].events);
            }
        }
    }

    // Only reached when epoll_wait() failed.
    close(server_fd);
    close(epfd);
    return EXIT_FAILURE;
}

פתרון לתרגיל 5 - מכונת מצבים לפרוטוקול

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <netinet/in.h>
#include <errno.h>
#include <stdbool.h>

// TCP port the protocol server listens on.
#define PORT 9090
// Capacity of the events array handed to epoll_wait() in main().
#define MAX_EVENTS 64
// Size of each connection's line buffer.
#define BUF_SIZE 1024

// The program's single epoll instance; created in main().
int epfd;

// Protocol state of a connection.
typedef enum {
    STATE_WAIT_LOGIN,  // initial state: only "LOGIN <name>" is accepted
    STATE_LOGGED_IN,   // after a successful LOGIN: MSG and QUIT are accepted
    STATE_DONE         // after QUIT: the connection is finished
} conn_state_t;

// State of one client connection.
typedef struct {
    int fd;              // client socket
    conn_state_t state;  // current protocol state
    char username[64];   // name given in LOGIN (truncated to 63 characters)
    char buf[BUF_SIZE];  // received bytes that do not yet form a complete line
    int buf_len;         // number of bytes currently held in buf
} proto_conn_t;

// Enable O_NONBLOCK on `fd` while preserving its other file status flags.
//
// A failure to read or write the flags is reported; the descriptor is then
// left unmodified.
void set_nonblocking(int fd) {
    int current = fcntl(fd, F_GETFL, 0);
    if (current < 0) {
        // F_GETFL failed; its -1 result is not a usable flag word
        perror("fcntl(F_GETFL)");
        return;
    }

    int wanted = current | O_NONBLOCK;
    if (wanted != current && fcntl(fd, F_SETFL, wanted) < 0) {
        perror("fcntl(F_SETFL)");
    }
}

// Log and dispose of a connection: unregister its socket from epoll, close
// the socket and free the connection object. `conn` is invalid afterwards.
void close_conn(proto_conn_t *conn) {
    const int fd = conn->fd;

    printf("[fd=%d] connection closed\n", fd);
    epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
    close(fd);
    free(conn);
}

// Send the NUL-terminated string `msg` (without the terminator) on `fd`.
//
// Best effort: partial sends are continued and EINTR is retried, but on any
// other error (peer gone, send buffer full on a non-blocking socket, ...) the
// remainder is silently dropped. Sockets are written with MSG_NOSIGNAL so a
// closed peer produces an error instead of a SIGPIPE that would kill the
// process; descriptors that are not sockets fall back to write().
void send_str(int fd, const char *msg) {
    size_t left = strlen(msg);
    bool use_send = true;

    while (left > 0) {
        ssize_t n = use_send ? send(fd, msg, left, MSG_NOSIGNAL)
                             : write(fd, msg, left);
        if (n < 0) {
            if (errno == EINTR) continue;
            if (use_send && errno == ENOTSOCK) {
                use_send = false;  // not a socket: retry with plain write()
                continue;
            }
            return;
        }
        if (n == 0) {
            return;  // no progress; avoid looping forever
        }
        msg += n;
        left -= (size_t)n;
    }
}

// Process one complete line (line terminator already stripped).
//
// Drives the connection's state machine:
//   STATE_WAIT_LOGIN: "LOGIN <name>" stores the name (truncated to fit
//                     conn->username), answers "OK" and moves to
//                     STATE_LOGGED_IN; anything else answers "ERROR".
//   STATE_LOGGED_IN:  "MSG <text>" is echoed back prefixed with the user name,
//                     "QUIT" answers "BYE" and moves to STATE_DONE, anything
//                     else answers "ERROR".
//
// This function never closes or frees `conn`. After QUIT it only sets
// STATE_DONE; the caller owns the teardown, so it can still inspect
// conn->state safely after this call returns.
void process_line(proto_conn_t *conn, char *line) {
    printf("[fd=%d] recv: %s\n", conn->fd, line);

    switch (conn->state) {
    case STATE_WAIT_LOGIN:
        if (strncmp(line, "LOGIN ", 6) == 0 && line[6] != '\0') {
            // snprintf always NUL-terminates and truncates an over-long name
            snprintf(conn->username, sizeof(conn->username), "%s", line + 6);
            conn->state = STATE_LOGGED_IN;
            send_str(conn->fd, "OK\n");
            printf("[fd=%d] user '%s' logged in\n", conn->fd, conn->username);
        } else {
            send_str(conn->fd, "ERROR\n");
        }
        break;

    case STATE_LOGGED_IN:
        if (strncmp(line, "MSG ", 4) == 0) {
            // Room for the longest possible line plus prefix and user name,
            // so the trailing newline is never cut off.
            char response[BUF_SIZE + sizeof(conn->username) + 16];
            snprintf(response, sizeof(response), "ECHO: %s: %s\n",
                     conn->username, line + 4);
            send_str(conn->fd, response);
        } else if (strcmp(line, "QUIT") == 0) {
            send_str(conn->fd, "BYE\n");
            conn->state = STATE_DONE;
        } else {
            send_str(conn->fd, "ERROR\n");
        }
        break;

    default:
        break;
    }
}

// Read whatever the client sent, process every complete line and keep an
// unfinished trailing line in conn->buf for the next call.
//
// The connection is closed (and `conn` freed) on EOF, on a hard read error,
// when a line does not fit into the buffer, and after the client sent QUIT.
void handle_client(proto_conn_t *conn) {
    int room = BUF_SIZE - conn->buf_len - 1;
    if (room <= 0) {
        // The buffer is full and still holds no newline: the line is too long.
        close_conn(conn);
        return;
    }

    ssize_t n = read(conn->fd, conn->buf + conn->buf_len, (size_t)room);
    if (n < 0 && (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)) {
        return;  // nothing to read right now; not a disconnect
    }
    if (n <= 0) {
        close_conn(conn);
        return;
    }

    conn->buf_len += (int)n;
    conn->buf[conn->buf_len] = '\0';

    // process complete lines; memchr (not strchr) so that a NUL byte sent by
    // the client cannot hide the newlines that follow it
    char *start = conn->buf;
    char *end = conn->buf + conn->buf_len;
    char *newline;
    while ((newline = memchr(start, '\n', (size_t)(end - start))) != NULL) {
        *newline = '\0';
        // strip \r if present
        if (newline > start && newline[-1] == '\r') {
            newline[-1] = '\0';
        }

        process_line(conn, start);
        if (conn->state == STATE_DONE) {
            // Tear down here, after the last access to conn's fields.
            close_conn(conn);
            return;
        }

        start = newline + 1;
    }

    // move the remainder to the start of the buffer
    int remaining = (int)(end - start);
    if (remaining > 0 && start != conn->buf) {
        memmove(conn->buf, start, (size_t)remaining);
    }
    conn->buf_len = remaining;
}

// Line-protocol server on PORT (commands: LOGIN <name>, MSG <text>, QUIT).
//
// The listening socket is registered with a NULL data pointer and every
// client with a pointer to its proto_conn_t, so one field of the epoll data
// union (`ptr`) is enough to tell them apart.
int main(void) {
    int server_fd = socket(AF_INET, SOCK_STREAM, 0);
    if (server_fd < 0) {
        perror("socket");
        return EXIT_FAILURE;
    }
    int opt = 1;
    setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
    set_nonblocking(server_fd);

    struct sockaddr_in addr = {
        .sin_family = AF_INET,
        .sin_port = htons(PORT),
        .sin_addr.s_addr = INADDR_ANY
    };
    if (bind(server_fd, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
        perror("bind");
        close(server_fd);
        return EXIT_FAILURE;
    }
    if (listen(server_fd, 128) < 0) {
        perror("listen");
        close(server_fd);
        return EXIT_FAILURE;
    }

    epfd = epoll_create1(0);
    if (epfd < 0) {
        perror("epoll_create1");
        close(server_fd);
        return EXIT_FAILURE;
    }
    struct epoll_event ev = { .events = EPOLLIN, .data.ptr = NULL };
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, server_fd, &ev) < 0) {
        perror("epoll_ctl");
        close(server_fd);
        close(epfd);
        return EXIT_FAILURE;
    }

    printf("protocol server on port %d\n", PORT);
    printf("commands: LOGIN <name>, MSG <text>, QUIT\n");

    struct epoll_event events[MAX_EVENTS];
    while (true) {
        int n = epoll_wait(epfd, events, MAX_EVENTS, -1);
        if (n < 0) {
            if (errno == EINTR) continue;
            // Any other error is permanent; retrying would spin forever.
            perror("epoll_wait");
            break;
        }

        for (int i = 0; i < n; i++) {
            if (events[i].data.ptr == NULL) {
                // listening socket: one new connection per event
                struct sockaddr_in ca;
                socklen_t cl = sizeof(ca);
                int cfd = accept(server_fd, (struct sockaddr *)&ca, &cl);
                if (cfd < 0) continue;
                set_nonblocking(cfd);

                proto_conn_t *conn = calloc(1, sizeof *conn);
                if (conn == NULL) {
                    close(cfd);
                    continue;
                }
                conn->fd = cfd;
                conn->state = STATE_WAIT_LOGIN;

                struct epoll_event cev = { .events = EPOLLIN, .data.ptr = conn };
                if (epoll_ctl(epfd, EPOLL_CTL_ADD, cfd, &cev) < 0) {
                    perror("epoll_ctl");
                    close(cfd);
                    free(conn);
                    continue;
                }

                printf("[fd=%d] new connection\n", cfd);
                send_str(cfd, "WELCOME. Please LOGIN <username>\n");
            } else {
                proto_conn_t *conn = events[i].data.ptr;
                handle_client(conn);
            }
        }
    }

    // Only reached when epoll_wait() failed.
    close(server_fd);
    close(epfd);
    return EXIT_FAILURE;
}