Ring Buffer Lock-Free em Zig — Tutorial Passo a Passo
Neste tutorial, vamos construir um ring buffer (buffer circular) lock-free em Zig usando operações atômicas. Este é o padrão de comunicação mais eficiente entre threads producer-consumer: sem mutex, sem bloqueio, apenas leituras e escritas atômicas.
O Que Vamos Construir
Nosso ring buffer vai:
- Implementar SPSC (Single Producer, Single Consumer) lock-free
- Usar operações atômicas para sincronização sem mutex
- Capacidade configurável em tempo de compilação via comptime
- Suportar leitura e escrita de tipos genéricos
- Incluir métricas de ocupação e throughput
- Demonstrar comunicação entre threads producer e consumer
Por Que Este Projeto?
Ring buffers lock-free são usados em kernels de sistema operacional, drivers de áudio, engines de jogos e sistemas de trading. Eles são a estrutura de comunicação inter-thread mais rápida possível porque evitam completamente locks. Em Zig, temos acesso direto a operações atômicas.
Pré-requisitos
- Zig 0.13+ instalado (guia de instalação)
- Conhecimento de threads
- Entendimento básico de atomics e memory ordering
Passo 1: Estrutura do Projeto
mkdir ring-buffer
cd ring-buffer
zig init
Passo 2: O Ring Buffer Lock-Free
const std = @import("std");
const Thread = std.Thread;
const io = std.io;
const mem = std.mem;
const time = std.time;
const atomic = std.atomic;
/// Ring Buffer Lock-Free para comunicação SPSC
/// (Single Producer, Single Consumer).
///
/// O truque: usamos dois índices atômicos (write_pos e read_pos).
/// O producer só atualiza write_pos, o consumer só atualiza read_pos.
/// Não é necessário nenhum lock!
///
/// Capacidade deve ser potência de 2 para otimizar com máscara de bits.
pub fn RingBuffer(comptime T: type, comptime capacity: usize) type {
// Verificar que capacity é potência de 2
if (capacity == 0 or (capacity & (capacity - 1)) != 0) {
@compileError("Capacidade deve ser potencia de 2");
}
return struct {
buffer: [capacity]T,
write_pos: atomic.Value(usize),
read_pos: atomic.Value(usize),
const Self = @This();
const MASK = capacity - 1;
pub fn init() Self {
return .{
.buffer = undefined,
.write_pos = atomic.Value(usize).init(0),
.read_pos = atomic.Value(usize).init(0),
};
}
/// Tenta escrever um item. Retorna false se cheio (não bloqueia).
pub fn tryPush(self: *Self, item: T) bool {
const wp = self.write_pos.load(.acquire);
const rp = self.read_pos.load(.acquire);
// Cheio quando write está uma volta inteira à frente do read
if (wp - rp >= capacity) return false;
self.buffer[wp & MASK] = item;
// Memory barrier: garantir que os dados são escritos
// antes de atualizar write_pos
self.write_pos.store(wp + 1, .release);
return true;
}
/// Tenta ler um item. Retorna null se vazio (não bloqueia).
pub fn tryPop(self: *Self) ?T {
const rp = self.read_pos.load(.acquire);
const wp = self.write_pos.load(.acquire);
// Vazio quando read alcançou write
if (rp >= wp) return null;
const item = self.buffer[rp & MASK];
// Memory barrier: garantir que os dados são lidos
// antes de atualizar read_pos
self.read_pos.store(rp + 1, .release);
return item;
}
/// Escreve um item, esperando se necessário (bloqueante).
pub fn push(self: *Self, item: T) void {
while (!self.tryPush(item)) {
// Spin wait com hint para o processador
std.atomic.spinLoopHint();
}
}
/// Lê um item, esperando se necessário (bloqueante).
pub fn pop(self: *Self) T {
while (true) {
if (self.tryPop()) |item| return item;
std.atomic.spinLoopHint();
}
}
/// Retorna quantos itens estão no buffer.
pub fn len(self: *const Self) usize {
const wp = self.write_pos.load(.acquire);
const rp = self.read_pos.load(.acquire);
return wp - rp;
}
/// Retorna true se o buffer está vazio.
pub fn isEmpty(self: *const Self) bool {
return self.len() == 0;
}
/// Retorna true se o buffer está cheio.
pub fn isFull(self: *const Self) bool {
return self.len() >= capacity;
}
/// Retorna a capacidade total.
pub fn getCapacity(_: *const Self) usize {
return capacity;
}
};
}
Decisão de design: Usamos acquire/release memory ordering. O producer usa release ao atualizar write_pos (garante que os dados já foram escritos). O consumer usa acquire ao ler write_pos (garante que vê os dados escritos). Isso é mais eficiente que seq_cst.
Passo 3: Ring Buffer com Batch Operations
/// Extensão com operações em batch para melhor throughput.
pub fn BatchRingBuffer(comptime T: type, comptime capacity: usize) type {
const Base = RingBuffer(T, capacity);
return struct {
inner: Base,
const Self = @This();
pub fn init() Self {
return .{ .inner = Base.init() };
}
/// Escreve múltiplos itens de uma vez. Retorna quantos foram escritos.
pub fn pushBatch(self: *Self, items: []const T) usize {
var escritos: usize = 0;
for (items) |item| {
if (!self.inner.tryPush(item)) break;
escritos += 1;
}
return escritos;
}
/// Lê múltiplos itens de uma vez. Retorna quantos foram lidos.
pub fn popBatch(self: *Self, out: []T) usize {
var lidos: usize = 0;
for (out) |*slot| {
if (self.inner.tryPop()) |item| {
slot.* = item;
lidos += 1;
} else break;
}
return lidos;
}
pub fn tryPush(self: *Self, item: T) bool {
return self.inner.tryPush(item);
}
pub fn tryPop(self: *Self) ?T {
return self.inner.tryPop();
}
pub fn len(self: *const Self) usize {
return self.inner.len();
}
};
}
Passo 4: Demonstração Producer-Consumer
const Mensagem = struct {
id: u64,
valor: i32,
timestamp: i64,
};
const BUFFER_SIZE = 1024; // deve ser potência de 2
var ring = RingBuffer(Mensagem, BUFFER_SIZE).init();
var producer_done = atomic.Value(bool).init(false);
fn producerThread() void {
const NUM_MENSAGENS = 100_000;
for (0..NUM_MENSAGENS) |i| {
const msg = Mensagem{
.id = i,
.valor = @intCast(i * 7 % 1000),
.timestamp = time.nanoTimestamp(),
};
ring.push(msg);
}
producer_done.store(true, .release);
}
fn consumerThread() void {
var recebidas: u64 = 0;
var soma_valores: i64 = 0;
var latencia_total: i64 = 0;
while (true) {
if (ring.tryPop()) |msg| {
recebidas += 1;
soma_valores += msg.valor;
latencia_total += time.nanoTimestamp() - msg.timestamp;
} else if (producer_done.load(.acquire)) {
// Drenar o que resta
while (ring.tryPop()) |msg| {
recebidas += 1;
soma_valores += msg.valor;
latencia_total += time.nanoTimestamp() - msg.timestamp;
}
break;
} else {
std.atomic.spinLoopHint();
}
}
// Imprimir resultados do consumer
const stdout = io.getStdOut().writer();
stdout.print(" Mensagens recebidas: {d}\n", .{recebidas}) catch {};
stdout.print(" Soma dos valores: {d}\n", .{soma_valores}) catch {};
if (recebidas > 0) {
const lat_media = @divTrunc(latencia_total, @as(i64, @intCast(recebidas)));
stdout.print(" Latencia media: {d}ns\n", .{lat_media}) catch {};
}
}
pub fn main() !void {
const stdout = io.getStdOut().writer();
try stdout.print(
\\
\\ ==========================================
\\ RING BUFFER - Lock-Free - Zig
\\ ==========================================
\\ Capacidade: {d} mensagens
\\ Tipo: SPSC (Single Producer, Single Consumer)
\\ Sincronizacao: Atomic operations (sem mutex)
\\ ==========================================
\\
\\
, .{BUFFER_SIZE});
// Demonstração single-threaded
try stdout.print(" --- Teste Single-Thread ---\n", .{});
var rb = RingBuffer(u32, 16).init();
for (0..10) |i| {
_ = rb.tryPush(@intCast(i * 10));
}
try stdout.print(" Inseridos 10 itens. Tamanho: {d}\n", .{rb.len()});
var soma: u32 = 0;
while (rb.tryPop()) |val| {
soma += val;
}
try stdout.print(" Soma apos drenar: {d}\n", .{soma});
try stdout.print(" Vazio: {}\n\n", .{rb.isEmpty()});
// Demonstração multi-threaded
try stdout.print(" --- Teste Producer-Consumer (100k mensagens) ---\n", .{});
const inicio = time.nanoTimestamp();
const producer = try Thread.spawn(.{}, producerThread, .{});
const consumer = try Thread.spawn(.{}, consumerThread, .{});
producer.join();
consumer.join();
const duracao: u64 = @intCast(time.nanoTimestamp() - inicio);
try stdout.print(" Tempo total: {d:.2}ms\n", .{
@as(f64, @floatFromInt(duracao)) / 1_000_000.0,
});
try stdout.print(" Throughput: {d:.0} msg/seg\n", .{
100_000.0 / (@as(f64, @floatFromInt(duracao)) / 1_000_000_000.0),
});
}
Testes
test "ring buffer - push e pop basico" {
var rb = RingBuffer(u32, 4).init();
try std.testing.expect(rb.tryPush(1));
try std.testing.expect(rb.tryPush(2));
try std.testing.expect(rb.tryPush(3));
try std.testing.expect(rb.tryPush(4));
try std.testing.expect(!rb.tryPush(5)); // cheio
try std.testing.expectEqual(@as(u32, 1), rb.tryPop().?);
try std.testing.expectEqual(@as(u32, 2), rb.tryPop().?);
}
test "ring buffer - vazio retorna null" {
var rb = RingBuffer(u32, 4).init();
try std.testing.expect(rb.tryPop() == null);
try std.testing.expect(rb.isEmpty());
}
test "ring buffer - cheio e esvaziar" {
var rb = RingBuffer(u8, 8).init();
for (0..8) |i| {
try std.testing.expect(rb.tryPush(@intCast(i)));
}
try std.testing.expect(rb.isFull());
try std.testing.expect(!rb.tryPush(99));
for (0..8) |i| {
try std.testing.expectEqual(@as(u8, @intCast(i)), rb.tryPop().?);
}
try std.testing.expect(rb.isEmpty());
}
test "ring buffer - wraparound" {
var rb = RingBuffer(u32, 4).init();
// Preencher e esvaziar várias vezes para testar wraparound
for (0..3) |_| {
for (0..4) |j| {
try std.testing.expect(rb.tryPush(@intCast(j)));
}
for (0..4) |j| {
try std.testing.expectEqual(@as(u32, @intCast(j)), rb.tryPop().?);
}
}
}
test "batch ring buffer" {
var brb = BatchRingBuffer(u32, 16).init();
const items = [_]u32{ 1, 2, 3, 4, 5 };
const escritos = brb.pushBatch(&items);
try std.testing.expectEqual(@as(usize, 5), escritos);
var out: [10]u32 = undefined;
const lidos = brb.popBatch(&out);
try std.testing.expectEqual(@as(usize, 5), lidos);
try std.testing.expectEqual(@as(u32, 1), out[0]);
}
Compilando e Executando
zig build run
zig build test
Conceitos Aprendidos
- Ring buffer (buffer circular) com potência de 2 e máscara de bits
- Lock-free programming com operações atômicas
- Memory ordering (acquire/release) para sincronização
- SPSC pattern (Single Producer, Single Consumer)
- Comptime para validação de capacidade em tempo de compilação
Próximos Passos
- Explore threads e atomics na stdlib
- Veja a Thread Pool para mais concorrência
- Construa o Allocator Customizado para outra estrutura
- Consulte generics com comptime na documentação