Como Usar Thread Pool em Zig

Introdução

Um thread pool é um padrão que mantém um conjunto fixo de threads reutilizáveis para processar tarefas. Em vez de criar e destruir threads para cada tarefa (custoso), o pool reutiliza threads existentes, melhorando significativamente a performance.

Nesta receita, você aprenderá a implementar e usar thread pools em Zig.

Pré-requisitos

Thread Pool Simples

Uma implementação básica de thread pool:

const std = @import("std");

fn ThreadPool(comptime max_threads: usize) type {
    return struct {
        threads: [max_threads]std.Thread = undefined,
        active: usize = 0,

        const Self = @This();

        pub fn submit(self: *Self, comptime func: anytype, args: anytype) !void {
            if (self.active >= max_threads) {
                // Esperar threads existentes
                self.waitAll();
            }

            self.threads[self.active] = try std.Thread.spawn(.{}, func, args);
            self.active += 1;
        }

        pub fn waitAll(self: *Self) void {
            for (self.threads[0..self.active]) |t| {
                t.join();
            }
            self.active = 0;
        }
    };
}

fn processarItem(id: usize) void {
    std.debug.print("Processando item {d}...\n", .{id});
    std.time.sleep(std.time.ns_per_ms * 50);
    std.debug.print("Item {d} concluído.\n", .{id});
}

pub fn main() !void {
    var pool = ThreadPool(4){};

    // Submeter 10 tarefas para o pool de 4 threads
    for (0..10) |i| {
        try pool.submit(processarItem, .{i});
    }

    pool.waitAll();
    std.debug.print("Todas as tarefas concluídas!\n", .{});
}

Worker Pool com Fila de Tarefas

Uma implementação mais robusta com fila de tarefas:

const std = @import("std");

const Task = struct {
    id: u32,
    dados: []const u8,
};

const WorkerPool = struct {
    const QUEUE_SIZE = 256;

    tarefas: [QUEUE_SIZE]Task = undefined,
    head: usize = 0,
    tail: usize = 0,
    count: usize = 0,
    mutex: std.Thread.Mutex = .{},
    concluido: bool = false,
    threads: []std.Thread = undefined,
    allocator: std.mem.Allocator,

    pub fn init(allocator: std.mem.Allocator, num_workers: usize) !WorkerPool {
        var pool = WorkerPool{
            .allocator = allocator,
            .threads = try allocator.alloc(std.Thread, num_workers),
        };

        for (pool.threads) |*t| {
            t.* = try std.Thread.spawn(.{}, workerLoop, .{&pool});
        }

        return pool;
    }

    pub fn submit(self: *WorkerPool, task: Task) !void {
        self.mutex.lock();
        defer self.mutex.unlock();

        if (self.count >= QUEUE_SIZE) return error.QueueFull;

        self.tarefas[self.tail] = task;
        self.tail = (self.tail + 1) % QUEUE_SIZE;
        self.count += 1;
    }

    fn popTask(self: *WorkerPool) ?Task {
        self.mutex.lock();
        defer self.mutex.unlock();

        if (self.count == 0) return null;

        const task = self.tarefas[self.head];
        self.head = (self.head + 1) % QUEUE_SIZE;
        self.count -= 1;
        return task;
    }

    fn workerLoop(pool: *WorkerPool) void {
        while (true) {
            if (pool.popTask()) |task| {
                // Processar tarefa
                std.debug.print("Worker processando tarefa #{d}: {s}\n", .{
                    task.id, task.dados,
                });
                std.time.sleep(std.time.ns_per_ms * 20);
            } else if (pool.concluido) {
                break;
            } else {
                std.time.sleep(std.time.ns_per_ms);
            }
        }
    }

    pub fn shutdown(self: *WorkerPool) void {
        // Esperar fila esvaziar
        while (true) {
            self.mutex.lock();
            const empty = self.count == 0;
            self.mutex.unlock();
            if (empty) break;
            std.time.sleep(std.time.ns_per_ms);
        }

        self.concluido = true;

        for (self.threads) |t| {
            t.join();
        }
        self.allocator.free(self.threads);
    }
};

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();
    const allocator = gpa.allocator();

    var pool = try WorkerPool.init(allocator, 4);

    // Submeter tarefas
    const items = [_][]const u8{
        "Compilar código",
        "Rodar testes",
        "Gerar documentação",
        "Analisar logs",
        "Enviar notificações",
        "Backup banco de dados",
        "Processar imagens",
        "Indexar busca",
    };

    for (items, 0..) |item, i| {
        try pool.submit(.{
            .id = @intCast(i),
            .dados = item,
        });
    }

    pool.shutdown();
    std.debug.print("Pool encerrado com sucesso!\n", .{});
}

Map Paralelo

Execute uma operação em todos os elementos de um array usando múltiplas threads:

const std = @import("std");

fn parallelMap(
    comptime T: type,
    comptime R: type,
    items: []const T,
    comptime func: fn (T) R,
    results: []R,
    num_threads: usize,
) !void {
    const chunk_size = (items.len + num_threads - 1) / num_threads;

    const threads = try std.heap.page_allocator.alloc(std.Thread, num_threads);
    defer std.heap.page_allocator.free(threads);

    var active: usize = 0;
    var offset: usize = 0;

    while (offset < items.len) {
        const end = @min(offset + chunk_size, items.len);
        const chunk_items = items[offset..end];
        const chunk_results = results[offset..end];

        threads[active] = try std.Thread.spawn(.{}, struct {
            fn work(in: []const T, out: []R) void {
                for (in, 0..) |item, i| {
                    out[i] = func(item);
                }
            }
        }.work, .{ chunk_items, chunk_results });

        active += 1;
        offset = end;
    }

    for (threads[0..active]) |t| {
        t.join();
    }
}

fn quadrado(x: u32) u64 {
    // Simular trabalho
    std.time.sleep(std.time.ns_per_ms * 10);
    return @as(u64, x) * @as(u64, x);
}

pub fn main() !void {
    const dados = [_]u32{ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12 };
    var resultados: [dados.len]u64 = undefined;

    const timer = try std.time.Timer.start();

    try parallelMap(u32, u64, &dados, quadrado, &resultados, 4);

    const elapsed = timer.read();

    std.debug.print("Resultados: ", .{});
    for (&resultados) |r| {
        std.debug.print("{d} ", .{r});
    }
    std.debug.print("\nTempo: {d}ms\n", .{elapsed / std.time.ns_per_ms});
}

Saída esperada

Resultados: 1 4 9 16 25 36 49 64 81 100 121 144
Tempo: ~30ms (em vez de ~120ms sequencial)

Dicas e Boas Práticas

  1. Tamanho do pool: Use std.Thread.getCpuCount() para determinar o número ideal de threads baseado nos cores disponíveis.

  2. Granularidade das tarefas: Tarefas muito pequenas adicionam overhead. Agrupe operações pequenas em batches.

  3. Evite contenção: Se todas as threads competem pelo mesmo mutex, o ganho de paralelismo é reduzido.

  4. Shutdown gracioso: Sempre espere a fila esvaziar antes de encerrar o pool.

  5. Considere a memória: Cada thread consome stack memory. Pools muito grandes podem esgotar memória.

Receitas Relacionadas

Tutoriais Relacionados

Continue aprendendo Zig

Explore mais tutoriais e artigos em português para dominar a linguagem Zig.