Thread Pool em Zig — Tutorial Passo a Passo
Neste tutorial, vamos construir uma thread pool em Zig — um pool de threads reutilizáveis que processam tarefas de uma fila compartilhada. Thread pools são a base de servidores web, processamento paralelo e qualquer sistema que precise de concorrência eficiente.
O Que Vamos Construir
Nossa thread pool vai:
- Manter N threads worker pré-criadas e reutilizáveis
- Usar uma fila thread-safe para distribuir tarefas
- Sincronizar com mutex e condition variables
- Suportar shutdown gracioso (esperar tarefas em andamento)
- Fornecer métricas (tarefas completadas, tempo médio)
Por Que Este Projeto?
Thread pools eliminam o overhead de criar/destruir threads para cada tarefa. Elas são usadas em todos os servidores de produção. Em Zig, implementamos com os primitivos de threading da stdlib e temos controle total sobre a sincronização.
Pré-requisitos
- Zig 0.13+ instalado (guia de instalação)
- Conhecimento de threads e concorrência
- Familiaridade com mutex e sincronização
Passo 1: Estrutura do Projeto
mkdir thread-pool-impl
cd thread-pool-impl
zig init
Passo 2: Fila Thread-Safe
const std = @import("std");
const Thread = std.Thread;
const Mutex = Thread.Mutex;
const Condition = Thread.Condition;
const io = std.io;
const mem = std.mem;
const time = std.time;
/// Uma tarefa a ser executada pela thread pool.
const Tarefa = struct {
funcao: *const fn (*anyopaque) void,
contexto: *anyopaque,
};
/// Fila circular thread-safe para tarefas.
/// Usa mutex + condition variable para sincronização.
fn FilaCircular(comptime T: type, comptime capacidade: usize) type {
return struct {
items: [capacidade]T,
inicio: usize,
fim: usize,
tamanho: usize,
mutex: Mutex,
nao_vazia: Condition,
nao_cheia: Condition,
fechada: bool,
const Self = @This();
pub fn init() Self {
return .{
.items = undefined,
.inicio = 0,
.fim = 0,
.tamanho = 0,
.mutex = .{},
.nao_vazia = .{},
.nao_cheia = .{},
.fechada = false,
};
}
/// Adiciona um item à fila. Bloqueia se cheia.
pub fn push(self: *Self, item: T) bool {
self.mutex.lock();
defer self.mutex.unlock();
while (self.tamanho == capacidade and !self.fechada) {
self.nao_cheia.wait(&self.mutex);
}
if (self.fechada) return false;
self.items[self.fim] = item;
self.fim = (self.fim + 1) % capacidade;
self.tamanho += 1;
self.nao_vazia.signal();
return true;
}
/// Remove um item da fila. Bloqueia se vazia.
/// Retorna null se a fila foi fechada.
pub fn pop(self: *Self) ?T {
self.mutex.lock();
defer self.mutex.unlock();
while (self.tamanho == 0 and !self.fechada) {
self.nao_vazia.wait(&self.mutex);
}
if (self.tamanho == 0) return null;
const item = self.items[self.inicio];
self.inicio = (self.inicio + 1) % capacidade;
self.tamanho -= 1;
self.nao_cheia.signal();
return item;
}
/// Fecha a fila e acorda todas as threads esperando.
pub fn fechar(self: *Self) void {
self.mutex.lock();
defer self.mutex.unlock();
self.fechada = true;
self.nao_vazia.broadcast();
self.nao_cheia.broadcast();
}
pub fn len(self: *Self) usize {
self.mutex.lock();
defer self.mutex.unlock();
return self.tamanho;
}
};
}
Passo 3: A Thread Pool
const MAX_THREADS = 32;
const CAPACIDADE_FILA = 1024;
/// Pool de threads reutilizáveis com fila de tarefas.
const ThreadPool = struct {
threads: [MAX_THREADS]?Thread,
num_threads: usize,
fila: FilaCircular(Tarefa, CAPACIDADE_FILA),
ativa: bool,
// Métricas
tarefas_completadas: std.atomic.Value(u64),
tarefas_submetidas: std.atomic.Value(u64),
const Self = @This();
/// Cria e inicia a thread pool com N threads.
pub fn init(num_threads: usize) Self {
var pool = Self{
.threads = [_]?Thread{null} ** MAX_THREADS,
.num_threads = @min(num_threads, MAX_THREADS),
.fila = FilaCircular(Tarefa, CAPACIDADE_FILA).init(),
.ativa = true,
.tarefas_completadas = std.atomic.Value(u64).init(0),
.tarefas_submetidas = std.atomic.Value(u64).init(0),
};
// Criar as threads worker
for (0..pool.num_threads) |i| {
pool.threads[i] = Thread.spawn(.{}, workerLoop, .{&pool}) catch null;
}
return pool;
}
/// Loop principal de cada thread worker.
fn workerLoop(pool: *Self) void {
while (true) {
const tarefa = pool.fila.pop() orelse break;
// Executar a tarefa
tarefa.funcao(tarefa.contexto);
_ = pool.tarefas_completadas.fetchAdd(1, .monotonic);
}
}
/// Submete uma tarefa para execução.
pub fn submeter(self: *Self, funcao: *const fn (*anyopaque) void, contexto: *anyopaque) bool {
if (!self.ativa) return false;
const tarefa = Tarefa{
.funcao = funcao,
.contexto = contexto,
};
if (self.fila.push(tarefa)) {
_ = self.tarefas_submetidas.fetchAdd(1, .monotonic);
return true;
}
return false;
}
/// Encerra a pool: para de aceitar tarefas, espera as pendentes.
pub fn shutdown(self: *Self) void {
self.ativa = false;
self.fila.fechar();
// Esperar todas as threads terminarem
for (0..self.num_threads) |i| {
if (self.threads[i]) |thread| {
thread.join();
self.threads[i] = null;
}
}
}
/// Retorna o número de tarefas completadas.
pub fn completadas(self: *const Self) u64 {
return self.tarefas_completadas.load(.monotonic);
}
/// Retorna o número de tarefas submetidas.
pub fn submetidas(self: *const Self) u64 {
return self.tarefas_submetidas.load(.monotonic);
}
/// Retorna o tamanho da fila de tarefas pendentes.
pub fn pendentes(self: *Self) usize {
return self.fila.len();
}
};
Passo 4: Demonstração com Trabalho Real
/// Contexto para uma tarefa de cálculo.
const TarefaCalculo = struct {
id: u32,
resultado: u64,
duracao_ns: i64,
pronto: bool,
fn executar(ctx: *anyopaque) void {
const self: *TarefaCalculo = @ptrCast(@alignCast(ctx));
const inicio = time.nanoTimestamp();
// Simulação de trabalho: calcular soma de quadrados
var soma: u64 = 0;
for (0..100_000) |i| {
soma += i * i;
}
self.resultado = soma;
self.duracao_ns = time.nanoTimestamp() - inicio;
self.pronto = true;
}
};
pub fn main() !void {
const stdout = io.getStdOut().writer();
try stdout.print(
\\
\\ ==========================================
\\ THREAD POOL - Zig
\\ ==========================================
\\
\\
, .{});
const num_cpus = Thread.getCpuCount() catch 4;
const num_threads = @min(num_cpus, 8);
try stdout.print(" CPUs detectadas: {d}\n", .{num_cpus});
try stdout.print(" Threads na pool: {d}\n\n", .{num_threads});
// Criar pool
var pool = ThreadPool.init(num_threads);
// Criar tarefas
const NUM_TAREFAS = 50;
var tarefas: [NUM_TAREFAS]TarefaCalculo = undefined;
for (&tarefas, 0..) |*t, i| {
t.* = .{
.id = @intCast(i),
.resultado = 0,
.duracao_ns = 0,
.pronto = false,
};
}
// Submeter todas as tarefas
const inicio = time.nanoTimestamp();
try stdout.print(" Submetendo {d} tarefas...\n", .{NUM_TAREFAS});
for (&tarefas) |*t| {
_ = pool.submeter(&TarefaCalculo.executar, @ptrCast(t));
}
try stdout.print(" Tarefas submetidas: {d}\n", .{pool.submetidas()});
try stdout.print(" Aguardando conclusao...\n\n", .{});
// Esperar todas completarem
while (pool.completadas() < NUM_TAREFAS) {
std.time.sleep(1_000_000); // 1ms
}
const duracao_total: u64 = @intCast(time.nanoTimestamp() - inicio);
// Resultados
var duracao_soma: u64 = 0;
for (&tarefas) |*t| {
if (t.pronto) {
duracao_soma += @intCast(t.duracao_ns);
}
}
try stdout.print(" --- Resultados ---\n", .{});
try stdout.print(" Tarefas completadas: {d}\n", .{pool.completadas()});
try stdout.print(" Tempo total (paralelo): {d:.2}ms\n", .{
@as(f64, @floatFromInt(duracao_total)) / 1_000_000.0,
});
try stdout.print(" Tempo total (sequencial seria): {d:.2}ms\n", .{
@as(f64, @floatFromInt(duracao_soma)) / 1_000_000.0,
});
if (duracao_total > 0) {
const speedup = @as(f64, @floatFromInt(duracao_soma)) / @as(f64, @floatFromInt(duracao_total));
try stdout.print(" Speedup: {d:.1}x\n", .{speedup});
}
// Shutdown
pool.shutdown();
try stdout.print("\n Pool encerrada com sucesso.\n", .{});
}
Testes
test "fila circular - push e pop" {
var fila = FilaCircular(u32, 4).init();
try std.testing.expect(fila.push(1));
try std.testing.expect(fila.push(2));
try std.testing.expectEqual(@as(u32, 1), fila.pop().?);
try std.testing.expectEqual(@as(u32, 2), fila.pop().?);
}
test "fila circular - fechar retorna null" {
var fila = FilaCircular(u32, 4).init();
fila.fechar();
try std.testing.expect(fila.pop() == null);
}
test "thread pool - criar e encerrar" {
var pool = ThreadPool.init(2);
try std.testing.expect(pool.ativa);
pool.shutdown();
}
test "thread pool - executar tarefas" {
var pool = ThreadPool.init(2);
var contador = std.atomic.Value(u32).init(0);
const Ctx = struct {
fn incrementar(ctx: *anyopaque) void {
const c: *std.atomic.Value(u32) = @ptrCast(@alignCast(ctx));
_ = c.fetchAdd(1, .monotonic);
}
};
for (0..10) |_| {
_ = pool.submeter(&Ctx.incrementar, @ptrCast(&contador));
}
// Esperar
while (pool.completadas() < 10) {
std.time.sleep(1_000_000);
}
pool.shutdown();
try std.testing.expectEqual(@as(u32, 10), contador.load(.monotonic));
}
Compilando e Executando
zig build run
zig build test
Conceitos Aprendidos
- Threads com
std.Thread.spawnejoin - Mutex e Condition Variable para sincronização
- Fila circular thread-safe (bounded buffer)
- Atomic operations para contadores compartilhados
- Ponteiros genéricos (
*anyopaque) para callbacks
Próximos Passos
- Explore threads na stdlib
- Veja o Load Balancer que usa pool de conexões
- Construa o Ring Buffer lock-free
- Consulte concorrência para mais padrões