Pipeline em Zig
O padrão Pipeline processa dados através de uma série de estágios sequenciais, onde a saída de cada estágio alimenta a entrada do próximo. Em Zig, pipelines são elegantemente construídos com composição de funções, writers/readers encadeados e comptime para pipelines estáticos de custo zero.
Quando Usar
- Processamento de texto (ler, parsear, transformar, salvar)
- Pipelines de compilação (lexer, parser, otimizador, code gen)
- ETL (Extract, Transform, Load) de dados
- Processamento de imagem em estágios
- Middleware em servidores HTTP
Pipeline com Array de Funções
const std = @import("std");
const Estagio = *const fn ([]const u8) []const u8;
const TextPipeline = struct {
estagios: std.ArrayList(Estagio),
pub fn init(allocator: std.mem.Allocator) TextPipeline {
return .{ .estagios = std.ArrayList(Estagio).init(allocator) };
}
pub fn deinit(self: *TextPipeline) void {
self.estagios.deinit();
}
pub fn addEstagio(self: *TextPipeline, estagio: Estagio) !void {
try self.estagios.append(estagio);
}
pub fn executar(self: *const TextPipeline, entrada: []const u8) []const u8 {
var dados = entrada;
for (self.estagios.items) |estagio| {
dados = estagio(dados);
}
return dados;
}
};
fn removerEspacos(texto: []const u8) []const u8 {
return std.mem.trim(u8, texto, " \t");
}
pub fn main() !void {
var gpa = std.heap.GeneralPurposeAllocator(.{}){};
defer _ = gpa.deinit();
var pipeline = TextPipeline.init(gpa.allocator());
defer pipeline.deinit();
try pipeline.addEstagio(removerEspacos);
const resultado = pipeline.executar(" Olá Zig ");
std.debug.print("Resultado: '{s}'\n", .{resultado});
}
Pipeline comptime (Zero Overhead)
const std = @import("std");
fn pipeline(comptime estagios: anytype) fn (i32) i32 {
return struct {
fn executar(valor: i32) i32 {
var resultado = valor;
inline for (estagios) |estagio| {
resultado = estagio(resultado);
}
return resultado;
}
}.executar;
}
fn dobrar(x: i32) i32 { return x * 2; }
fn somar10(x: i32) i32 { return x + 10; }
fn quadrado(x: i32) i32 { return x * x; }
const processar = pipeline(.{ dobrar, somar10, quadrado });
pub fn main() void {
// (5 * 2 + 10)^2 = 400
const resultado = processar(5);
std.debug.print("Resultado: {d}\n", .{resultado}); // 400
}
Pipeline de I/O com Writers
const std = @import("std");
pub fn main() !void {
// Pipeline: stdout <- buffered <- contagem
const arquivo = try std.fs.cwd().createFile("/tmp/saida.txt", .{});
defer arquivo.close();
var buf_writer = std.io.bufferedWriter(arquivo.writer());
const writer = buf_writer.writer();
// Cada write passa por buffer antes de ir ao arquivo
try writer.print("Linha 1: {s}\n", .{"dados"});
try writer.print("Linha 2: {d}\n", .{42});
try buf_writer.flush();
}
Pipeline com Error Propagation
Em pipelines de produção, cada estágio pode falhar. Zig’s error unions tornam isso explícito e seguro:
const std = @import("std");
const EstagioFalivel = *const fn ([]const u8, std.mem.Allocator) anyerror![]const u8;
const Pipeline = struct {
estagios: std.ArrayList(EstagioFalivel),
allocator: std.mem.Allocator,
pub fn init(allocator: std.mem.Allocator) Pipeline {
return .{
.estagios = std.ArrayList(EstagioFalivel).init(allocator),
.allocator = allocator,
};
}
pub fn deinit(self: *Pipeline) void {
self.estagios.deinit();
}
pub fn add(self: *Pipeline, estagio: EstagioFalivel) !void {
try self.estagios.append(estagio);
}
pub fn executar(self: *const Pipeline, entrada: []const u8) ![]const u8 {
var dados = entrada;
for (self.estagios.items) |estagio| {
// Se qualquer estágio falhar, o erro propaga imediatamente
dados = try estagio(dados, self.allocator);
}
return dados;
}
};
fn validarEntrada(texto: []const u8, _: std.mem.Allocator) ![]const u8 {
if (texto.len == 0) return error.EntradaVazia;
return texto;
}
fn normalizarTexto(texto: []const u8, allocator: std.mem.Allocator) ![]const u8 {
return std.ascii.allocLowerString(allocator, texto);
}
Considerações de Performance
- Pipeline comptime é custo zero: o
inline forsobre os estágios comptime resulta em uma sequência de chamadas diretas sem overhead de loop ou ponteiro. O compilador pode inlinar cada estágio e otimizar o conjunto como uma única função. - Pipeline com
ArrayListde estágios: cada estágio é uma chamada indireta via ponteiro de função. Para pipelines com muitos estágios no hot path, considere a versão comptime. - Alocações intermediárias: se cada estágio aloca um buffer de saída, use um
ArenaAllocatorcomo allocator do pipeline. Ao final, libere a arena inteira em vez de liberar estágio por estágio. - Writers compostos têm flush explícito: ao compor
bufferedWritercomarquivo.writer(), lembre de chamarbuf_writer.flush()ao final. Dados no buffer não chegam ao destino até o flush.
Erros Comuns
Não liberar buffers intermediários: se cada estágio aloca memória para o resultado, você precisa liberar a entrada de cada estágio (que foi alocada pelo estágio anterior). Use um ArenaAllocator para simplificar — libere tudo de uma vez ao final do pipeline.
Estágios com efeitos colaterais ocultos: um estágio que lê de um banco de dados ou escreve em um arquivo dificulta testes e replay. Prefira estágios puros (entrada → saída) e mantenha I/O nos limites do pipeline.
Pipeline vazio retorna a entrada sem transformação: certifique-se de que o pipeline com zero estágios retorna um resultado válido. Se a entrada for um ponteiro temporário, o caller pode não esperar receber de volta um ponteiro para o buffer original.
Perguntas Frequentes
Qual é a diferença entre Pipeline e Chain of Responsibility? O Pipeline processa cada dado através de todos os estágios em sequência — todos os estágios sempre executam. O Chain of Responsibility passa o dado de handler em handler até que um deles o processe e pare a cadeia.
Como implementar estágios paralelos no pipeline?
Para estágios independentes, você pode processar em paralelo com std.Thread. O pipeline aguarda todos os estágios paralelos com thread.join() antes de passar o resultado combinado para o próximo estágio sequencial.
Posso reutilizar o mesmo pipeline para múltiplas entradas?
Sim — desde que os estágios sejam funções puras (sem estado interno), você pode chamar pipeline.executar com entradas diferentes sem problema. Se um estágio tem estado (ex: contador de linhas), o estado vai acumular entre chamadas.
Quando Evitar
- Processamento que não é naturalmente sequencial
- Quando os estágios têm dependências cruzadas
- Overhead de criar pipeline para uma única transformação
Veja Também
- Iterator — Processamento lazy de sequências
- Decorator — Composição de comportamentos
- Command — Encapsular operações como objetos
- Comptime — Pipelines de custo zero
- Operações I/O — Writers composíveis