Cheatsheet: Producer-Consumer em Zig

Producer-Consumer em Zig

O padrão Producer-Consumer desacopla a produção de dados do consumo, usando um buffer intermediário para equilibrar diferenças de velocidade entre produtor e consumidor. Em Zig, com primitivas de concorrência explícitas, esse padrão é construído com mutexes, condition variables e buffers pré-alocados.

Quando Usar

  • Processamento assíncrono de tarefas (job queues)
  • Ingestão de dados com processamento posterior
  • Pipeline de mídia (captura, processamento, renderização)
  • Log aggregation e processamento de eventos
  • Desacoplar I/O de processamento

Fila Thread-Safe com Buffer Circular

const std = @import("std");

fn FilaSegura(comptime T: type, comptime capacidade: usize) type {
    return struct {
        const Self = @This();

        buffer: [capacidade]T = undefined,
        inicio: usize = 0,
        tamanho: usize = 0,
        mutex: std.Thread.Mutex = .{},
        nao_vazio: std.Thread.Condition = .{},
        nao_cheio: std.Thread.Condition = .{},

        pub fn init() Self {
            return .{};
        }

        pub fn inserir(self: *Self, item: T) void {
            self.mutex.lock();
            defer self.mutex.unlock();

            while (self.tamanho >= capacidade) {
                self.nao_cheio.wait(&self.mutex);
            }

            const pos = (self.inicio + self.tamanho) % capacidade;
            self.buffer[pos] = item;
            self.tamanho += 1;

            self.nao_vazio.signal();
        }

        pub fn remover(self: *Self) T {
            self.mutex.lock();
            defer self.mutex.unlock();

            while (self.tamanho == 0) {
                self.nao_vazio.wait(&self.mutex);
            }

            const item = self.buffer[self.inicio];
            self.inicio = (self.inicio + 1) % capacidade;
            self.tamanho -= 1;

            self.nao_cheio.signal();
            return item;
        }
    };
}

const Tarefa = struct {
    id: u32,
    dados: [64]u8 = undefined,
};

pub fn main() !void {
    var fila = FilaSegura(Tarefa, 32).init();

    // Produtor
    const producer = try std.Thread.spawn(.{}, struct {
        fn run(f: *@TypeOf(fila)) void {
            for (0..100) |i| {
                f.inserir(.{ .id = @intCast(i) });
            }
        }
    }.run, .{&fila});

    // Consumidor
    const consumer = try std.Thread.spawn(.{}, struct {
        fn run(f: *@TypeOf(fila)) void {
            for (0..100) |_| {
                const tarefa = f.remover();
                std.debug.print("Processando tarefa {d}\n", .{tarefa.id});
            }
        }
    }.run, .{&fila});

    producer.join();
    consumer.join();
}

Múltiplos Produtores e Consumidores

const std = @import("std");

const NUM_PRODUTORES = 3;
const NUM_CONSUMIDORES = 2;
const ITENS_POR_PRODUTOR = 50;

pub fn main() !void {
    const Fila = FilaSegura(i32, 64);
    var fila = Fila.init();

    var produtores: [NUM_PRODUTORES]std.Thread = undefined;
    var consumidores: [NUM_CONSUMIDORES]std.Thread = undefined;

    // Iniciar produtores
    for (0..NUM_PRODUTORES) |i| {
        produtores[i] = try std.Thread.spawn(.{}, struct {
            fn run(f: *Fila, id: usize) void {
                for (0..ITENS_POR_PRODUTOR) |j| {
                    const valor: i32 = @intCast(id * 1000 + j);
                    f.inserir(valor);
                }
            }
        }.run, .{ &fila, i });
    }

    // Iniciar consumidores
    for (0..NUM_CONSUMIDORES) |i| {
        consumidores[i] = try std.Thread.spawn(.{}, struct {
            fn run(f: *Fila, id: usize) void {
                const total = (NUM_PRODUTORES * ITENS_POR_PRODUTOR) / NUM_CONSUMIDORES;
                for (0..total) |_| {
                    const val = f.remover();
                    _ = val;
                    _ = id;
                }
            }
        }.run, .{ &fila, i });
    }

    for (produtores) |t| t.join();
    for (consumidores) |t| t.join();

    std.debug.print("Todas as tarefas processadas!\n", .{});
}

Considerações de Performance

  • Buffer circular vs ArrayList: o buffer circular com tamanho fixo (como no exemplo) é ideal para Producer-Consumer — operações de inserção e remoção são O(1) sem realocação. Um ArrayList realocaria o buffer interno ao crescer, o que é inaceitável com mutex tomado.
  • Condição de espera vs spin wait: usar std.Thread.Condition.wait suspende a thread quando a fila está vazia/cheia, economizando CPU. Spin wait (loop com std.atomic.spinLoopHint) é mais rápido para latências muito baixas, mas desperdiça CPU quando a espera é longa.
  • Batch de itens: em vez de inserir um item por vez, considere inserir/remover lotes. Isso reduz o número de lock/unlock do mutex e melhora o throughput. O produtor pode acumular N itens e inserir todos de uma vez sob um único lock.
  • Uso de atomics para contador de tamanho: se você só precisa checar se a fila está vazia antes de tentar remover, um std.atomic.Value(usize) para o tamanho pode evitar o lock em reads rápidas.

Padrão de Shutdown Graceful

Um problema comum é como sinalizar ao consumidor que a produção terminou:

const FilaComShutdown = struct {
    fila: FilaSegura(i32, 64),
    encerrado: std.atomic.Value(bool) = std.atomic.Value(bool).init(false),

    pub fn encerrar(self: *@This()) void {
        self.encerrado.store(true, .release);
        // Acorda consumidores que possam estar esperando
        self.fila.nao_vazio.broadcast();
    }

    pub fn removerOuNull(self: *@This()) ?i32 {
        self.fila.mutex.lock();
        defer self.fila.mutex.unlock();

        while (self.fila.tamanho == 0) {
            if (self.encerrado.load(.acquire)) return null;
            self.fila.nao_vazio.wait(&self.fila.mutex);
        }
        return self.fila.removerInterno();
    }
};

Erros Comuns

Esquecer de chamar broadcast ao encerrar: se o consumidor está bloqueado em condition.wait e o produtor termina sem sinalizar, o consumidor fica preso indefinidamente. Ao encerrar a produção, chame nao_vazio.broadcast() para acordar todos os consumidores.

Mutex tomado por muito tempo: não execute operações lentas (I/O, processamento) com o mutex tomado. O mutex deve proteger apenas a manipulação dos ponteiros de inicio/tamanho do buffer. Copie o item para fora da fila, libere o mutex, e então processe o item.

Tamanho de buffer como potência de dois: ao usar % para calcular a posição no buffer circular, o compilador pode otimizar para um bitwise & apenas quando a capacidade é potência de dois. Prefira capacidades como 32, 64, 128 para melhor performance.

Perguntas Frequentes

Quantas threads de consumidor devo usar? Regra prática: para tarefas CPU-bound, use std.Thread.getCpuCount() - 1 threads. Para tarefas I/O-bound (espera em rede, disco), você pode usar mais threads que CPUs, pois elas passam a maior parte do tempo bloqueadas esperando I/O.

Qual é a diferença entre Producer-Consumer e Pipeline? No Pipeline, cada estágio processa um item e passa para o próximo estágio — o fluxo é linear e síncrono dentro de cada thread. No Producer-Consumer, produtor e consumidor rodam em threads separadas simultaneamente, com uma fila desacoplando suas velocidades.

Como evitar que o produtor sobrecarregue a fila? A condition.wait em nao_cheio faz exatamente isso — o produtor bloqueia quando a fila está cheia. Se quiser comportamento não-bloqueante, implemente um inserirOuFalhar que retorna error.FilaCheia em vez de bloquear.

Quando Evitar

  • Processamento síncrono simples onde o buffer não ajuda
  • Quando produtor e consumidor têm a mesma velocidade
  • Sistemas single-threaded sem necessidade de desacoplamento

Veja Também

Continue aprendendo Zig

Explore mais tutoriais e artigos em português para dominar a linguagem Zig.