לדלג לתוכן

12.3 io_uring - פתרון

פתרון לתרגיל 1 - קריאת קבצים עם io_uring

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <fcntl.h>
#include <unistd.h>
#include <liburing.h>

/* Number of temporary files that main() creates, reads and removes. */
#define NUM_FILES 10
/* Size in bytes of each per-file buffer; reads request BUF_SIZE - 1 bytes so
 * that a terminating '\0' always fits. */
#define BUF_SIZE 256

/*
 * Exercise 1: read NUM_FILES small files through a single io_uring.
 *
 * Creates the files under /tmp, queues one read per file, submits all of them
 * with one io_uring_submit() call, then prints each completion as it is
 * reaped. Completions are matched to files through the index stored in the
 * SQE's user_data, so they may be handled in any order.
 *
 * Returns 0 when every read was submitted and reaped, 1 on any setup,
 * submission or wait failure. Per-file read errors are reported on stdout
 * and do not change the exit status.
 */
int main(void) {
    struct io_uring ring;
    int ret = io_uring_queue_init(32, &ring, 0);
    if (ret < 0) {
        fprintf(stderr, "io_uring_queue_init: %s\n", strerror(-ret));
        return 1;
    }

    int fds[NUM_FILES];
    char *buffers[NUM_FILES];
    char filenames[NUM_FILES][64];
    int prepared = 0;   /* files that own an fd, a buffer and a path on disk */
    int submitted = 0;  /* reads actually handed to the kernel */
    int status = 1;

    /* Create the temporary files with known content. */
    for (int i = 0; i < NUM_FILES; i++) {
        snprintf(filenames[i], sizeof(filenames[i]), "/tmp/uring_test_%d.txt", i);

        FILE *f = fopen(filenames[i], "w");
        if (!f) {
            perror("fopen");
            goto cleanup;
        }
        fprintf(f, "content of file %d: hello from io_uring!\n", i);
        if (fclose(f) != 0) {
            perror("fclose");
            unlink(filenames[i]);
            goto cleanup;
        }

        fds[i] = open(filenames[i], O_RDONLY);
        if (fds[i] < 0) {
            perror("open");
            unlink(filenames[i]);
            goto cleanup;
        }

        buffers[i] = calloc(1, BUF_SIZE);
        if (!buffers[i]) {
            perror("calloc");
            close(fds[i]);
            unlink(filenames[i]);
            goto cleanup;
        }

        prepared++;
    }

    /* Queue all read requests before submitting anything. */
    for (int i = 0; i < NUM_FILES; i++) {
        struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
        if (!sqe) {
            /* Only possible if NUM_FILES exceeds the ring size. */
            fprintf(stderr, "io_uring_get_sqe: submission queue is full\n");
            break;
        }
        io_uring_prep_read(sqe, fds[i], buffers[i], BUF_SIZE - 1, 0);
        io_uring_sqe_set_data64(sqe, i);  /* identify the file by its index */
    }

    ret = io_uring_submit(&ring);
    if (ret < 0) {
        fprintf(stderr, "io_uring_submit: %s\n", strerror(-ret));
        goto cleanup;
    }
    submitted = ret;

    /* Reap exactly as many completions as were submitted. */
    for (int i = 0; i < submitted; i++) {
        struct io_uring_cqe *cqe;
        ret = io_uring_wait_cqe(&ring, &cqe);
        if (ret < 0) {
            /* cqe is not valid after a failed wait; do not touch it. */
            fprintf(stderr, "io_uring_wait_cqe: %s\n", strerror(-ret));
            goto cleanup;
        }

        int file_idx = (int)cqe->user_data;
        if (cqe->res < 0) {
            printf("file %d: error %d\n", file_idx, cqe->res);
        } else {
            /* res is at most BUF_SIZE - 1, so the terminator stays in bounds. */
            buffers[file_idx][cqe->res] = '\0';
            printf("file %d (%d bytes): %s", file_idx, cqe->res, buffers[file_idx]);
        }

        io_uring_cqe_seen(&ring, cqe);
    }

    if (submitted == NUM_FILES) {
        status = 0;
    }

cleanup:
    /* Tear the ring down first so no request can still reference a buffer. */
    io_uring_queue_exit(&ring);
    for (int i = 0; i < prepared; i++) {
        close(fds[i]);
        free(buffers[i]);
        unlink(filenames[i]);
    }
    return status;
}

קומפילציה:

gcc -o read_files read_files.c -luring


פתרון לתרגיל 2 - שרת echo עם io_uring

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <liburing.h>

/* TCP port the echo server binds to. */
#define PORT 9090
/* Capacity in bytes of the data buffer carried by every conn_info_t. */
#define BUF_SIZE 1024
/* Entry count passed to io_uring_queue_init(). */
#define QUEUE_DEPTH 256

/* Kind of operation a completion belongs to; stored in conn_info_t.type. */
enum event_type { EV_ACCEPT, EV_READ, EV_WRITE };

/*
 * Per-operation context. One instance is heap-allocated by each submit_*
 * helper, attached to the SQE as user data, and freed by main() once the
 * matching completion has been handled.
 */
typedef struct {
    int fd;                 /* listening socket for EV_ACCEPT, client socket otherwise */
    enum event_type type;   /* selects the branch taken in main()'s switch */
    char buf[BUF_SIZE];     /* receive target (EV_READ) or copy of the data to send (EV_WRITE) */
} conn_info_t;

/* The single ring shared by the submit_* helpers and main(). */
struct io_uring ring;

/*
 * Create a TCP socket listening on PORT on all local IPv4 addresses.
 *
 * Returns the listening descriptor. Any failure of socket(), bind() or
 * listen() is reported with perror() and terminates the process, so the
 * caller never receives an unusable descriptor. A failed SO_REUSEADDR is
 * reported but not fatal.
 */
int create_server(void) {
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd < 0) {
        perror("socket");
        exit(EXIT_FAILURE);
    }

    /* Allow quick restarts while old connections linger in TIME_WAIT. */
    int opt = 1;
    if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) {
        perror("setsockopt(SO_REUSEADDR)");
    }

    struct sockaddr_in addr = {
        .sin_family = AF_INET,
        .sin_port = htons(PORT),
        .sin_addr.s_addr = htonl(INADDR_ANY)
    };
    if (bind(fd, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
        perror("bind");
        close(fd);
        exit(EXIT_FAILURE);
    }

    if (listen(fd, 128) < 0) {
        perror("listen");
        close(fd);
        exit(EXIT_FAILURE);
    }

    return fd;
}

/*
 * Queue an accept on server_fd in the global ring (not submitted here).
 *
 * The SQE carries a freshly allocated conn_info_t tagged EV_ACCEPT; main()
 * frees it after handling the completion. If the submission queue is full,
 * the pending entries are flushed once and the request is retried. Running
 * out of SQEs or memory terminates the process, because the server cannot
 * keep accepting without this request.
 */
void submit_accept(int server_fd) {
    struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
    if (!sqe) {
        /* Queue full: push what is pending to the kernel and try again. */
        io_uring_submit(&ring);
        sqe = io_uring_get_sqe(&ring);
    }
    if (!sqe) {
        fprintf(stderr, "submit_accept: no submission queue entry available\n");
        exit(EXIT_FAILURE);
    }

    conn_info_t *ci = calloc(1, sizeof(*ci));
    if (!ci) {
        perror("calloc");
        exit(EXIT_FAILURE);
    }
    ci->fd = server_fd;
    ci->type = EV_ACCEPT;

    /* Peer address is not needed, so no sockaddr buffer is supplied. */
    io_uring_prep_accept(sqe, server_fd, NULL, NULL, 0);
    io_uring_sqe_set_data(sqe, ci);
}

/*
 * Queue a recv of up to BUF_SIZE bytes on client_fd (not submitted here).
 *
 * The data lands in the buf of a freshly allocated conn_info_t tagged
 * EV_READ; main() frees it after handling the completion. If the submission
 * queue is full, the pending entries are flushed once and the request is
 * retried. Running out of SQEs or memory terminates the process.
 */
void submit_read(int client_fd) {
    struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
    if (!sqe) {
        /* Queue full: push what is pending to the kernel and try again. */
        io_uring_submit(&ring);
        sqe = io_uring_get_sqe(&ring);
    }
    if (!sqe) {
        fprintf(stderr, "submit_read: no submission queue entry available\n");
        exit(EXIT_FAILURE);
    }

    conn_info_t *ci = calloc(1, sizeof(*ci));
    if (!ci) {
        perror("calloc");
        exit(EXIT_FAILURE);
    }
    ci->fd = client_fd;
    ci->type = EV_READ;

    io_uring_prep_recv(sqe, client_fd, ci->buf, BUF_SIZE, 0);
    io_uring_sqe_set_data(sqe, ci);
}

void submit_write(int client_fd, const char *data, int len) {
    struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
    conn_info_t *ci = calloc(1, sizeof(conn_info_t));
    ci->fd = client_fd;
    ci->type = EV_WRITE;
    memcpy(ci->buf, data, len);
    io_uring_prep_send(sqe, client_fd, ci->buf, len, 0);
    io_uring_sqe_set_data(sqe, ci);
}

/*
 * Exercise 2: single-threaded TCP echo server driven by one io_uring.
 *
 * Event loop: wait for one completion, dispatch on the conn_info_t attached
 * to it, queue follow-up requests, free the conn_info_t, submit.
 *
 * Descriptor ownership: every open client always has exactly one recv
 * pending, and only the EV_READ branch closes a client descriptor. The
 * EV_WRITE branch must not close it, otherwise the still-pending recv would
 * close the same number a second time, possibly after it has been reused by
 * a newly accepted connection.
 *
 * Returns 1 if setup or the ring itself fails; otherwise runs until killed.
 */
int main(void) {
    int server_fd = create_server();
    if (server_fd < 0) {
        fprintf(stderr, "failed to create listening socket\n");
        return 1;
    }
    printf("echo server listening on port %d\n", PORT);

    int ret = io_uring_queue_init(QUEUE_DEPTH, &ring, 0);
    if (ret < 0) {
        fprintf(stderr, "io_uring_queue_init: %s\n", strerror(-ret));
        close(server_fd);
        return 1;
    }

    submit_accept(server_fd);
    ret = io_uring_submit(&ring);
    if (ret < 0) {
        fprintf(stderr, "io_uring_submit: %s\n", strerror(-ret));
        io_uring_queue_exit(&ring);
        close(server_fd);
        return 1;
    }

    while (1) {
        struct io_uring_cqe *cqe;
        ret = io_uring_wait_cqe(&ring, &cqe);
        if (ret < 0) {
            /* cqe is not valid after a failed wait; leave the loop cleanly. */
            fprintf(stderr, "io_uring_wait_cqe: %s\n", strerror(-ret));
            break;
        }

        conn_info_t *ci = io_uring_cqe_get_data(cqe);
        int res = cqe->res;

        switch (ci->type) {
        case EV_ACCEPT:
            if (res >= 0) {
                printf("new connection: fd=%d\n", res);
                submit_read(res);
            } else {
                fprintf(stderr, "accept failed: %s\n", strerror(-res));
            }
            /* Re-arm even after a failed accept; otherwise one transient
             * error would stop the server from ever accepting again. */
            submit_accept(server_fd);
            break;

        case EV_READ:
            if (res <= 0) {
                /* 0 = orderly shutdown by the peer, <0 = error. */
                printf("connection closed: fd=%d\n", ci->fd);
                close(ci->fd);
            } else {
                printf("received %d bytes from fd=%d\n", res, ci->fd);
                submit_write(ci->fd, ci->buf, res);
                submit_read(ci->fd);
            }
            break;

        case EV_WRITE:
            if (res < 0) {
                /* Report only: the recv pending on this descriptor owns the
                 * close (see the ownership note in the function comment). */
                printf("write error on fd=%d: %d\n", ci->fd, res);
            }
            break;

        default:
            fprintf(stderr, "unexpected event type %d\n", (int)ci->type);
            break;
        }

        io_uring_cqe_seen(&ring, cqe);
        free(ci);

        ret = io_uring_submit(&ring);
        if (ret < 0) {
            fprintf(stderr, "io_uring_submit: %s\n", strerror(-ret));
            break;
        }
    }

    /* Reached only when the ring itself failed. */
    io_uring_queue_exit(&ring);
    close(server_fd);
    return 1;
}

פתרון לתרגיל 3 - העתקת קובץ אסינכרונית

#include <stdio.h>
#include <stdlib.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/stat.h>
#include <time.h>
#include <liburing.h>

/* Bytes copied per read+write pair; also the size and alignment of each buffer. */
#define BLOCK_SIZE 4096
/* Number of buffers, i.e. the maximum number of read+write pairs in flight. */
#define BATCH_SIZE 8
/* Entry count passed to io_uring_queue_init(). */
#define QUEUE_DEPTH 64

int main(int argc, char *argv[]) {
    if (argc != 3) {
        fprintf(stderr, "usage: %s <source> <dest>\n", argv[0]);
        return 1;
    }

    int src_fd = open(argv[1], O_RDONLY);
    if (src_fd < 0) { perror("open src"); return 1; }

    int dst_fd = open(argv[2], O_WRONLY | O_CREAT | O_TRUNC, 0644);
    if (dst_fd < 0) { perror("open dst"); return 1; }

    struct stat st;
    fstat(src_fd, &st);
    off_t file_size = st.st_size;
    off_t total_blocks = (file_size + BLOCK_SIZE - 1) / BLOCK_SIZE;

    struct io_uring ring;
    io_uring_queue_init(QUEUE_DEPTH, &ring, 0);

    // הקצה חוצצים
    char *buffers[BATCH_SIZE];
    for (int i = 0; i < BATCH_SIZE; i++) {
        buffers[i] = aligned_alloc(BLOCK_SIZE, BLOCK_SIZE);
    }

    struct timespec start, end;
    clock_gettime(CLOCK_MONOTONIC, &start);

    off_t block_idx = 0;
    int in_flight = 0;

    while (block_idx < total_blocks || in_flight > 0) {
        // שלח בקשות חדשות
        while (in_flight < BATCH_SIZE && block_idx < total_blocks) {
            int buf_idx = in_flight;
            off_t offset = block_idx * BLOCK_SIZE;
            int len = BLOCK_SIZE;
            if (offset + len > file_size) {
                len = file_size - offset;
            }

            // קריאה - משורשרת לכתיבה
            struct io_uring_sqe *sqe_read = io_uring_get_sqe(&ring);
            io_uring_prep_read(sqe_read, src_fd, buffers[buf_idx], len, offset);
            sqe_read->flags |= IOSQE_IO_LINK;
            io_uring_sqe_set_data64(sqe_read, block_idx * 2);  // זוגי = read

            // כתיבה
            struct io_uring_sqe *sqe_write = io_uring_get_sqe(&ring);
            io_uring_prep_write(sqe_write, dst_fd, buffers[buf_idx], len, offset);
            io_uring_sqe_set_data64(sqe_write, block_idx * 2 + 1);  // אי-זוגי = write

            block_idx++;
            in_flight++;
        }

        io_uring_submit(&ring);

        // קרא תוצאות
        struct io_uring_cqe *cqe;
        io_uring_wait_cqe(&ring, &cqe);

        if (cqe->res < 0) {
            fprintf(stderr, "I/O error: %d (event %llu)\n",
                    cqe->res, (unsigned long long)cqe->user_data);
        }

        // כל זוג read+write מייצר 2 CQEs
        // ספור רק את הwrite (אי-זוגי) כ"סיום"
        if (cqe->user_data % 2 == 1) {
            in_flight--;
        }

        io_uring_cqe_seen(&ring, cqe);
    }

    clock_gettime(CLOCK_MONOTONIC, &end);
    double elapsed = (end.tv_sec - start.tv_sec) +
                     (end.tv_nsec - start.tv_nsec) / 1e9;

    printf("copied %ld bytes in %.4f seconds (%.1f MB/s)\n",
           (long)file_size, elapsed,
           (file_size / (1024.0 * 1024.0)) / elapsed);

    for (int i = 0; i < BATCH_SIZE; i++) {
        free(buffers[i]);
    }
    io_uring_queue_exit(&ring);
    close(src_fd);
    close(dst_fd);
    return 0;
}

פתרון לתרגיל 4 - השוואת ביצועים

תרגיל זה מורכב מכמה קבצים. הנה הלקוח ל-benchmark (משותף לשתי הגרסאות):

// bench_client.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <pthread.h>
#include <time.h>

/* Number of client threads started by main(); each opens one connection. */
#define NUM_CLIENTS 100
/* Request/echo round trips performed by each client. */
#define MSGS_PER_CLIENT 1000
/* Payload size in bytes of every message. */
#define MSG_SIZE 64
/* TCP port on 127.0.0.1 that the clients connect to. */
#define PORT 9090

/*
 * Thread body of one benchmark client.
 *
 * Connects to 127.0.0.1:PORT and performs MSGS_PER_CLIENT round trips: send
 * MSG_SIZE bytes, then read until MSG_SIZE bytes have come back. Both
 * directions loop because a stream socket may transfer fewer bytes per call
 * than requested.
 *
 * The client stops at the first send/receive failure instead of carrying on:
 * writing again to a connection the server has closed could raise SIGPIPE
 * and kill the whole benchmark process.
 *
 * arg is unused. Always returns NULL; the socket is closed on every path.
 */
void *client_func(void *arg) {
    (void)arg;

    int fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd < 0) {
        perror("socket");
        return NULL;
    }

    struct sockaddr_in addr = {
        .sin_family = AF_INET,
        .sin_port = htons(PORT),
        .sin_addr.s_addr = inet_addr("127.0.0.1")
    };

    if (connect(fd, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
        perror("connect");
        close(fd);
        return NULL;
    }

    char send_buf[MSG_SIZE];
    char recv_buf[MSG_SIZE];
    memset(send_buf, 'A', MSG_SIZE);

    for (int i = 0; i < MSGS_PER_CLIENT; i++) {
        /* Send the whole message, tolerating short writes. */
        int sent = 0;
        while (sent < MSG_SIZE) {
            ssize_t n = write(fd, send_buf + sent, (size_t)(MSG_SIZE - sent));
            if (n <= 0) {
                perror("write");
                goto done;
            }
            sent += (int)n;
        }

        /* Read until the full echo has arrived. */
        int total = 0;
        while (total < MSG_SIZE) {
            ssize_t n = read(fd, recv_buf + total, (size_t)(MSG_SIZE - total));
            if (n < 0) {
                perror("read");
                goto done;
            }
            if (n == 0) {
                fprintf(stderr, "server closed the connection\n");
                goto done;
            }
            total += (int)n;
        }
    }

done:
    close(fd);
    return NULL;
}

/*
 * Benchmark driver: start NUM_CLIENTS threads running client_func, wait for
 * all of them, and print the message count, wall-clock time and throughput.
 *
 * Only threads that were actually created are joined and counted. The
 * message total assumes each started client finished all its round trips,
 * because client_func does not report how many it completed.
 *
 * Returns 0 if every thread could be started, 1 otherwise.
 */
int main(void) {
    struct timespec start, end;
    clock_gettime(CLOCK_MONOTONIC, &start);

    pthread_t threads[NUM_CLIENTS];
    int started = 0;
    for (int i = 0; i < NUM_CLIENTS; i++) {
        /* pthread_create returns an error number directly, not via errno. */
        int rc = pthread_create(&threads[started], NULL, client_func, NULL);
        if (rc != 0) {
            fprintf(stderr, "pthread_create: %s\n", strerror(rc));
            break;
        }
        started++;
    }

    /* Joining a pthread_t that was never initialised is undefined. */
    for (int i = 0; i < started; i++) {
        pthread_join(threads[i], NULL);
    }

    clock_gettime(CLOCK_MONOTONIC, &end);
    double elapsed = (end.tv_sec - start.tv_sec) +
                     (end.tv_nsec - start.tv_nsec) / 1e9;

    int total_msgs = started * MSGS_PER_CLIENT;
    printf("total messages: %d\n", total_msgs);
    printf("elapsed: %.4f seconds\n", elapsed);
    printf("throughput: %.0f msgs/sec\n", total_msgs / elapsed);

    return started == NUM_CLIENTS ? 0 : 1;
}

קומפילציה:

gcc -O2 -o bench_client bench_client.c -lpthread

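הריצו קודם את שרת ה-epoll (מפרק 11 או מימוש משלכם), ואז את bench_client. אחר כך הריצו את שרת ה-io_uring מתרגיל 2 ושוב את bench_client, והשוו את התוצאות. שימו לב: הלקוח מתחבר לפורט 9090, ולכן שני השרתים צריכים להאזין על פורט זה, ויש לעצור את השרת הראשון לפני הפעלת השני.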


פתרון לתרגיל 5 - טיימר עם io_uring

#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <liburing.h>

/* Number of timeouts queued by main(); timer i is set to i + 1 seconds. */
#define NUM_TIMERS 5

/*
 * Exercise 5: several timers on one io_uring.
 *
 * Queues NUM_TIMERS timeout requests of 1, 2, ... seconds, submits them
 * together, and prints the time since program start at which each completion
 * is reaped. Timers are identified by the index stored in user_data.
 *
 * The printed res is the raw completion result; per io_uring_prep_timeout(3)
 * a timeout that simply expires completes with -ETIME.
 *
 * Returns 0 when every timer was submitted and reaped, 1 otherwise.
 */
int main(void) {
    struct io_uring ring;
    int ret = io_uring_queue_init(32, &ring, 0);
    if (ret < 0) {
        fprintf(stderr, "io_uring_queue_init failed: %d\n", ret);
        return 1;
    }

    struct timespec prog_start;
    clock_gettime(CLOCK_MONOTONIC, &prog_start);

    /* Each SQE is given a pointer into this array, so it has to stay alive
     * until the requests have been submitted; it lives for all of main(). */
    struct __kernel_timespec timeouts[NUM_TIMERS];
    for (int i = 0; i < NUM_TIMERS; i++) {
        struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
        if (!sqe) {
            /* Only possible if NUM_TIMERS exceeds the ring size. */
            fprintf(stderr, "io_uring_get_sqe: submission queue is full\n");
            break;
        }

        timeouts[i].tv_sec = i + 1;
        timeouts[i].tv_nsec = 0;

        io_uring_prep_timeout(sqe, &timeouts[i], 0, 0);
        io_uring_sqe_set_data64(sqe, i);
    }

    int submitted = io_uring_submit(&ring);
    if (submitted < 0) {
        fprintf(stderr, "io_uring_submit failed: %d\n", submitted);
        io_uring_queue_exit(&ring);
        return 1;
    }
    printf("submitted %d timers\n", submitted);

    /* Reap exactly as many completions as were submitted. */
    int status = (submitted == NUM_TIMERS) ? 0 : 1;
    for (int i = 0; i < submitted; i++) {
        struct io_uring_cqe *cqe;
        ret = io_uring_wait_cqe(&ring, &cqe);
        if (ret < 0) {
            /* cqe is not valid after a failed wait; do not touch it. */
            fprintf(stderr, "io_uring_wait_cqe failed: %d\n", ret);
            status = 1;
            break;
        }

        struct timespec now;
        clock_gettime(CLOCK_MONOTONIC, &now);
        double elapsed = (now.tv_sec - prog_start.tv_sec) +
                         (now.tv_nsec - prog_start.tv_nsec) / 1e9;

        int timer_id = (int)cqe->user_data;
        printf("timer %d fired at %.3f seconds (res=%d)\n",
               timer_id, elapsed, cqe->res);

        io_uring_cqe_seen(&ring, cqe);
    }

    io_uring_queue_exit(&ring);
    return status;
}

קומפילציה:

gcc -o timer_uring timer_uring.c -luring