Thread Pool em Zig — Tutorial Passo a Passo

Thread Pool em Zig — Tutorial Passo a Passo

Neste tutorial, vamos construir uma thread pool em Zig — um pool de threads reutilizáveis que processam tarefas de uma fila compartilhada. Thread pools são a base de servidores web, processamento paralelo e qualquer sistema que precise de concorrência eficiente.

O Que Vamos Construir

Nossa thread pool vai:

  • Manter N threads worker pré-criadas e reutilizáveis
  • Usar uma fila thread-safe para distribuir tarefas
  • Sincronizar com mutex e condition variables
  • Suportar shutdown gracioso (esperar tarefas em andamento)
  • Fornecer métricas (tarefas completadas, tempo médio)

Por Que Este Projeto?

Thread pools eliminam o overhead de criar/destruir threads para cada tarefa. Elas são usadas em todos os servidores de produção. Em Zig, implementamos com os primitivos de threading da stdlib e temos controle total sobre a sincronização.

Pré-requisitos

Passo 1: Estrutura do Projeto

mkdir thread-pool-impl
cd thread-pool-impl
zig init

Passo 2: Fila Thread-Safe

const std = @import("std");
const Thread = std.Thread;
const Mutex = Thread.Mutex;
const Condition = Thread.Condition;
const io = std.io;
const mem = std.mem;
const time = std.time;

/// Uma tarefa a ser executada pela thread pool.
const Tarefa = struct {
    funcao: *const fn (*anyopaque) void,
    contexto: *anyopaque,
};

/// Fila circular thread-safe para tarefas.
/// Usa mutex + condition variable para sincronização.
fn FilaCircular(comptime T: type, comptime capacidade: usize) type {
    return struct {
        items: [capacidade]T,
        inicio: usize,
        fim: usize,
        tamanho: usize,
        mutex: Mutex,
        nao_vazia: Condition,
        nao_cheia: Condition,
        fechada: bool,

        const Self = @This();

        pub fn init() Self {
            return .{
                .items = undefined,
                .inicio = 0,
                .fim = 0,
                .tamanho = 0,
                .mutex = .{},
                .nao_vazia = .{},
                .nao_cheia = .{},
                .fechada = false,
            };
        }

        /// Adiciona um item à fila. Bloqueia se cheia.
        pub fn push(self: *Self, item: T) bool {
            self.mutex.lock();
            defer self.mutex.unlock();

            while (self.tamanho == capacidade and !self.fechada) {
                self.nao_cheia.wait(&self.mutex);
            }

            if (self.fechada) return false;

            self.items[self.fim] = item;
            self.fim = (self.fim + 1) % capacidade;
            self.tamanho += 1;

            self.nao_vazia.signal();
            return true;
        }

        /// Remove um item da fila. Bloqueia se vazia.
        /// Retorna null se a fila foi fechada.
        pub fn pop(self: *Self) ?T {
            self.mutex.lock();
            defer self.mutex.unlock();

            while (self.tamanho == 0 and !self.fechada) {
                self.nao_vazia.wait(&self.mutex);
            }

            if (self.tamanho == 0) return null;

            const item = self.items[self.inicio];
            self.inicio = (self.inicio + 1) % capacidade;
            self.tamanho -= 1;

            self.nao_cheia.signal();
            return item;
        }

        /// Fecha a fila e acorda todas as threads esperando.
        pub fn fechar(self: *Self) void {
            self.mutex.lock();
            defer self.mutex.unlock();

            self.fechada = true;
            self.nao_vazia.broadcast();
            self.nao_cheia.broadcast();
        }

        pub fn len(self: *Self) usize {
            self.mutex.lock();
            defer self.mutex.unlock();
            return self.tamanho;
        }
    };
}

Passo 3: A Thread Pool

const MAX_THREADS = 32;
const CAPACIDADE_FILA = 1024;

/// Pool de threads reutilizáveis com fila de tarefas.
const ThreadPool = struct {
    threads: [MAX_THREADS]?Thread,
    num_threads: usize,
    fila: FilaCircular(Tarefa, CAPACIDADE_FILA),
    ativa: bool,

    // Métricas
    tarefas_completadas: std.atomic.Value(u64),
    tarefas_submetidas: std.atomic.Value(u64),

    const Self = @This();

    /// Cria e inicia a thread pool com N threads.
    pub fn init(num_threads: usize) Self {
        var pool = Self{
            .threads = [_]?Thread{null} ** MAX_THREADS,
            .num_threads = @min(num_threads, MAX_THREADS),
            .fila = FilaCircular(Tarefa, CAPACIDADE_FILA).init(),
            .ativa = true,
            .tarefas_completadas = std.atomic.Value(u64).init(0),
            .tarefas_submetidas = std.atomic.Value(u64).init(0),
        };

        // Criar as threads worker
        for (0..pool.num_threads) |i| {
            pool.threads[i] = Thread.spawn(.{}, workerLoop, .{&pool}) catch null;
        }

        return pool;
    }

    /// Loop principal de cada thread worker.
    fn workerLoop(pool: *Self) void {
        while (true) {
            const tarefa = pool.fila.pop() orelse break;

            // Executar a tarefa
            tarefa.funcao(tarefa.contexto);

            _ = pool.tarefas_completadas.fetchAdd(1, .monotonic);
        }
    }

    /// Submete uma tarefa para execução.
    pub fn submeter(self: *Self, funcao: *const fn (*anyopaque) void, contexto: *anyopaque) bool {
        if (!self.ativa) return false;

        const tarefa = Tarefa{
            .funcao = funcao,
            .contexto = contexto,
        };

        if (self.fila.push(tarefa)) {
            _ = self.tarefas_submetidas.fetchAdd(1, .monotonic);
            return true;
        }
        return false;
    }

    /// Encerra a pool: para de aceitar tarefas, espera as pendentes.
    pub fn shutdown(self: *Self) void {
        self.ativa = false;
        self.fila.fechar();

        // Esperar todas as threads terminarem
        for (0..self.num_threads) |i| {
            if (self.threads[i]) |thread| {
                thread.join();
                self.threads[i] = null;
            }
        }
    }

    /// Retorna o número de tarefas completadas.
    pub fn completadas(self: *const Self) u64 {
        return self.tarefas_completadas.load(.monotonic);
    }

    /// Retorna o número de tarefas submetidas.
    pub fn submetidas(self: *const Self) u64 {
        return self.tarefas_submetidas.load(.monotonic);
    }

    /// Retorna o tamanho da fila de tarefas pendentes.
    pub fn pendentes(self: *Self) usize {
        return self.fila.len();
    }
};

Passo 4: Demonstração com Trabalho Real

/// Contexto para uma tarefa de cálculo.
const TarefaCalculo = struct {
    id: u32,
    resultado: u64,
    duracao_ns: i64,
    pronto: bool,

    fn executar(ctx: *anyopaque) void {
        const self: *TarefaCalculo = @ptrCast(@alignCast(ctx));
        const inicio = time.nanoTimestamp();

        // Simulação de trabalho: calcular soma de quadrados
        var soma: u64 = 0;
        for (0..100_000) |i| {
            soma += i * i;
        }

        self.resultado = soma;
        self.duracao_ns = time.nanoTimestamp() - inicio;
        self.pronto = true;
    }
};

pub fn main() !void {
    const stdout = io.getStdOut().writer();

    try stdout.print(
        \\
        \\  ==========================================
        \\     THREAD POOL - Zig
        \\  ==========================================
        \\
        \\
    , .{});

    const num_cpus = Thread.getCpuCount() catch 4;
    const num_threads = @min(num_cpus, 8);

    try stdout.print("  CPUs detectadas: {d}\n", .{num_cpus});
    try stdout.print("  Threads na pool: {d}\n\n", .{num_threads});

    // Criar pool
    var pool = ThreadPool.init(num_threads);

    // Criar tarefas
    const NUM_TAREFAS = 50;
    var tarefas: [NUM_TAREFAS]TarefaCalculo = undefined;
    for (&tarefas, 0..) |*t, i| {
        t.* = .{
            .id = @intCast(i),
            .resultado = 0,
            .duracao_ns = 0,
            .pronto = false,
        };
    }

    // Submeter todas as tarefas
    const inicio = time.nanoTimestamp();
    try stdout.print("  Submetendo {d} tarefas...\n", .{NUM_TAREFAS});

    for (&tarefas) |*t| {
        _ = pool.submeter(&TarefaCalculo.executar, @ptrCast(t));
    }

    try stdout.print("  Tarefas submetidas: {d}\n", .{pool.submetidas()});
    try stdout.print("  Aguardando conclusao...\n\n", .{});

    // Esperar todas completarem
    while (pool.completadas() < NUM_TAREFAS) {
        std.time.sleep(1_000_000); // 1ms
    }

    const duracao_total: u64 = @intCast(time.nanoTimestamp() - inicio);

    // Resultados
    var duracao_soma: u64 = 0;
    for (&tarefas) |*t| {
        if (t.pronto) {
            duracao_soma += @intCast(t.duracao_ns);
        }
    }

    try stdout.print("  --- Resultados ---\n", .{});
    try stdout.print("  Tarefas completadas:   {d}\n", .{pool.completadas()});
    try stdout.print("  Tempo total (paralelo): {d:.2}ms\n", .{
        @as(f64, @floatFromInt(duracao_total)) / 1_000_000.0,
    });
    try stdout.print("  Tempo total (sequencial seria): {d:.2}ms\n", .{
        @as(f64, @floatFromInt(duracao_soma)) / 1_000_000.0,
    });

    if (duracao_total > 0) {
        const speedup = @as(f64, @floatFromInt(duracao_soma)) / @as(f64, @floatFromInt(duracao_total));
        try stdout.print("  Speedup: {d:.1}x\n", .{speedup});
    }

    // Shutdown
    pool.shutdown();
    try stdout.print("\n  Pool encerrada com sucesso.\n", .{});
}

Testes

test "fila circular - push e pop" {
    var fila = FilaCircular(u32, 4).init();
    try std.testing.expect(fila.push(1));
    try std.testing.expect(fila.push(2));
    try std.testing.expectEqual(@as(u32, 1), fila.pop().?);
    try std.testing.expectEqual(@as(u32, 2), fila.pop().?);
}

test "fila circular - fechar retorna null" {
    var fila = FilaCircular(u32, 4).init();
    fila.fechar();
    try std.testing.expect(fila.pop() == null);
}

test "thread pool - criar e encerrar" {
    var pool = ThreadPool.init(2);
    try std.testing.expect(pool.ativa);
    pool.shutdown();
}

test "thread pool - executar tarefas" {
    var pool = ThreadPool.init(2);

    var contador = std.atomic.Value(u32).init(0);

    const Ctx = struct {
        fn incrementar(ctx: *anyopaque) void {
            const c: *std.atomic.Value(u32) = @ptrCast(@alignCast(ctx));
            _ = c.fetchAdd(1, .monotonic);
        }
    };

    for (0..10) |_| {
        _ = pool.submeter(&Ctx.incrementar, @ptrCast(&contador));
    }

    // Esperar
    while (pool.completadas() < 10) {
        std.time.sleep(1_000_000);
    }

    pool.shutdown();
    try std.testing.expectEqual(@as(u32, 10), contador.load(.monotonic));
}

Compilando e Executando

zig build run
zig build test

Conceitos Aprendidos

  • Threads com std.Thread.spawn e join
  • Mutex e Condition Variable para sincronização
  • Fila circular thread-safe (bounded buffer)
  • Atomic operations para contadores compartilhados
  • Ponteiros genéricos (*anyopaque) para callbacks

Próximos Passos

Continue aprendendo Zig

Explore mais tutoriais e artigos em português para dominar a linguagem Zig.