Zig Async I/O: Dominando o io_uring

O io_uring é a revolução em I/O assíncrono no Linux. Introduzido no kernel 5.1, ele elimina o overhead de syscalls tradicionais através de ring buffers compartilhados entre kernel e userspace — permitindo milhões de operações de I/O por segundo com latência mínima.

Neste guia, você aprenderá a usar io_uring em Zig para criar aplicações de I/O de alta performance: servidores HTTP, processamento de arquivos, e sistemas concurrentes eficientes.

O que é io_uring?

Problema: I/O Tradicional

App        Kernel
 |            |
 |--read()--> |  ← syscall (context switch)
 |            |
 |  (bloqueia)|
 |            |
 |<--dados--- |  ← retorno
 |            |

Cada operação de I/O requer 2 context switches e overhead de syscall.

Solução: io_uring

App Ring Buffer     Kernel Ring Buffer
┌──────────────┐    ┌──────────────┐
│ Submissão    │───▶│ Processa I/O │
│ SQE[n]       │    │              │
└──────────────┘    └──────┬───────┘
                    ┌──────▼───────┐
                    │ Completou    │
                    │ CQE[n]       │──▶ App lê result
                    └──────────────┘

Submission Queue (SQ): App enfileira operações
Completion Queue (CQ): Kernel notifica conclusões
Zero syscalls para operações em batch!

Setup Básico

const std = @import("std");
const linux = std.os.linux;

const io_uring = linux.io_uring;
const io_uring_cqe = linux.io_uring_cqe;
const io_uring_sqe = linux.io_uring_sqe;

// Estrutura wrapper para io_uring
pub const IoUring = struct {
    ring: linux.IO_Uring,
    allocator: std.mem.Allocator,
    
    pub fn init(allocator: std.mem.Allocator, entries: u16) !IoUring {
        var ring: linux.IO_Uring = undefined;
        
        // Cria o ring
        const res = linux.io_uring_init(entries, &ring);
        if (res < 0) {
            return error.IoUringInitFailed;
        }
        
        return .{
            .ring = ring,
            .allocator = allocator,
        };
    }
    
    pub fn deinit(self: *IoUring) void {
        linux.io_uring_queue_exit(&self.ring);
    }
    
    // Obtém SQE (Submission Queue Entry)
    pub fn getSqe(self: *IoUring) ?*io_uring_sqe {
        return linux.io_uring_get_sqe(&self.ring);
    }
    
    // Submete operações pendentes
    pub fn submit(self: *IoUring) !u32 {
        const n = linux.io_uring_submit(&self.ring);
        if (n < 0) {
            return error.SubmitFailed;
        }
        return @intCast(n);
    }
    
    // Espera completion
    pub fn waitCqe(self: *IoUring) !*io_uring_cqe {
        var cqe: ?*io_uring_cqe = null;
        const res = linux.io_uring_wait_cqe(&self.ring, &cqe);
        if (res < 0) {
            return error.WaitFailed;
        }
        return cqe.?;
    }
    
    // Peek CQE sem bloquear
    pub fn peekCqe(self: *IoUring) ?*io_uring_cqe {
        var cqe: ?*io_uring_cqe = null;
        const res = linux.io_uring_peek_cqe(&self.ring, &cqe);
        if (res < 0 or cqe == null) {
            return null;
        }
        return cqe.?;
    }
    
    // Marca CQE como visto
    pub fn seen(self: *IoUring, cqe: *io_uring_cqe) void {
        linux.io_uring_cqe_seen(&self.ring, cqe);
    }
};

Leitura de Arquivos

Leitura Bloqueante Tradicional

// ❌ Tradicional: syscall bloqueante
const fd = try std.fs.cwd().openFile("data.txt", .{});
const content = try fd.reader().readAllAlloc(allocator, 1024 * 1024);

Leitura com io_uring

// ✅ io_uring: non-blocking
const std = @import("std");
const linux = std.os.linux;

fn readFileIoUring(allocator: std.mem.Allocator, path: []const u8) ![]u8 {
    var ring = try IoUring.init(allocator, 32);
    defer ring.deinit();
    
    // Abre arquivo (pode ser feito com io_uring também!)
    // Simplificando com std para exemplo
    const fd = try std.fs.cwd().openFile(path, .{});
    defer fd.close();
    
    // Aloca buffer
    const buffer = try allocator.alloc(u8, 4096);
    errdefer allocator.free(buffer);
    
    // Obtém SQE
    const sqe = ring.getSqe() orelse return error.NoSqeAvailable;
    
    // Configura operação READ
    linux.io_uring_prep_read(
        sqe,
        fd.handle,
        buffer.ptr,
        @intCast(buffer.len),
        0 // offset
    );
    
    // Submete
    _ = try ring.submit();
    
    // Espera completion
    const cqe = try ring.waitCqe();
    defer ring.seen(cqe);
    
    if (cqe.res < 0) {
        return error.ReadFailed;
    }
    
    const bytes_read = @as(usize, @intCast(cqe.res));
    return allocator.realloc(buffer, bytes_read);
}

Servidor TCP com io_uring

const std = @import("std");
const linux = std.os.linux;
const net = std.net;

pub const TcpServer = struct {
    ring: IoUring,
    listen_fd: i32,
    allocator: std.mem.Allocator,
    
    const Client = struct {
        fd: i32,
        buffer: []u8,
        state: enum { reading, writing, idle },
    };
    
    pub fn init(allocator: std.mem.Allocator, port: u16) !TcpServer {
        var ring = try IoUring.init(allocator, 256);
        
        // Cria socket
        const listen_fd = try linux.socket(
            linux.AF.INET,
            linux.SOCK.STREAM | linux.SOCK.CLOEXEC,
            0
        );
        
        // Reuse addr
        const reuse: i32 = 1;
        _ = linux.setsockopt(
            listen_fd,
            linux.SOL.SOCKET,
            linux.SO.REUSEADDR,
            &std.mem.toBytes(reuse),
            @sizeOf(i32)
        );
        
        // Bind
        var addr = net.Address.initIp4(.{ 0, 0, 0, 0 }, port);
        _ = linux.bind(
            listen_fd,
            &addr.any,
            addr.getOsSockLen()
        );
        
        // Listen
        _ = linux.listen(listen_fd, 128);
        
        std.debug.print("Servidor ouvindo na porta {d}\n", .{port});
        
        // Prepara accept inicial
        var server = TcpServer{
            .ring = ring,
            .listen_fd = listen_fd,
            .allocator = allocator,
        };
        
        try server.queueAccept();
        
        return server;
    }
    
    pub fn deinit(self: *TcpServer) void {
        linux.close(self.listen_fd);
        self.ring.deinit();
    }
    
    fn queueAccept(self: *TcpServer) !void {
        const sqe = self.ring.getSqe() orelse return error.NoSqeAvailable;
        
        // Aloca espaço para endereço do cliente
        var addr: net.Address = undefined;
        var addr_len: linux.socklen_t = @sizeOf(linux.sockaddr);
        
        linux.io_uring_prep_accept(
            sqe,
            self.listen_fd,
            &addr.any,
            &addr_len,
            0
        );
        
        // User data para identificar no completion
        sqe.user_data = 0xFFFFFFFF; // Marcador de accept
    }
    
    fn queueRead(self: *TcpServer, client_fd: i32, buffer: []u8) !void {
        const sqe = self.ring.getSqe() orelse return error.NoSqeAvailable;
        
        linux.io_uring_prep_read(
            sqe,
            client_fd,
            buffer.ptr,
            @intCast(buffer.len),
            0
        );
        
        sqe.user_data = @intCast(client_fd);
    }
    
    fn queueWrite(self: *TcpServer, client_fd: i32, buffer: []const u8) !void {
        const sqe = self.ring.getSqe() orelse return error.NoSqeAvailable;
        
        linux.io_uring_prep_write(
            sqe,
            client_fd,
            buffer.ptr,
            @intCast(buffer.len),
            0
        );
        
        sqe.user_data = @intCast(client_fd);
    }
    
    pub fn run(self: *TcpServer) !void {
        var clients = std.AutoHashMap(i32, Client).init(self.allocator);
        defer {
            var it = clients.valueIterator();
            while (it.next()) |client| {
                self.allocator.free(client.buffer);
                linux.close(client.fd);
            }
            clients.deinit();
        }
        
        while (true) {
            // Submete operações pendentes
            _ = self.ring.submit() catch |err| {
                std.debug.print("Submit error: {any}\n", .{err});
                continue;
            };
            
            // Espera completions
            const cqe = self.ring.waitCqe() catch |err| {
                std.debug.print("Wait error: {any}\n", .{err});
                continue;
            };
            defer self.ring.seen(cqe);
            
            if (cqe.res < 0) {
                std.debug.print("Op error: {d}\n", .{cqe.res});
                continue;
            }
            
            const user_data = cqe.user_data;
            
            if (user_data == 0xFFFFFFFF) {
                // Novo cliente
                const client_fd = cqe.res;
                std.debug.print("Novo cliente: {d}\n", .{client_fd});
                
                const buffer = try self.allocator.alloc(u8, 4096);
                
                try clients.put(client_fd, .{
                    .fd = client_fd,
                    .buffer = buffer,
                    .state = .reading,
                });
                
                // Queue read para novo cliente
                try self.queueRead(client_fd, buffer);
                
                // Queue proximo accept
                try self.queueAccept();
            } else {
                // Dados de cliente existente
                const client_fd = @as(i32, @intCast(user_data));
                const bytes_read = cqe.res;
                
                if (bytes_read == 0) {
                    // Cliente desconectou
                    std.debug.print("Cliente {d} desconectou\n", .{client_fd});
                    if (clients.getEntry(client_fd)) |entry| {
                        self.allocator.free(entry.value_ptr.buffer);
                        linux.close(client_fd);
                        _ = clients.remove(client_fd);
                    }
                    continue;
                }
                
                // Processa dados recebidos
                if (clients.get(client_fd)) |client| {
                    const data = client.buffer[0..@intCast(bytes_read)];
                    std.debug.print("Recebido de {d}: {s}\n", .{ client_fd, data });
                    
                    // Echo + queue proximo read
                    try self.queueWrite(client_fd, data);
                    try self.queueRead(client_fd, client.buffer);
                }
            }
        }
    }
};

pub fn main() !void {
    const allocator = std.heap.page_allocator;
    var server = try TcpServer.init(allocator, 8080);
    defer server.deinit();
    
    try server.run();
}

Batch de Operações

// Processa múltiplos arquivos em paralelo
fn processFilesBatch(allocator: std.mem.Allocator, paths: []const []const u8) !void {
    var ring = try IoUring.init(allocator, 256);
    defer ring.deinit();
    
    const B = struct {
        fd: i32,
        buffer: []u8,
        path: []const u8,
    };
    
    var buffers = try allocator.alloc(B, paths.len);
    defer {
        for (buffers) |b| {
            allocator.free(b.buffer);
            linux.close(b.fd);
        }
        allocator.free(buffers);
    }
    
    // Queue todas as leituras
    for (paths, 0..) |path, i| {
        const fd = try std.fs.cwd().openFile(path, .{});
        
        const buffer = try allocator.alloc(u8, 1024);
        
        buffers[i] = .{
            .fd = fd.handle,
            .buffer = buffer,
            .path = path,
        };
        
        const sqe = ring.getSqe() orelse break;
        linux.io_uring_prep_read(sqe, fd.handle, buffer.ptr, 1024, 0);
        sqe.user_data = i;
    }
    
    // Submete tudo de uma vez (1 syscall!)
    _ = try ring.submit();
    
    // Processa completions
    var completed: usize = 0;
    while (completed < paths.len) {
        const cqe = try ring.waitCqe();
        defer ring.seen(cqe);
        
        const idx = cqe.user_data;
        const bytes = cqe.res;
        
        std.debug.print("Arquivo {s}: lidos {d} bytes\n", .{
            buffers[idx].path,
            bytes
        });
        
        completed += 1;
    }
}

Features Avançadas do io_uring

Fixed Buffers (para melhor performance)

fn setupFixedBuffers(ring: *IoUring) !void {
    // Registra buffers que serão reusados
    var buffers: [16][4096]u8 = undefined;
    var iovecs: [16]linux.iovec = undefined;
    
    for (&iovecs, 0..) |*vec, i| {
        vec.iov_base = &buffers[i];
        vec.iov_len = 4096;
    }
    
    // Registra no kernel (evita copy!)
    const res = linux.io_uring_register(
        &ring.ring,
    linux.IORING_REGISTER_BUFFERS,
        &iovecs,
        16
    );
    
    if (res < 0) return error.RegisterFailed;
}

Linked Operations

// Chain: read → process → write
fn linkedOperations(ring: *IoUring, read_fd: i32, write_fd: i32) !void {
    // SQE 1: Read
    const sqe1 = ring.getSqe() orelse return error.NoSqe;
    var buffer: [1024]u8 = undefined;
    linux.io_uring_prep_read(sqe1, read_fd, &buffer, buffer.len, 0);
    sqe1.flags |= linux.IOSQE_IO_LINK; // Marca como linked
    
    // SQE 2: Write (só executa se read suceder)
    const sqe2 = ring.getSqe() orelse return error.NoSqe;
    linux.io_uring_prep_write(sqe2, write_fd, &buffer, buffer.len, 0);
    
    _ = try ring.submit();
}

Timeout e Cancelamento

fn readWithTimeout(ring: *IoUring, fd: i32, buffer: []u8, timeout_ms: u32) !usize {
    // Prepara read
    const read_sqe = ring.getSqe() orelse return error.NoSqe;
    linux.io_uring_prep_read(read_sqe, fd, buffer.ptr, @intCast(buffer.len), 0);
    read_sqe.user_data = 1;
    
    // Prepara timeout (linked)
    const timeout_sqe = ring.getSqe() orelse return error.NoSqe;
    const ts = linux.__kernel_timespec{
        .tv_sec = timeout_ms / 1000,
        .tv_nsec = (timeout_ms % 1000) * 1_000_000,
    };
    linux.io_uring_prep_link_timeout(timeout_sqe, &ts, 0);
    timeout_sqe.user_data = 2;
    
    _ = try ring.submit();
    
    // Espera
    const cqe = try ring.waitCqe();
    defer ring.seen(cqe);
    
    if (cqe.user_data == 2) {
        return error.Timeout; // Timeout disparou
    }
    
    if (cqe.res < 0) {
        return error.ReadFailed;
    }
    
    return @intCast(cqe.res);
}

Comparação de Performance

Cenárioepollio_uringMelhoria
IOPS sequencial (1 thread)500K2M+4x
Latência média5-10μs1-2μs5x
Syscalls por I/O20 (batched)
Throughput de arquivos1GB/s4GB/s+4x

Próximos Passos

  1. 🔗 Zig e WebSockets — Combine com io_uring para servidor real-time
  2. SIMD em Zig — Processamento paralelo de dados
  3. 🔗 Zig e Databases — I/O async com PostgreSQL
  4. 📦 Zig em Produção — Deploy de servidores io_uring

Recursos Adicionais


Usando io_uring com Zig em produção? Compartilhe seus benchmarks!

Continue aprendendo Zig

Explore mais tutoriais e artigos em português para dominar a linguagem Zig.