Producer-Consumer em Zig
O padrão Producer-Consumer desacopla a produção de dados do consumo, usando um buffer intermediário para equilibrar diferenças de velocidade entre produtor e consumidor. Em Zig, com primitivas de concorrência explícitas, esse padrão é construído com mutexes, condition variables e buffers pré-alocados.
Quando Usar
- Processamento assíncrono de tarefas (job queues)
- Ingestão de dados com processamento posterior
- Pipeline de mídia (captura, processamento, renderização)
- Log aggregation e processamento de eventos
- Desacoplar I/O de processamento
Fila Thread-Safe com Buffer Circular
const std = @import("std");
fn FilaSegura(comptime T: type, comptime capacidade: usize) type {
return struct {
const Self = @This();
buffer: [capacidade]T = undefined,
inicio: usize = 0,
tamanho: usize = 0,
mutex: std.Thread.Mutex = .{},
nao_vazio: std.Thread.Condition = .{},
nao_cheio: std.Thread.Condition = .{},
pub fn init() Self {
return .{};
}
pub fn inserir(self: *Self, item: T) void {
self.mutex.lock();
defer self.mutex.unlock();
while (self.tamanho >= capacidade) {
self.nao_cheio.wait(&self.mutex);
}
const pos = (self.inicio + self.tamanho) % capacidade;
self.buffer[pos] = item;
self.tamanho += 1;
self.nao_vazio.signal();
}
pub fn remover(self: *Self) T {
self.mutex.lock();
defer self.mutex.unlock();
while (self.tamanho == 0) {
self.nao_vazio.wait(&self.mutex);
}
const item = self.buffer[self.inicio];
self.inicio = (self.inicio + 1) % capacidade;
self.tamanho -= 1;
self.nao_cheio.signal();
return item;
}
};
}
const Tarefa = struct {
id: u32,
dados: [64]u8 = undefined,
};
pub fn main() !void {
var fila = FilaSegura(Tarefa, 32).init();
// Produtor
const producer = try std.Thread.spawn(.{}, struct {
fn run(f: *@TypeOf(fila)) void {
for (0..100) |i| {
f.inserir(.{ .id = @intCast(i) });
}
}
}.run, .{&fila});
// Consumidor
const consumer = try std.Thread.spawn(.{}, struct {
fn run(f: *@TypeOf(fila)) void {
for (0..100) |_| {
const tarefa = f.remover();
std.debug.print("Processando tarefa {d}\n", .{tarefa.id});
}
}
}.run, .{&fila});
producer.join();
consumer.join();
}
Múltiplos Produtores e Consumidores
const std = @import("std");
const NUM_PRODUTORES = 3;
const NUM_CONSUMIDORES = 2;
const ITENS_POR_PRODUTOR = 50;
pub fn main() !void {
const Fila = FilaSegura(i32, 64);
var fila = Fila.init();
var produtores: [NUM_PRODUTORES]std.Thread = undefined;
var consumidores: [NUM_CONSUMIDORES]std.Thread = undefined;
// Iniciar produtores
for (0..NUM_PRODUTORES) |i| {
produtores[i] = try std.Thread.spawn(.{}, struct {
fn run(f: *Fila, id: usize) void {
for (0..ITENS_POR_PRODUTOR) |j| {
const valor: i32 = @intCast(id * 1000 + j);
f.inserir(valor);
}
}
}.run, .{ &fila, i });
}
// Iniciar consumidores
for (0..NUM_CONSUMIDORES) |i| {
consumidores[i] = try std.Thread.spawn(.{}, struct {
fn run(f: *Fila, id: usize) void {
const total = (NUM_PRODUTORES * ITENS_POR_PRODUTOR) / NUM_CONSUMIDORES;
for (0..total) |_| {
const val = f.remover();
_ = val;
_ = id;
}
}
}.run, .{ &fila, i });
}
for (produtores) |t| t.join();
for (consumidores) |t| t.join();
std.debug.print("Todas as tarefas processadas!\n", .{});
}
Considerações de Performance
- Buffer circular vs
ArrayList: o buffer circular com tamanho fixo (como no exemplo) é ideal para Producer-Consumer — operações de inserção e remoção são O(1) sem realocação. UmArrayListrealocaria o buffer interno ao crescer, o que é inaceitável com mutex tomado. - Condição de espera vs spin wait: usar
std.Thread.Condition.waitsuspende a thread quando a fila está vazia/cheia, economizando CPU. Spin wait (loop comstd.atomic.spinLoopHint) é mais rápido para latências muito baixas, mas desperdiça CPU quando a espera é longa. - Batch de itens: em vez de inserir um item por vez, considere inserir/remover lotes. Isso reduz o número de lock/unlock do mutex e melhora o throughput. O produtor pode acumular N itens e inserir todos de uma vez sob um único lock.
- Uso de atomics para contador de tamanho: se você só precisa checar se a fila está vazia antes de tentar remover, um
std.atomic.Value(usize)para o tamanho pode evitar o lock em reads rápidas.
Padrão de Shutdown Graceful
Um problema comum é como sinalizar ao consumidor que a produção terminou:
const FilaComShutdown = struct {
fila: FilaSegura(i32, 64),
encerrado: std.atomic.Value(bool) = std.atomic.Value(bool).init(false),
pub fn encerrar(self: *@This()) void {
self.encerrado.store(true, .release);
// Acorda consumidores que possam estar esperando
self.fila.nao_vazio.broadcast();
}
pub fn removerOuNull(self: *@This()) ?i32 {
self.fila.mutex.lock();
defer self.fila.mutex.unlock();
while (self.fila.tamanho == 0) {
if (self.encerrado.load(.acquire)) return null;
self.fila.nao_vazio.wait(&self.fila.mutex);
}
return self.fila.removerInterno();
}
};
Erros Comuns
Esquecer de chamar broadcast ao encerrar: se o consumidor está bloqueado em condition.wait e o produtor termina sem sinalizar, o consumidor fica preso indefinidamente. Ao encerrar a produção, chame nao_vazio.broadcast() para acordar todos os consumidores.
Mutex tomado por muito tempo: não execute operações lentas (I/O, processamento) com o mutex tomado. O mutex deve proteger apenas a manipulação dos ponteiros de inicio/tamanho do buffer. Copie o item para fora da fila, libere o mutex, e então processe o item.
Tamanho de buffer como potência de dois: ao usar % para calcular a posição no buffer circular, o compilador pode otimizar para um bitwise & apenas quando a capacidade é potência de dois. Prefira capacidades como 32, 64, 128 para melhor performance.
Perguntas Frequentes
Quantas threads de consumidor devo usar?
Regra prática: para tarefas CPU-bound, use std.Thread.getCpuCount() - 1 threads. Para tarefas I/O-bound (espera em rede, disco), você pode usar mais threads que CPUs, pois elas passam a maior parte do tempo bloqueadas esperando I/O.
Qual é a diferença entre Producer-Consumer e Pipeline? No Pipeline, cada estágio processa um item e passa para o próximo estágio — o fluxo é linear e síncrono dentro de cada thread. No Producer-Consumer, produtor e consumidor rodam em threads separadas simultaneamente, com uma fila desacoplando suas velocidades.
Como evitar que o produtor sobrecarregue a fila?
A condition.wait em nao_cheio faz exatamente isso — o produtor bloqueia quando a fila está cheia. Se quiser comportamento não-bloqueante, implemente um inserirOuFalhar que retorna error.FilaCheia em vez de bloquear.
Quando Evitar
- Processamento síncrono simples onde o buffer não ajuda
- Quando produtor e consumidor têm a mesma velocidade
- Sistemas single-threaded sem necessidade de desacoplamento
Veja Também
- Concorrência — Threads e sincronização
- Command — Encapsular tarefas como objetos
- Pipeline — Processamento em estágios
- Pool de Objetos — Reutilizar objetos da fila
- Circuit Breaker — Proteção contra sobrecarga