Ring Buffer Lock-Free em Zig — Tutorial Passo a Passo

Ring Buffer Lock-Free em Zig — Tutorial Passo a Passo

Neste tutorial, vamos construir um ring buffer (buffer circular) lock-free em Zig usando operações atômicas. Este é o padrão de comunicação mais eficiente entre threads producer-consumer: sem mutex, sem bloqueio, apenas leituras e escritas atômicas.

O Que Vamos Construir

Nosso ring buffer vai:

  • Implementar SPSC (Single Producer, Single Consumer) lock-free
  • Usar operações atômicas para sincronização sem mutex
  • Capacidade configurável em tempo de compilação via comptime
  • Suportar leitura e escrita de tipos genéricos
  • Incluir métricas de ocupação e throughput
  • Demonstrar comunicação entre threads producer e consumer

Por Que Este Projeto?

Ring buffers lock-free são usados em kernels de sistema operacional, drivers de áudio, engines de jogos e sistemas de trading. Eles são a estrutura de comunicação inter-thread mais rápida possível porque evitam completamente locks. Em Zig, temos acesso direto a operações atômicas.

Pré-requisitos

Passo 1: Estrutura do Projeto

mkdir ring-buffer
cd ring-buffer
zig init

Passo 2: O Ring Buffer Lock-Free

const std = @import("std");
const Thread = std.Thread;
const io = std.io;
const mem = std.mem;
const time = std.time;
const atomic = std.atomic;

/// Ring Buffer Lock-Free para comunicação SPSC
/// (Single Producer, Single Consumer).
///
/// O truque: usamos dois índices atômicos (write_pos e read_pos).
/// O producer só atualiza write_pos, o consumer só atualiza read_pos.
/// Não é necessário nenhum lock!
///
/// Capacidade deve ser potência de 2 para otimizar com máscara de bits.
pub fn RingBuffer(comptime T: type, comptime capacity: usize) type {
    // Verificar que capacity é potência de 2
    if (capacity == 0 or (capacity & (capacity - 1)) != 0) {
        @compileError("Capacidade deve ser potencia de 2");
    }

    return struct {
        buffer: [capacity]T,
        write_pos: atomic.Value(usize),
        read_pos: atomic.Value(usize),

        const Self = @This();
        const MASK = capacity - 1;

        pub fn init() Self {
            return .{
                .buffer = undefined,
                .write_pos = atomic.Value(usize).init(0),
                .read_pos = atomic.Value(usize).init(0),
            };
        }

        /// Tenta escrever um item. Retorna false se cheio (não bloqueia).
        pub fn tryPush(self: *Self, item: T) bool {
            const wp = self.write_pos.load(.acquire);
            const rp = self.read_pos.load(.acquire);

            // Cheio quando write está uma volta inteira à frente do read
            if (wp - rp >= capacity) return false;

            self.buffer[wp & MASK] = item;

            // Memory barrier: garantir que os dados são escritos
            // antes de atualizar write_pos
            self.write_pos.store(wp + 1, .release);

            return true;
        }

        /// Tenta ler um item. Retorna null se vazio (não bloqueia).
        pub fn tryPop(self: *Self) ?T {
            const rp = self.read_pos.load(.acquire);
            const wp = self.write_pos.load(.acquire);

            // Vazio quando read alcançou write
            if (rp >= wp) return null;

            const item = self.buffer[rp & MASK];

            // Memory barrier: garantir que os dados são lidos
            // antes de atualizar read_pos
            self.read_pos.store(rp + 1, .release);

            return item;
        }

        /// Escreve um item, esperando se necessário (bloqueante).
        pub fn push(self: *Self, item: T) void {
            while (!self.tryPush(item)) {
                // Spin wait com hint para o processador
                std.atomic.spinLoopHint();
            }
        }

        /// Lê um item, esperando se necessário (bloqueante).
        pub fn pop(self: *Self) T {
            while (true) {
                if (self.tryPop()) |item| return item;
                std.atomic.spinLoopHint();
            }
        }

        /// Retorna quantos itens estão no buffer.
        pub fn len(self: *const Self) usize {
            const wp = self.write_pos.load(.acquire);
            const rp = self.read_pos.load(.acquire);
            return wp - rp;
        }

        /// Retorna true se o buffer está vazio.
        pub fn isEmpty(self: *const Self) bool {
            return self.len() == 0;
        }

        /// Retorna true se o buffer está cheio.
        pub fn isFull(self: *const Self) bool {
            return self.len() >= capacity;
        }

        /// Retorna a capacidade total.
        pub fn getCapacity(_: *const Self) usize {
            return capacity;
        }
    };
}

Decisão de design: Usamos acquire/release memory ordering. O producer usa release ao atualizar write_pos (garante que os dados já foram escritos). O consumer usa acquire ao ler write_pos (garante que vê os dados escritos). Isso é mais eficiente que seq_cst.

Passo 3: Ring Buffer com Batch Operations

/// Extensão com operações em batch para melhor throughput.
pub fn BatchRingBuffer(comptime T: type, comptime capacity: usize) type {
    const Base = RingBuffer(T, capacity);

    return struct {
        inner: Base,

        const Self = @This();

        pub fn init() Self {
            return .{ .inner = Base.init() };
        }

        /// Escreve múltiplos itens de uma vez. Retorna quantos foram escritos.
        pub fn pushBatch(self: *Self, items: []const T) usize {
            var escritos: usize = 0;
            for (items) |item| {
                if (!self.inner.tryPush(item)) break;
                escritos += 1;
            }
            return escritos;
        }

        /// Lê múltiplos itens de uma vez. Retorna quantos foram lidos.
        pub fn popBatch(self: *Self, out: []T) usize {
            var lidos: usize = 0;
            for (out) |*slot| {
                if (self.inner.tryPop()) |item| {
                    slot.* = item;
                    lidos += 1;
                } else break;
            }
            return lidos;
        }

        pub fn tryPush(self: *Self, item: T) bool {
            return self.inner.tryPush(item);
        }

        pub fn tryPop(self: *Self) ?T {
            return self.inner.tryPop();
        }

        pub fn len(self: *const Self) usize {
            return self.inner.len();
        }
    };
}

Passo 4: Demonstração Producer-Consumer

const Mensagem = struct {
    id: u64,
    valor: i32,
    timestamp: i64,
};

const BUFFER_SIZE = 1024; // deve ser potência de 2

var ring = RingBuffer(Mensagem, BUFFER_SIZE).init();
var producer_done = atomic.Value(bool).init(false);

fn producerThread() void {
    const NUM_MENSAGENS = 100_000;

    for (0..NUM_MENSAGENS) |i| {
        const msg = Mensagem{
            .id = i,
            .valor = @intCast(i * 7 % 1000),
            .timestamp = time.nanoTimestamp(),
        };
        ring.push(msg);
    }

    producer_done.store(true, .release);
}

fn consumerThread() void {
    var recebidas: u64 = 0;
    var soma_valores: i64 = 0;
    var latencia_total: i64 = 0;

    while (true) {
        if (ring.tryPop()) |msg| {
            recebidas += 1;
            soma_valores += msg.valor;
            latencia_total += time.nanoTimestamp() - msg.timestamp;
        } else if (producer_done.load(.acquire)) {
            // Drenar o que resta
            while (ring.tryPop()) |msg| {
                recebidas += 1;
                soma_valores += msg.valor;
                latencia_total += time.nanoTimestamp() - msg.timestamp;
            }
            break;
        } else {
            std.atomic.spinLoopHint();
        }
    }

    // Imprimir resultados do consumer
    const stdout = io.getStdOut().writer();
    stdout.print("  Mensagens recebidas:   {d}\n", .{recebidas}) catch {};
    stdout.print("  Soma dos valores:      {d}\n", .{soma_valores}) catch {};
    if (recebidas > 0) {
        const lat_media = @divTrunc(latencia_total, @as(i64, @intCast(recebidas)));
        stdout.print("  Latencia media:        {d}ns\n", .{lat_media}) catch {};
    }
}

pub fn main() !void {
    const stdout = io.getStdOut().writer();

    try stdout.print(
        \\
        \\  ==========================================
        \\     RING BUFFER - Lock-Free - Zig
        \\  ==========================================
        \\  Capacidade: {d} mensagens
        \\  Tipo: SPSC (Single Producer, Single Consumer)
        \\  Sincronizacao: Atomic operations (sem mutex)
        \\  ==========================================
        \\
        \\
    , .{BUFFER_SIZE});

    // Demonstração single-threaded
    try stdout.print("  --- Teste Single-Thread ---\n", .{});
    var rb = RingBuffer(u32, 16).init();

    for (0..10) |i| {
        _ = rb.tryPush(@intCast(i * 10));
    }
    try stdout.print("  Inseridos 10 itens. Tamanho: {d}\n", .{rb.len()});

    var soma: u32 = 0;
    while (rb.tryPop()) |val| {
        soma += val;
    }
    try stdout.print("  Soma apos drenar: {d}\n", .{soma});
    try stdout.print("  Vazio: {}\n\n", .{rb.isEmpty()});

    // Demonstração multi-threaded
    try stdout.print("  --- Teste Producer-Consumer (100k mensagens) ---\n", .{});

    const inicio = time.nanoTimestamp();

    const producer = try Thread.spawn(.{}, producerThread, .{});
    const consumer = try Thread.spawn(.{}, consumerThread, .{});

    producer.join();
    consumer.join();

    const duracao: u64 = @intCast(time.nanoTimestamp() - inicio);
    try stdout.print("  Tempo total:           {d:.2}ms\n", .{
        @as(f64, @floatFromInt(duracao)) / 1_000_000.0,
    });
    try stdout.print("  Throughput:            {d:.0} msg/seg\n", .{
        100_000.0 / (@as(f64, @floatFromInt(duracao)) / 1_000_000_000.0),
    });
}

Testes

test "ring buffer - push e pop basico" {
    var rb = RingBuffer(u32, 4).init();

    try std.testing.expect(rb.tryPush(1));
    try std.testing.expect(rb.tryPush(2));
    try std.testing.expect(rb.tryPush(3));
    try std.testing.expect(rb.tryPush(4));
    try std.testing.expect(!rb.tryPush(5)); // cheio

    try std.testing.expectEqual(@as(u32, 1), rb.tryPop().?);
    try std.testing.expectEqual(@as(u32, 2), rb.tryPop().?);
}

test "ring buffer - vazio retorna null" {
    var rb = RingBuffer(u32, 4).init();
    try std.testing.expect(rb.tryPop() == null);
    try std.testing.expect(rb.isEmpty());
}

test "ring buffer - cheio e esvaziar" {
    var rb = RingBuffer(u8, 8).init();

    for (0..8) |i| {
        try std.testing.expect(rb.tryPush(@intCast(i)));
    }
    try std.testing.expect(rb.isFull());
    try std.testing.expect(!rb.tryPush(99));

    for (0..8) |i| {
        try std.testing.expectEqual(@as(u8, @intCast(i)), rb.tryPop().?);
    }
    try std.testing.expect(rb.isEmpty());
}

test "ring buffer - wraparound" {
    var rb = RingBuffer(u32, 4).init();

    // Preencher e esvaziar várias vezes para testar wraparound
    for (0..3) |_| {
        for (0..4) |j| {
            try std.testing.expect(rb.tryPush(@intCast(j)));
        }
        for (0..4) |j| {
            try std.testing.expectEqual(@as(u32, @intCast(j)), rb.tryPop().?);
        }
    }
}

test "batch ring buffer" {
    var brb = BatchRingBuffer(u32, 16).init();

    const items = [_]u32{ 1, 2, 3, 4, 5 };
    const escritos = brb.pushBatch(&items);
    try std.testing.expectEqual(@as(usize, 5), escritos);

    var out: [10]u32 = undefined;
    const lidos = brb.popBatch(&out);
    try std.testing.expectEqual(@as(usize, 5), lidos);
    try std.testing.expectEqual(@as(u32, 1), out[0]);
}

Compilando e Executando

zig build run
zig build test

Conceitos Aprendidos

  • Ring buffer (buffer circular) com potência de 2 e máscara de bits
  • Lock-free programming com operações atômicas
  • Memory ordering (acquire/release) para sincronização
  • SPSC pattern (Single Producer, Single Consumer)
  • Comptime para validação de capacidade em tempo de compilação

Próximos Passos

Continue aprendendo Zig

Explore mais tutoriais e artigos em português para dominar a linguagem Zig.