Cheatsheet: Pipeline em Zig

Pipeline em Zig

O padrão Pipeline processa dados através de uma série de estágios sequenciais, onde a saída de cada estágio alimenta a entrada do próximo. Em Zig, pipelines são elegantemente construídos com composição de funções, writers/readers encadeados e comptime para pipelines estáticos de custo zero.

Quando Usar

  • Processamento de texto (ler, parsear, transformar, salvar)
  • Pipelines de compilação (lexer, parser, otimizador, code gen)
  • ETL (Extract, Transform, Load) de dados
  • Processamento de imagem em estágios
  • Middleware em servidores HTTP

Pipeline com Array de Funções

const std = @import("std");

const Estagio = *const fn ([]const u8) []const u8;

const TextPipeline = struct {
    estagios: std.ArrayList(Estagio),

    pub fn init(allocator: std.mem.Allocator) TextPipeline {
        return .{ .estagios = std.ArrayList(Estagio).init(allocator) };
    }

    pub fn deinit(self: *TextPipeline) void {
        self.estagios.deinit();
    }

    pub fn addEstagio(self: *TextPipeline, estagio: Estagio) !void {
        try self.estagios.append(estagio);
    }

    pub fn executar(self: *const TextPipeline, entrada: []const u8) []const u8 {
        var dados = entrada;
        for (self.estagios.items) |estagio| {
            dados = estagio(dados);
        }
        return dados;
    }
};

fn removerEspacos(texto: []const u8) []const u8 {
    return std.mem.trim(u8, texto, " \t");
}

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();

    var pipeline = TextPipeline.init(gpa.allocator());
    defer pipeline.deinit();

    try pipeline.addEstagio(removerEspacos);

    const resultado = pipeline.executar("  Olá Zig  ");
    std.debug.print("Resultado: '{s}'\n", .{resultado});
}

Pipeline comptime (Zero Overhead)

const std = @import("std");

fn pipeline(comptime estagios: anytype) fn (i32) i32 {
    return struct {
        fn executar(valor: i32) i32 {
            var resultado = valor;
            inline for (estagios) |estagio| {
                resultado = estagio(resultado);
            }
            return resultado;
        }
    }.executar;
}

fn dobrar(x: i32) i32 { return x * 2; }
fn somar10(x: i32) i32 { return x + 10; }
fn quadrado(x: i32) i32 { return x * x; }

const processar = pipeline(.{ dobrar, somar10, quadrado });

pub fn main() void {
    // (5 * 2 + 10)^2 = 400
    const resultado = processar(5);
    std.debug.print("Resultado: {d}\n", .{resultado}); // 400
}

Pipeline de I/O com Writers

const std = @import("std");

pub fn main() !void {
    // Pipeline: stdout <- buffered <- contagem
    const arquivo = try std.fs.cwd().createFile("/tmp/saida.txt", .{});
    defer arquivo.close();

    var buf_writer = std.io.bufferedWriter(arquivo.writer());
    const writer = buf_writer.writer();

    // Cada write passa por buffer antes de ir ao arquivo
    try writer.print("Linha 1: {s}\n", .{"dados"});
    try writer.print("Linha 2: {d}\n", .{42});
    try buf_writer.flush();
}

Pipeline com Error Propagation

Em pipelines de produção, cada estágio pode falhar. Zig’s error unions tornam isso explícito e seguro:

const std = @import("std");

const EstagioFalivel = *const fn ([]const u8, std.mem.Allocator) anyerror![]const u8;

const Pipeline = struct {
    estagios: std.ArrayList(EstagioFalivel),
    allocator: std.mem.Allocator,

    pub fn init(allocator: std.mem.Allocator) Pipeline {
        return .{
            .estagios = std.ArrayList(EstagioFalivel).init(allocator),
            .allocator = allocator,
        };
    }

    pub fn deinit(self: *Pipeline) void {
        self.estagios.deinit();
    }

    pub fn add(self: *Pipeline, estagio: EstagioFalivel) !void {
        try self.estagios.append(estagio);
    }

    pub fn executar(self: *const Pipeline, entrada: []const u8) ![]const u8 {
        var dados = entrada;
        for (self.estagios.items) |estagio| {
            // Se qualquer estágio falhar, o erro propaga imediatamente
            dados = try estagio(dados, self.allocator);
        }
        return dados;
    }
};

fn validarEntrada(texto: []const u8, _: std.mem.Allocator) ![]const u8 {
    if (texto.len == 0) return error.EntradaVazia;
    return texto;
}

fn normalizarTexto(texto: []const u8, allocator: std.mem.Allocator) ![]const u8 {
    return std.ascii.allocLowerString(allocator, texto);
}

Considerações de Performance

  • Pipeline comptime é custo zero: o inline for sobre os estágios comptime resulta em uma sequência de chamadas diretas sem overhead de loop ou ponteiro. O compilador pode inlinar cada estágio e otimizar o conjunto como uma única função.
  • Pipeline com ArrayList de estágios: cada estágio é uma chamada indireta via ponteiro de função. Para pipelines com muitos estágios no hot path, considere a versão comptime.
  • Alocações intermediárias: se cada estágio aloca um buffer de saída, use um ArenaAllocator como allocator do pipeline. Ao final, libere a arena inteira em vez de liberar estágio por estágio.
  • Writers compostos têm flush explícito: ao compor bufferedWriter com arquivo.writer(), lembre de chamar buf_writer.flush() ao final. Dados no buffer não chegam ao destino até o flush.

Erros Comuns

Não liberar buffers intermediários: se cada estágio aloca memória para o resultado, você precisa liberar a entrada de cada estágio (que foi alocada pelo estágio anterior). Use um ArenaAllocator para simplificar — libere tudo de uma vez ao final do pipeline.

Estágios com efeitos colaterais ocultos: um estágio que lê de um banco de dados ou escreve em um arquivo dificulta testes e replay. Prefira estágios puros (entrada → saída) e mantenha I/O nos limites do pipeline.

Pipeline vazio retorna a entrada sem transformação: certifique-se de que o pipeline com zero estágios retorna um resultado válido. Se a entrada for um ponteiro temporário, o caller pode não esperar receber de volta um ponteiro para o buffer original.

Perguntas Frequentes

Qual é a diferença entre Pipeline e Chain of Responsibility? O Pipeline processa cada dado através de todos os estágios em sequência — todos os estágios sempre executam. O Chain of Responsibility passa o dado de handler em handler até que um deles o processe e pare a cadeia.

Como implementar estágios paralelos no pipeline? Para estágios independentes, você pode processar em paralelo com std.Thread. O pipeline aguarda todos os estágios paralelos com thread.join() antes de passar o resultado combinado para o próximo estágio sequencial.

Posso reutilizar o mesmo pipeline para múltiplas entradas? Sim — desde que os estágios sejam funções puras (sem estado interno), você pode chamar pipeline.executar com entradas diferentes sem problema. Se um estágio tem estado (ex: contador de linhas), o estado vai acumular entre chamadas.

Quando Evitar

  • Processamento que não é naturalmente sequencial
  • Quando os estágios têm dependências cruzadas
  • Overhead de criar pipeline para uma única transformação

Veja Também

Continue aprendendo Zig

Explore mais tutoriais e artigos em português para dominar a linguagem Zig.