Como Implementar Canais de Comunicação entre Threads em Zig

Introdução

Canais (channels) são um padrão de comunicação entre threads que permitem enviar e receber mensagens de forma segura. Popularizado por Go, esse padrão evita compartilhamento direto de memória, seguindo o princípio “não compartilhe memória para comunicar; comunique para compartilhar memória”.

Nesta receita, implementaremos canais tipados em Zig usando primitivas de sincronização.

Pré-requisitos

Canal Básico (Unbuffered)

Um canal sem buffer que bloqueia o remetente até o destinatário estar pronto:

const std = @import("std");

fn Channel(comptime T: type) type {
    return struct {
        value: ?T = null,
        mutex: std.Thread.Mutex = .{},
        has_value: std.atomic.Value(bool) = std.atomic.Value(bool).init(false),
        closed: std.atomic.Value(bool) = std.atomic.Value(bool).init(false),

        const Self = @This();

        pub fn send(self: *Self, val: T) !void {
            if (self.closed.load(.seq_cst)) return error.ChannelClosed;

            // Esperar até o valor anterior ser consumido
            while (self.has_value.load(.seq_cst)) {
                if (self.closed.load(.seq_cst)) return error.ChannelClosed;
                std.time.sleep(std.time.ns_per_us * 10);
            }

            self.mutex.lock();
            defer self.mutex.unlock();

            self.value = val;
            self.has_value.store(true, .seq_cst);
        }

        pub fn receive(self: *Self) !?T {
            // Esperar até ter um valor
            while (!self.has_value.load(.seq_cst)) {
                if (self.closed.load(.seq_cst)) return null;
                std.time.sleep(std.time.ns_per_us * 10);
            }

            self.mutex.lock();
            defer self.mutex.unlock();

            const val = self.value;
            self.value = null;
            self.has_value.store(false, .seq_cst);
            return val;
        }

        pub fn close(self: *Self) void {
            self.closed.store(true, .seq_cst);
        }
    };
}

fn produtor(ch: *Channel(u32)) void {
    for (0..5) |i| {
        ch.send(@intCast(i)) catch return;
        std.debug.print("Enviado: {d}\n", .{i});
    }
    ch.close();
    std.debug.print("Canal fechado pelo produtor.\n", .{});
}

fn consumidor(ch: *Channel(u32)) void {
    while (true) {
        const val = (ch.receive() catch break) orelse break;
        std.debug.print("  Recebido: {d}\n", .{val});
    }
    std.debug.print("Consumidor encerrado.\n", .{});
}

pub fn main() !void {
    var ch = Channel(u32){};

    const t1 = try std.Thread.spawn(.{}, produtor, .{&ch});
    const t2 = try std.Thread.spawn(.{}, consumidor, .{&ch});

    t1.join();
    t2.join();
}

Saída esperada

Enviado: 0
  Recebido: 0
Enviado: 1
  Recebido: 1
Enviado: 2
  Recebido: 2
Enviado: 3
  Recebido: 3
Enviado: 4
  Recebido: 4
Canal fechado pelo produtor.
Consumidor encerrado.

Canal Bufferizado

Um canal com buffer que permite enviar sem bloquear enquanto houver espaço:

const std = @import("std");

fn BufferedChannel(comptime T: type, comptime capacity: usize) type {
    return struct {
        buffer: [capacity]T = undefined,
        head: usize = 0,
        tail: usize = 0,
        count: usize = 0,
        mutex: std.Thread.Mutex = .{},
        closed: bool = false,

        const Self = @This();

        pub fn send(self: *Self, val: T) !void {
            while (true) {
                self.mutex.lock();

                if (self.closed) {
                    self.mutex.unlock();
                    return error.ChannelClosed;
                }

                if (self.count < capacity) {
                    self.buffer[self.tail] = val;
                    self.tail = (self.tail + 1) % capacity;
                    self.count += 1;
                    self.mutex.unlock();
                    return;
                }

                self.mutex.unlock();
                std.time.sleep(std.time.ns_per_us * 10);
            }
        }

        pub fn receive(self: *Self) ?T {
            while (true) {
                self.mutex.lock();

                if (self.count > 0) {
                    const val = self.buffer[self.head];
                    self.head = (self.head + 1) % capacity;
                    self.count -= 1;
                    self.mutex.unlock();
                    return val;
                }

                if (self.closed) {
                    self.mutex.unlock();
                    return null;
                }

                self.mutex.unlock();
                std.time.sleep(std.time.ns_per_us * 10);
            }
        }

        pub fn close(self: *Self) void {
            self.mutex.lock();
            defer self.mutex.unlock();
            self.closed = true;
        }

        pub fn len(self: *Self) usize {
            self.mutex.lock();
            defer self.mutex.unlock();
            return self.count;
        }
    };
}

pub fn main() !void {
    var ch = BufferedChannel([]const u8, 8){};

    // Produtor: enviar várias mensagens rapidamente
    const prod = try std.Thread.spawn(.{}, struct {
        fn work(c: *BufferedChannel([]const u8, 8)) void {
            const msgs = [_][]const u8{
                "Mensagem 1",
                "Mensagem 2",
                "Mensagem 3",
                "Mensagem 4",
                "Mensagem 5",
            };
            for (msgs) |msg| {
                c.send(msg) catch return;
                std.debug.print("Enviado: {s}\n", .{msg});
            }
            c.close();
        }
    }.work, .{&ch});

    // Consumidor: processar com atraso
    const cons = try std.Thread.spawn(.{}, struct {
        fn work(c: *BufferedChannel([]const u8, 8)) void {
            while (c.receive()) |msg| {
                std.debug.print("  Processando: {s}\n", .{msg});
                std.time.sleep(std.time.ns_per_ms * 50);
            }
        }
    }.work, .{&ch});

    prod.join();
    cons.join();

    std.debug.print("Pipeline concluído!\n", .{});
}

Pipeline de Processamento

Conecte múltiplos estágios de processamento com canais:

const std = @import("std");

fn BuffChan(comptime T: type, comptime cap: usize) type {
    return struct {
        buffer: [cap]T = undefined,
        head: usize = 0,
        tail: usize = 0,
        count: usize = 0,
        mutex: std.Thread.Mutex = .{},
        closed: bool = false,

        const Self = @This();

        pub fn send(self: *Self, val: T) void {
            while (true) {
                self.mutex.lock();
                if (self.count < cap) {
                    self.buffer[self.tail] = val;
                    self.tail = (self.tail + 1) % cap;
                    self.count += 1;
                    self.mutex.unlock();
                    return;
                }
                self.mutex.unlock();
                std.time.sleep(std.time.ns_per_us * 10);
            }
        }

        pub fn receive(self: *Self) ?T {
            while (true) {
                self.mutex.lock();
                if (self.count > 0) {
                    const val = self.buffer[self.head];
                    self.head = (self.head + 1) % cap;
                    self.count -= 1;
                    self.mutex.unlock();
                    return val;
                }
                if (self.closed) {
                    self.mutex.unlock();
                    return null;
                }
                self.mutex.unlock();
                std.time.sleep(std.time.ns_per_us * 10);
            }
        }

        pub fn close(self: *Self) void {
            self.mutex.lock();
            self.closed = true;
            self.mutex.unlock();
        }
    };
}

pub fn main() !void {
    // Pipeline: gerador -> dobrar -> filtrar -> imprimir
    var ch1 = BuffChan(u32, 16){};
    var ch2 = BuffChan(u32, 16){};
    var ch3 = BuffChan(u32, 16){};

    // Estágio 1: Gerar números
    const t1 = try std.Thread.spawn(.{}, struct {
        fn work(out: *BuffChan(u32, 16)) void {
            for (1..11) |i| {
                out.send(@intCast(i));
            }
            out.close();
        }
    }.work, .{&ch1});

    // Estágio 2: Dobrar valores
    const t2 = try std.Thread.spawn(.{}, struct {
        fn work(in: *BuffChan(u32, 16), out: *BuffChan(u32, 16)) void {
            while (in.receive()) |val| {
                out.send(val * 2);
            }
            out.close();
        }
    }.work, .{ &ch1, &ch2 });

    // Estágio 3: Filtrar pares maiores que 10
    const t3 = try std.Thread.spawn(.{}, struct {
        fn work(in: *BuffChan(u32, 16), out: *BuffChan(u32, 16)) void {
            while (in.receive()) |val| {
                if (val > 10) {
                    out.send(val);
                }
            }
            out.close();
        }
    }.work, .{ &ch2, &ch3 });

    // Estágio 4: Imprimir (thread principal)
    t1.detach();
    t2.detach();
    t3.detach();

    std.debug.print("Pipeline: gerar -> dobrar -> filtrar(>10) -> imprimir\n", .{});
    while (ch3.receive()) |val| {
        std.debug.print("Resultado: {d}\n", .{val});
    }

    std.debug.print("Pipeline concluído!\n", .{});
}

Saída esperada

Pipeline: gerar -> dobrar -> filtrar(>10) -> imprimir
Resultado: 12
Resultado: 14
Resultado: 16
Resultado: 18
Resultado: 20
Pipeline concluído!

Dicas e Boas Práticas

  1. Feche canais pelo produtor: Quem envia é responsável por fechar o canal.

  2. Buffer adequado: Use canais bufferizados quando produtor e consumidor têm velocidades diferentes.

  3. Evite deadlocks: Certifique-se que o consumidor está ativo antes de enviar, ou use canais bufferizados.

  4. Pipelines: Canais são excelentes para construir pipelines de processamento em estágios.

  5. Para alta performance: Considere filas lock-free com operações atômicas.

Receitas Relacionadas

Tutoriais Relacionados

Continue aprendendo Zig

Explore mais tutoriais e artigos em português para dominar a linguagem Zig.