std.Thread.Pool — Pool de Threads
O std.Thread.Pool gerencia um conjunto reutilizável de threads de trabalho (worker threads) que processam tarefas de uma fila compartilhada. Em vez de criar e destruir threads para cada tarefa, o pool mantém threads vivas esperando por trabalho, reduzindo o overhead de criação e melhorando o throughput. É a abordagem recomendada para paralelismo em Zig quando há muitas tarefas curtas a processar.
Visão Geral
const std = @import("std");
const Pool = std.Thread.Pool;
Como Funciona
- O pool cria N threads de trabalho na inicialização
- Tarefas são submetidas via
spawn - Threads ociosas pegam tarefas da fila
waitForIdleoudeinitespera todas as tarefas completarem
Funções Principais
Criação e Destruição
// Inicializa o pool de threads
pub fn init(self: *Pool, options: Options) !void
// Para todas as threads e libera recursos
pub fn deinit(self: *Pool) void
Options
pub const Options = struct {
// Número de threads (null = número de CPUs)
n_jobs: ?u32 = null,
// Alocador para memória interna
allocator: Allocator,
};
Submissão de Tarefas
// Submete uma tarefa para execução
pub fn spawn(self: *Pool, function: anytype, args: anytype) void
// Aguarda todas as tarefas pendentes terminarem
pub fn waitForIdle(self: *Pool) void
Exemplo 1: Processamento Paralelo de Dados
const std = @import("std");
fn processarBloco(bloco: []const u8, resultado: *std.atomic.Value(u64)) void {
var soma: u64 = 0;
for (bloco) |byte| {
soma += byte;
}
_ = resultado.fetchAdd(soma, .seq_cst);
}
pub fn main() !void {
var gpa = std.heap.GeneralPurposeAllocator(.{}){};
defer _ = gpa.deinit();
const allocator = gpa.allocator();
// Dados de exemplo
var dados: [100_000]u8 = undefined;
for (&dados, 0..) |*d, i| {
d.* = @intCast(i % 256);
}
var resultado = std.atomic.Value(u64).init(0);
// Cria pool
var pool: std.Thread.Pool = undefined;
try pool.init(.{ .allocator = allocator });
defer pool.deinit();
// Divide em blocos e submete
const BLOCO_TAM = 10_000;
var offset: usize = 0;
while (offset < dados.len) {
const fim = @min(offset + BLOCO_TAM, dados.len);
const bloco = dados[offset..fim];
pool.spawn(processarBloco, .{ bloco, &resultado });
offset = fim;
}
// Espera conclusão
pool.waitForIdle();
const stdout = std.io.getStdOut().writer();
try stdout.print("Soma total: {d}\n", .{resultado.raw});
// Verifica com cálculo sequencial
var soma_seq: u64 = 0;
for (dados) |d| soma_seq += d;
try stdout.print("Verificação: {d}\n", .{soma_seq});
try stdout.print("Correto: {}\n", .{resultado.raw == soma_seq});
}
Exemplo 2: Pipeline de Transformação
const std = @import("std");
const Tarefa = struct {
entrada: []const u8,
saida: *std.ArrayList(u8),
mutex: *std.Thread.Mutex,
};
fn transformar(tarefa: Tarefa) void {
// Transforma para maiúsculas
var buf: [256]u8 = undefined;
const len = @min(tarefa.entrada.len, buf.len);
for (tarefa.entrada[0..len], 0..) |c, i| {
buf[i] = std.ascii.toUpper(c);
}
// Escreve resultado de forma thread-safe
tarefa.mutex.lock();
defer tarefa.mutex.unlock();
tarefa.saida.appendSlice(buf[0..len]) catch return;
tarefa.saida.append('\n') catch return;
}
pub fn main() !void {
var gpa = std.heap.GeneralPurposeAllocator(.{}){};
defer _ = gpa.deinit();
const allocator = gpa.allocator();
var resultado = std.ArrayList(u8).init(allocator);
defer resultado.deinit();
var mutex = std.Thread.Mutex{};
var pool: std.Thread.Pool = undefined;
try pool.init(.{ .allocator = allocator, .n_jobs = 4 });
defer pool.deinit();
const linhas = [_][]const u8{
"zig é uma linguagem de sistemas",
"compilação em tempo de compilação",
"sem garbage collector",
"interoperabilidade com c",
"segurança de memória",
};
for (linhas) |linha| {
pool.spawn(transformar, .{Tarefa{
.entrada = linha,
.saida = &resultado,
.mutex = &mutex,
}});
}
pool.waitForIdle();
const stdout = std.io.getStdOut().writer();
try stdout.writeAll("Resultado da transformação:\n");
try stdout.writeAll(resultado.items);
}
Exemplo 3: Web Crawler Paralelo Simulado
const std = @import("std");
const UrlResult = struct {
url: []const u8,
status: u16,
tamanho: usize,
};
var resultados: std.ArrayList(UrlResult) = undefined;
var mutex = std.Thread.Mutex{};
fn fetchUrl(url: []const u8) void {
// Simula tempo de rede
std.time.sleep(10 * std.time.ns_per_ms);
// Simula resultado
const resultado = UrlResult{
.url = url,
.status = 200,
.tamanho = url.len * 100,
};
mutex.lock();
defer mutex.unlock();
resultados.append(resultado) catch return;
}
pub fn main() !void {
var gpa = std.heap.GeneralPurposeAllocator(.{}){};
defer _ = gpa.deinit();
const allocator = gpa.allocator();
resultados = std.ArrayList(UrlResult).init(allocator);
defer resultados.deinit();
var pool: std.Thread.Pool = undefined;
try pool.init(.{ .allocator = allocator, .n_jobs = 8 });
defer pool.deinit();
const urls = [_][]const u8{
"https://exemplo.com/pagina1",
"https://exemplo.com/pagina2",
"https://exemplo.com/pagina3",
"https://exemplo.com/api/dados",
"https://exemplo.com/api/users",
"https://exemplo.com/sobre",
"https://exemplo.com/contato",
"https://exemplo.com/blog",
};
const inicio = std.time.milliTimestamp();
// Submete todas as URLs para processamento paralelo
for (urls) |url| {
pool.spawn(fetchUrl, .{url});
}
pool.waitForIdle();
const fim = std.time.milliTimestamp();
const stdout = std.io.getStdOut().writer();
try stdout.print("Crawl completo em {d}ms\n", .{fim - inicio});
try stdout.print("URLs processadas: {d}\n\n", .{resultados.items.len});
for (resultados.items) |r| {
try stdout.print(" [{d}] {s} ({d} bytes)\n", .{
r.status, r.url, r.tamanho,
});
}
}
Pool vs Threads Manuais
| Característica | Thread.Pool | Thread.spawn |
|---|---|---|
| Overhead de criação | Baixo (reutiliza) | Alto (cria/destrói) |
| Controle de concorrência | Automático (N workers) | Manual |
| Ideal para | Muitas tarefas curtas | Poucas tarefas longas |
| Complexidade | Simples | Mais controle |
Módulos Relacionados
- std.Thread — Threads individuais
- std.Thread.Mutex — Sincronização
- std.atomic — Operações atômicas