Introdução
Canais (channels) são um padrão de comunicação entre threads que permitem enviar e receber mensagens de forma segura. Popularizado por Go, esse padrão evita compartilhamento direto de memória, seguindo o princípio “não compartilhe memória para comunicar; comunique para compartilhar memória”.
Nesta receita, implementaremos canais tipados em Zig usando primitivas de sincronização.
Pré-requisitos
- Zig instalado (versão 0.13+). Veja o guia de instalação
- Conhecimento de threads e mutex
Canal Básico (Unbuffered)
Um canal sem buffer que bloqueia o remetente até o destinatário estar pronto:
const std = @import("std");
fn Channel(comptime T: type) type {
return struct {
value: ?T = null,
mutex: std.Thread.Mutex = .{},
has_value: std.atomic.Value(bool) = std.atomic.Value(bool).init(false),
closed: std.atomic.Value(bool) = std.atomic.Value(bool).init(false),
const Self = @This();
pub fn send(self: *Self, val: T) !void {
if (self.closed.load(.seq_cst)) return error.ChannelClosed;
// Esperar até o valor anterior ser consumido
while (self.has_value.load(.seq_cst)) {
if (self.closed.load(.seq_cst)) return error.ChannelClosed;
std.time.sleep(std.time.ns_per_us * 10);
}
self.mutex.lock();
defer self.mutex.unlock();
self.value = val;
self.has_value.store(true, .seq_cst);
}
pub fn receive(self: *Self) !?T {
// Esperar até ter um valor
while (!self.has_value.load(.seq_cst)) {
if (self.closed.load(.seq_cst)) return null;
std.time.sleep(std.time.ns_per_us * 10);
}
self.mutex.lock();
defer self.mutex.unlock();
const val = self.value;
self.value = null;
self.has_value.store(false, .seq_cst);
return val;
}
pub fn close(self: *Self) void {
self.closed.store(true, .seq_cst);
}
};
}
fn produtor(ch: *Channel(u32)) void {
for (0..5) |i| {
ch.send(@intCast(i)) catch return;
std.debug.print("Enviado: {d}\n", .{i});
}
ch.close();
std.debug.print("Canal fechado pelo produtor.\n", .{});
}
fn consumidor(ch: *Channel(u32)) void {
while (true) {
const val = (ch.receive() catch break) orelse break;
std.debug.print(" Recebido: {d}\n", .{val});
}
std.debug.print("Consumidor encerrado.\n", .{});
}
pub fn main() !void {
var ch = Channel(u32){};
const t1 = try std.Thread.spawn(.{}, produtor, .{&ch});
const t2 = try std.Thread.spawn(.{}, consumidor, .{&ch});
t1.join();
t2.join();
}
Saída esperada
Enviado: 0
Recebido: 0
Enviado: 1
Recebido: 1
Enviado: 2
Recebido: 2
Enviado: 3
Recebido: 3
Enviado: 4
Recebido: 4
Canal fechado pelo produtor.
Consumidor encerrado.
Canal Bufferizado
Um canal com buffer que permite enviar sem bloquear enquanto houver espaço:
const std = @import("std");
fn BufferedChannel(comptime T: type, comptime capacity: usize) type {
return struct {
buffer: [capacity]T = undefined,
head: usize = 0,
tail: usize = 0,
count: usize = 0,
mutex: std.Thread.Mutex = .{},
closed: bool = false,
const Self = @This();
pub fn send(self: *Self, val: T) !void {
while (true) {
self.mutex.lock();
if (self.closed) {
self.mutex.unlock();
return error.ChannelClosed;
}
if (self.count < capacity) {
self.buffer[self.tail] = val;
self.tail = (self.tail + 1) % capacity;
self.count += 1;
self.mutex.unlock();
return;
}
self.mutex.unlock();
std.time.sleep(std.time.ns_per_us * 10);
}
}
pub fn receive(self: *Self) ?T {
while (true) {
self.mutex.lock();
if (self.count > 0) {
const val = self.buffer[self.head];
self.head = (self.head + 1) % capacity;
self.count -= 1;
self.mutex.unlock();
return val;
}
if (self.closed) {
self.mutex.unlock();
return null;
}
self.mutex.unlock();
std.time.sleep(std.time.ns_per_us * 10);
}
}
pub fn close(self: *Self) void {
self.mutex.lock();
defer self.mutex.unlock();
self.closed = true;
}
pub fn len(self: *Self) usize {
self.mutex.lock();
defer self.mutex.unlock();
return self.count;
}
};
}
pub fn main() !void {
var ch = BufferedChannel([]const u8, 8){};
// Produtor: enviar várias mensagens rapidamente
const prod = try std.Thread.spawn(.{}, struct {
fn work(c: *BufferedChannel([]const u8, 8)) void {
const msgs = [_][]const u8{
"Mensagem 1",
"Mensagem 2",
"Mensagem 3",
"Mensagem 4",
"Mensagem 5",
};
for (msgs) |msg| {
c.send(msg) catch return;
std.debug.print("Enviado: {s}\n", .{msg});
}
c.close();
}
}.work, .{&ch});
// Consumidor: processar com atraso
const cons = try std.Thread.spawn(.{}, struct {
fn work(c: *BufferedChannel([]const u8, 8)) void {
while (c.receive()) |msg| {
std.debug.print(" Processando: {s}\n", .{msg});
std.time.sleep(std.time.ns_per_ms * 50);
}
}
}.work, .{&ch});
prod.join();
cons.join();
std.debug.print("Pipeline concluído!\n", .{});
}
Pipeline de Processamento
Conecte múltiplos estágios de processamento com canais:
const std = @import("std");
fn BuffChan(comptime T: type, comptime cap: usize) type {
return struct {
buffer: [cap]T = undefined,
head: usize = 0,
tail: usize = 0,
count: usize = 0,
mutex: std.Thread.Mutex = .{},
closed: bool = false,
const Self = @This();
pub fn send(self: *Self, val: T) void {
while (true) {
self.mutex.lock();
if (self.count < cap) {
self.buffer[self.tail] = val;
self.tail = (self.tail + 1) % cap;
self.count += 1;
self.mutex.unlock();
return;
}
self.mutex.unlock();
std.time.sleep(std.time.ns_per_us * 10);
}
}
pub fn receive(self: *Self) ?T {
while (true) {
self.mutex.lock();
if (self.count > 0) {
const val = self.buffer[self.head];
self.head = (self.head + 1) % cap;
self.count -= 1;
self.mutex.unlock();
return val;
}
if (self.closed) {
self.mutex.unlock();
return null;
}
self.mutex.unlock();
std.time.sleep(std.time.ns_per_us * 10);
}
}
pub fn close(self: *Self) void {
self.mutex.lock();
self.closed = true;
self.mutex.unlock();
}
};
}
pub fn main() !void {
// Pipeline: gerador -> dobrar -> filtrar -> imprimir
var ch1 = BuffChan(u32, 16){};
var ch2 = BuffChan(u32, 16){};
var ch3 = BuffChan(u32, 16){};
// Estágio 1: Gerar números
const t1 = try std.Thread.spawn(.{}, struct {
fn work(out: *BuffChan(u32, 16)) void {
for (1..11) |i| {
out.send(@intCast(i));
}
out.close();
}
}.work, .{&ch1});
// Estágio 2: Dobrar valores
const t2 = try std.Thread.spawn(.{}, struct {
fn work(in: *BuffChan(u32, 16), out: *BuffChan(u32, 16)) void {
while (in.receive()) |val| {
out.send(val * 2);
}
out.close();
}
}.work, .{ &ch1, &ch2 });
// Estágio 3: Filtrar pares maiores que 10
const t3 = try std.Thread.spawn(.{}, struct {
fn work(in: *BuffChan(u32, 16), out: *BuffChan(u32, 16)) void {
while (in.receive()) |val| {
if (val > 10) {
out.send(val);
}
}
out.close();
}
}.work, .{ &ch2, &ch3 });
// Estágio 4: Imprimir (thread principal)
t1.detach();
t2.detach();
t3.detach();
std.debug.print("Pipeline: gerar -> dobrar -> filtrar(>10) -> imprimir\n", .{});
while (ch3.receive()) |val| {
std.debug.print("Resultado: {d}\n", .{val});
}
std.debug.print("Pipeline concluído!\n", .{});
}
Saída esperada
Pipeline: gerar -> dobrar -> filtrar(>10) -> imprimir
Resultado: 12
Resultado: 14
Resultado: 16
Resultado: 18
Resultado: 20
Pipeline concluído!
Dicas e Boas Práticas
Feche canais pelo produtor: Quem envia é responsável por fechar o canal.
Buffer adequado: Use canais bufferizados quando produtor e consumidor têm velocidades diferentes.
Evite deadlocks: Certifique-se que o consumidor está ativo antes de enviar, ou use canais bufferizados.
Pipelines: Canais são excelentes para construir pipelines de processamento em estágios.
Para alta performance: Considere filas lock-free com operações atômicas.
Receitas Relacionadas
- Como criar threads em Zig - Base de threads
- Como usar Mutex em Zig - Sincronização
- Como usar operações atômicas em Zig - Lock-free
- Como usar thread pool em Zig - Pool de threads