Producer-Consumer em Zig
O padrão Producer-Consumer desacopla a produção de dados do consumo, usando um buffer intermediário para equilibrar diferenças de velocidade entre produtor e consumidor. Em Zig, com primitivas de concorrência explícitas, esse padrão é construído com mutexes, condition variables e buffers pré-alocados.
Quando Usar
- Processamento assíncrono de tarefas (job queues)
- Ingestão de dados com processamento posterior
- Pipeline de mídia (captura, processamento, renderização)
- Log aggregation e processamento de eventos
- Desacoplar I/O de processamento
Fila Thread-Safe com Buffer Circular
const std = @import("std");
fn FilaSegura(comptime T: type, comptime capacidade: usize) type {
return struct {
const Self = @This();
buffer: [capacidade]T = undefined,
inicio: usize = 0,
tamanho: usize = 0,
mutex: std.Thread.Mutex = .{},
nao_vazio: std.Thread.Condition = .{},
nao_cheio: std.Thread.Condition = .{},
pub fn init() Self {
return .{};
}
pub fn inserir(self: *Self, item: T) void {
self.mutex.lock();
defer self.mutex.unlock();
while (self.tamanho >= capacidade) {
self.nao_cheio.wait(&self.mutex);
}
const pos = (self.inicio + self.tamanho) % capacidade;
self.buffer[pos] = item;
self.tamanho += 1;
self.nao_vazio.signal();
}
pub fn remover(self: *Self) T {
self.mutex.lock();
defer self.mutex.unlock();
while (self.tamanho == 0) {
self.nao_vazio.wait(&self.mutex);
}
const item = self.buffer[self.inicio];
self.inicio = (self.inicio + 1) % capacidade;
self.tamanho -= 1;
self.nao_cheio.signal();
return item;
}
};
}
const Tarefa = struct {
id: u32,
dados: [64]u8 = undefined,
};
pub fn main() !void {
var fila = FilaSegura(Tarefa, 32).init();
// Produtor
const producer = try std.Thread.spawn(.{}, struct {
fn run(f: *@TypeOf(fila)) void {
for (0..100) |i| {
f.inserir(.{ .id = @intCast(i) });
}
}
}.run, .{&fila});
// Consumidor
const consumer = try std.Thread.spawn(.{}, struct {
fn run(f: *@TypeOf(fila)) void {
for (0..100) |_| {
const tarefa = f.remover();
std.debug.print("Processando tarefa {d}\n", .{tarefa.id});
}
}
}.run, .{&fila});
producer.join();
consumer.join();
}
Múltiplos Produtores e Consumidores
const std = @import("std");
const NUM_PRODUTORES = 3;
const NUM_CONSUMIDORES = 2;
const ITENS_POR_PRODUTOR = 50;
pub fn main() !void {
const Fila = FilaSegura(i32, 64);
var fila = Fila.init();
var produtores: [NUM_PRODUTORES]std.Thread = undefined;
var consumidores: [NUM_CONSUMIDORES]std.Thread = undefined;
// Iniciar produtores
for (0..NUM_PRODUTORES) |i| {
produtores[i] = try std.Thread.spawn(.{}, struct {
fn run(f: *Fila, id: usize) void {
for (0..ITENS_POR_PRODUTOR) |j| {
const valor: i32 = @intCast(id * 1000 + j);
f.inserir(valor);
}
}
}.run, .{ &fila, i });
}
// Iniciar consumidores
for (0..NUM_CONSUMIDORES) |i| {
consumidores[i] = try std.Thread.spawn(.{}, struct {
fn run(f: *Fila, id: usize) void {
const total = (NUM_PRODUTORES * ITENS_POR_PRODUTOR) / NUM_CONSUMIDORES;
for (0..total) |_| {
const val = f.remover();
_ = val;
_ = id;
}
}
}.run, .{ &fila, i });
}
for (produtores) |t| t.join();
for (consumidores) |t| t.join();
std.debug.print("Todas as tarefas processadas!\n", .{});
}
Quando Evitar
- Processamento síncrono simples onde o buffer não ajuda
- Quando produtor e consumidor têm a mesma velocidade
- Sistemas single-threaded sem necessidade de desacoplamento
Veja Também
- Concorrência — Threads e sincronização
- Command — Encapsular tarefas como objetos
- Pipeline — Processamento em estágios
- Pool de Objetos — Reutilizar objetos da fila
- Circuit Breaker — Proteção contra sobrecarga