Introdução
Um thread pool é um padrão que mantém um conjunto fixo de threads reutilizáveis para processar tarefas. Em vez de criar e destruir threads para cada tarefa (custoso), o pool reutiliza threads existentes, melhorando significativamente a performance.
Nesta receita, você aprenderá a implementar e usar thread pools em Zig.
Pré-requisitos
- Zig instalado (versão 0.13+). Veja o guia de instalação
- Conhecimento de threads e mutex em Zig
Thread Pool Simples
Uma implementação básica de thread pool:
const std = @import("std");
fn ThreadPool(comptime max_threads: usize) type {
return struct {
threads: [max_threads]std.Thread = undefined,
active: usize = 0,
const Self = @This();
pub fn submit(self: *Self, comptime func: anytype, args: anytype) !void {
if (self.active >= max_threads) {
// Esperar threads existentes
self.waitAll();
}
self.threads[self.active] = try std.Thread.spawn(.{}, func, args);
self.active += 1;
}
pub fn waitAll(self: *Self) void {
for (self.threads[0..self.active]) |t| {
t.join();
}
self.active = 0;
}
};
}
fn processarItem(id: usize) void {
std.debug.print("Processando item {d}...\n", .{id});
std.time.sleep(std.time.ns_per_ms * 50);
std.debug.print("Item {d} concluído.\n", .{id});
}
pub fn main() !void {
var pool = ThreadPool(4){};
// Submeter 10 tarefas para o pool de 4 threads
for (0..10) |i| {
try pool.submit(processarItem, .{i});
}
pool.waitAll();
std.debug.print("Todas as tarefas concluídas!\n", .{});
}
Worker Pool com Fila de Tarefas
Uma implementação mais robusta com fila de tarefas:
const std = @import("std");
const Task = struct {
id: u32,
dados: []const u8,
};
const WorkerPool = struct {
const QUEUE_SIZE = 256;
tarefas: [QUEUE_SIZE]Task = undefined,
head: usize = 0,
tail: usize = 0,
count: usize = 0,
mutex: std.Thread.Mutex = .{},
concluido: bool = false,
threads: []std.Thread = undefined,
allocator: std.mem.Allocator,
pub fn init(allocator: std.mem.Allocator, num_workers: usize) !WorkerPool {
var pool = WorkerPool{
.allocator = allocator,
.threads = try allocator.alloc(std.Thread, num_workers),
};
for (pool.threads) |*t| {
t.* = try std.Thread.spawn(.{}, workerLoop, .{&pool});
}
return pool;
}
pub fn submit(self: *WorkerPool, task: Task) !void {
self.mutex.lock();
defer self.mutex.unlock();
if (self.count >= QUEUE_SIZE) return error.QueueFull;
self.tarefas[self.tail] = task;
self.tail = (self.tail + 1) % QUEUE_SIZE;
self.count += 1;
}
fn popTask(self: *WorkerPool) ?Task {
self.mutex.lock();
defer self.mutex.unlock();
if (self.count == 0) return null;
const task = self.tarefas[self.head];
self.head = (self.head + 1) % QUEUE_SIZE;
self.count -= 1;
return task;
}
fn workerLoop(pool: *WorkerPool) void {
while (true) {
if (pool.popTask()) |task| {
// Processar tarefa
std.debug.print("Worker processando tarefa #{d}: {s}\n", .{
task.id, task.dados,
});
std.time.sleep(std.time.ns_per_ms * 20);
} else if (pool.concluido) {
break;
} else {
std.time.sleep(std.time.ns_per_ms);
}
}
}
pub fn shutdown(self: *WorkerPool) void {
// Esperar fila esvaziar
while (true) {
self.mutex.lock();
const empty = self.count == 0;
self.mutex.unlock();
if (empty) break;
std.time.sleep(std.time.ns_per_ms);
}
self.concluido = true;
for (self.threads) |t| {
t.join();
}
self.allocator.free(self.threads);
}
};
pub fn main() !void {
var gpa = std.heap.GeneralPurposeAllocator(.{}){};
defer _ = gpa.deinit();
const allocator = gpa.allocator();
var pool = try WorkerPool.init(allocator, 4);
// Submeter tarefas
const items = [_][]const u8{
"Compilar código",
"Rodar testes",
"Gerar documentação",
"Analisar logs",
"Enviar notificações",
"Backup banco de dados",
"Processar imagens",
"Indexar busca",
};
for (items, 0..) |item, i| {
try pool.submit(.{
.id = @intCast(i),
.dados = item,
});
}
pool.shutdown();
std.debug.print("Pool encerrado com sucesso!\n", .{});
}
Map Paralelo
Execute uma operação em todos os elementos de um array usando múltiplas threads:
const std = @import("std");
fn parallelMap(
comptime T: type,
comptime R: type,
items: []const T,
comptime func: fn (T) R,
results: []R,
num_threads: usize,
) !void {
const chunk_size = (items.len + num_threads - 1) / num_threads;
const threads = try std.heap.page_allocator.alloc(std.Thread, num_threads);
defer std.heap.page_allocator.free(threads);
var active: usize = 0;
var offset: usize = 0;
while (offset < items.len) {
const end = @min(offset + chunk_size, items.len);
const chunk_items = items[offset..end];
const chunk_results = results[offset..end];
threads[active] = try std.Thread.spawn(.{}, struct {
fn work(in: []const T, out: []R) void {
for (in, 0..) |item, i| {
out[i] = func(item);
}
}
}.work, .{ chunk_items, chunk_results });
active += 1;
offset = end;
}
for (threads[0..active]) |t| {
t.join();
}
}
fn quadrado(x: u32) u64 {
// Simular trabalho
std.time.sleep(std.time.ns_per_ms * 10);
return @as(u64, x) * @as(u64, x);
}
pub fn main() !void {
const dados = [_]u32{ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12 };
var resultados: [dados.len]u64 = undefined;
const timer = try std.time.Timer.start();
try parallelMap(u32, u64, &dados, quadrado, &resultados, 4);
const elapsed = timer.read();
std.debug.print("Resultados: ", .{});
for (&resultados) |r| {
std.debug.print("{d} ", .{r});
}
std.debug.print("\nTempo: {d}ms\n", .{elapsed / std.time.ns_per_ms});
}
Saída esperada
Resultados: 1 4 9 16 25 36 49 64 81 100 121 144
Tempo: ~30ms (em vez de ~120ms sequencial)
Dicas e Boas Práticas
Tamanho do pool: Use
std.Thread.getCpuCount()para determinar o número ideal de threads baseado nos cores disponíveis.Granularidade das tarefas: Tarefas muito pequenas adicionam overhead. Agrupe operações pequenas em batches.
Evite contenção: Se todas as threads competem pelo mesmo mutex, o ganho de paralelismo é reduzido.
Shutdown gracioso: Sempre espere a fila esvaziar antes de encerrar o pool.
Considere a memória: Cada thread consome stack memory. Pools muito grandes podem esgotar memória.
Receitas Relacionadas
- Como criar threads em Zig - Fundamentos de threads
- Como usar Mutex em Zig - Sincronização
- Como usar operações atômicas em Zig - Lock-free
- Como usar canais de comunicação em Zig - Coordenação