Каналы - механизм передачи данных между потоками или асинхронными задачами. Идея простая: один конец отправляет сообщения (отправитель), другой их получает (получатель), а вся синхронизация спрятана внутри. За этим стоит целый подход: вместо того чтобы разделять память и вручную её блокировать, данные передают сообщением. В Rust это ложится прямо на модель владения, ведь значение передаётся от отправителя к получателю без алиасинга и гонок данных. Но не везде каналы первоклассный инструмент, например в стандартной библиотеке C++ их нет вовсе, в Python есть очередь-аналог (queue.Queue), а идиомой этот стиль сделал Go.

На практике же стоит копнуть глубже, и оказывается, что каналов в экосистеме Rust целый зоопарк: std::sync::mpsc, crossbeam-channel, tokio::sync, flume, async-channel, kanal и ещё несколько менее известных. Они отличаются моделью, поддержкой асинхронности, скоростью и кучей мелких деталей, о которые легко споткнуться. В этой статье разберёмся, какие каналы бывают, чем они отличаются и какой брать под конкретную задачу.

Классификация каналов

Прежде чем нырять в конкретные реализации, нужно обозначить ключевые характеристики, по которым каналы различаются.

По количеству отправителей и получателей:

  • SPSC (single producer, single consumer) - ровно один отправитель и один получатель

  • MPSC (multiple producers, single consumer) - много отправителей, один получатель

  • MPMC (multiple producers, multiple consumers) - много отправителей и получателей

  • Broadcast - много получателей, каждый получает копию каждого сообщения

  • Watch - частный случай broadcast, хранит только последнее значение (промежуточные состояния могут быть пропущены)

По ёмкости буфера:

  • Unbounded (неограниченный) - буфер растёт по мере необходимости. Отправка никогда не блокирует, но может привести к неконтролируемому росту памяти

  • Bounded (ограниченный) - буфер фиксированного размера. Когда буфер полон, отправитель блокируется или получает ошибку

  • Rendezvous - буфер нулевого размера. Отправитель и получатель должны встретиться одновременно

По модели исполнения:

  • Синхронные - блокируют текущий поток: на приёме пока нет данных, а в ограниченном-канале ещё и на отправке, пока буфер полон

  • Асинхронные - интегрируются с async-рантаймом и возвращают объект Future при отправке/получении

Все эти классификации независимы и комбинируются в зависимости от реализации.

std::sync::mpsc

Это канал, доступный в стандартной библиотеке. Его название говорит само за себя: mpsc - multi-producer, single-consumer

Модуль предоставляет 3 типа:

  • Sender - отправитель для неограниченного канала

  • SyncSender - отправитель для ограниченного канала

  • Receiver - получатель, общий для обоих видов отправителей

Способы создания каналов:

use std::sync::mpsc;

// Неограниченный-канал
let (tx, rx) = mpsc::channel();

// Ограниченный-канал, буфер на 16 элементов
let (tx, rx) = mpsc::sync_channel(16); 

// Rendezvous-канал
let (tx, rx) = mpsc::sync_channel(0); 

mpsc::channel() - создаёт неограниченный канал. Вызов tx.send(value) никогда не блокирует - значение помещается в растущий буфер. Это удобно, но если получатель не успевает, память будет расти без предела.

mpsc::sync_channel(n) - создаёт канал с буфером размера n. Если буфер полон, tx.send() заблокирует поток до тех пор, пока получатель не заберёт хотя бы одно значение. При n = 0 получаем "rendezvous" канал (каждый вызов send() блокирует текущий поток, пока другой поток не прочитает сообщение).

Пример

use std::sync::mpsc;
use std::thread;

let (tx, rx) = mpsc::channel();     // Создание канала
thread::spawn(move || {             // Создание нового потока
        tx.send(10).unwrap();       // Отправка сообщения
});
assert_eq!(rx.recv()?, 10)          // Получаем значение и сравниваем
use std::sync::mpsc;
use std::thread;

let (tx, rx) = mpsc::channel(); // tx - сокращённо от transmitter (передатчик), а rx от receiver (приёмник)

for i in 0..3 {
    let tx = tx.clone();        // клонируем отправитель
    thread::spawn(move || {
        tx.send(i).unwrap();
    });
}
drop(tx);                      // вручную дропаем отправитель

while let Ok(received) = rx.recv() {
    println!("получено: {received}");
}

Примечание:

  • пока жив хоть один Sender, recv будет ждать. В первом примере отправитель перемещается в поток и дропается при завершении его работы. Во втором примере клоны дропаются в своих потоках, а исходный Sender приходится дропнуть вручную, т.к. иначе recv будет ждать вечно и основной поток не завершится

  • Sender и SyncSender реализуют трейт Clone, поэтому их можно клонировать (multiple producers), а вот Receiver клонировать нельзя - получатель всегда один (single-consumer)

  • send() и recv() возвращают Result. Err означает, что другой конец канала уже закрыт: для send() - уничтожен Receiver, для recv() - уничтожены все Sender. При этом send() возвращает неотправленное значение обратно внутри SendError, чтобы оно не потерялось

  • Receiver реализует трейт IntoIterator, так что вместо while let Ok(received) = rx.recv() можно было написать for received in rx

  • тонкость с потокобезопасностью: Sender и SyncSender реализует трейт-маркеры Send и Sync, а вот Receiver тоже Send, но !Sync (читать из канала можно только из одного потока)

Receiver поддерживает несколько режимов чтения:

// Блокирующее чтение - ждёт сообщения
let val = rx.recv()?;

// Неблокирующее - возвращает Err, если канал пуст
match rx.try_recv() {
    Ok(val) => println!("Получили: {}", val),
    Err(mpsc::TryRecvError::Empty) => println!("Пока пусто"),
    Err(mpsc::TryRecvError::Disconnected) => println!("Канал закрыт"),
}

// С таймаутом
match rx.recv_timeout(Duration::from_secs(5)) {
    Ok(val) => println!("Получили: {}", val),
    Err(mpsc::RecvTimeoutError::Timeout) => println!("Не дождались"),
    Err(mpsc::RecvTimeoutError::Disconnected) => println!("Канал закрыт"),
}

У SyncSender есть неблокирующая отправка - try_send(), которая не ждёт освобождения буфера, а сразу возвращает управление:

match tx.try_send(value) {
    Ok(()) => println!("отправлено"),
    Err(mpsc::TrySendError::Full(value)) => println!("буфер полон, значение вернулось: {value}"),
    Err(mpsc::TrySendError::Disconnected(value)) => println!("канал закрыт"),
}

Этот тип каналов стоит брать для простых синхронных сценариев, когда несколько потоков пишут - один читает.

crossbeam-channel

Крейт crossbeam-channel - альтернатива std::sync::mpsc, которая предоставляет mpmc-каналы и некоторые дополнительные возможности. Его реализацию портировали в стандартную библиотеку при переписывании mpsc в Rust 1.67, так что для простого mpsc-случая std теперь по скорости ему не уступает, и главный довод за crossbeam именно фичи, которых нет в std.

Способы создания каналов:

use crossbeam_channel::{unbounded, bounded, select};

// Неограниченный-канал
let (tx, rx) = unbounded();

// Ограниченный-канал
let (tx, rx) = bounded(100);

// Rendezvous-канал
let (tx, rx) = bounded(0);

Пример

use crossbeam_channel::unbounded;
use std::thread;

let (s, r) = unbounded();

// несколько отправителей
for i in 0..6 {
    let s = s.clone();
    thread::spawn(move || s.send(i).unwrap());
}
drop(s);  // дропаем исходный отправитель, иначе получатели будут ждать вечно
 
let mut handles = Vec::new();
for id in 0..2 {
    let r = r.clone();    // теперь получателей тоже можно клонировать
    handles.push(thread::spawn(move || {
        for msg in r {
            println!("получатель {id}: {msg}");
        }
    }));
}

for h in handles {
    h.join()?;
}
  • И Sender, и Receiver реализуют Clone (multiple producers, multiple consumers)

  • Каждое сообщение достаётся ровно одному получателю - это очередь с конкуренцией за элементы

  • Канал закрывается, когда уничтожены все отправители или все получатели. recv() вернёт Err(RecvError), когда исчезнут все Sender'ы; send() вернёт Err(SendError) (с возвратом значения внутри), когда исчезнут все Receiver'ы.

  • Обе половины - Send + Sync, так что их можно и перемещать в другие потоки, и шарить по ссылке.

Помимо обычных каналов, crossbeam предоставляет несколько специальнов каналов помощников:

  • after(duration) - через заданное время срабатывает один раз

  • tick(duration) - периодически выдаёт сообщения через указанный интервал

  • never() - канал, который никогда не становится готовым к чтению (удобно как заглушка в select!)

use crossbeam_channel::{after, tick, never, Receiver};
use std::time::Duration;

// Одноразовый таймер - пришлёт одно значение через 1 секунду
let timeout = after(Duration::from_secs(1));

// Периодический - присылает значение каждые 200 мс
let ticker = tick(Duration::from_millis(200));

// never: канал, по которому ничего не придёт
let maybe_rx: Option<Receiver<i32>> = None;
let opt_rx = maybe_rx.unwrap_or_else(never);

Также крейт crossbeam-channel предоставляет макрос select!, аналог Go-шного select: он позволяет ждать сразу несколько каналов и реагировать на тот, что готов первым.

use crossbeam_channel::{after, bounded, select, tick};
use std::{thread, time::Duration};

let (s1, r1) = bounded::<i32>(1);
let (s2, r2) = bounded::<i32>(1);

// Отправляет число каждые 250 мс
thread::spawn(move || {
    for i in 0.. {
        if s1.send(i).is_err() {
            break;
        }
        thread::sleep(Duration::from_millis(250));
    }
});

// Отправляет число каждые 400 мс
thread::spawn(move || {
    for i in 0.. {
        if s2.send(i).is_err() {
            break;
            }
        thread::sleep(Duration::from_millis(400));
    }
});

// Таймеры
let ticker = tick(Duration::from_millis(100));
let timeout = after(Duration::from_secs(2));

loop {
	// select! опрашивает все ветки и выбирает ту, которая готова. Если готовы несколько - выбирает случайную
    select! {
        recv(r1) -> msg => println!("канал 1: {:?}", msg),
        recv(r2) -> msg => println!("канал 2: {:?}", msg),
        recv(ticker) -> _ => println!("тик"),
        recv(timeout) -> _ => {
            println!("таймаут!");
            break;
        }
    }
}

У crossbeam нет поддержки async: все его операции блокирующие, и в async-коде он сам по себе неуместен (хотя его иногда используют внутри выделенного пула блокирующих потоков). Если нужен async - смотрите в сторону tokio::sync или flume.

Этот тип каналов стоит брать для многопоточного синхронного кода, где нужны MPMC каналы.

tokio::sync

Если вы пишете async-код на рантайме tokio, вам понадобятся каналы, которые умеют в .await вместо блокировки потока. tokio::sync предоставляет не один канал, а целый набор.

  1. tokio::sync::mpsc - async-аналог std::sync::mpsc

use tokio::sync::mpsc;


let (tx, mut rx) = mpsc::channel(32); // буфер на 32

tokio::spawn(async move {
    for i in 0..5 {
        tx.send(i).await.unwrap(); // .await, а не блокировка
    }
});

while let Some(v) = rx.recv().await {
    println!("получено: {v}");
}

tx.send().await при полном буфере приостановит текущую задачу, а не поток. Когда все Sender дропнуты и буфер вычитан, rx.recv().await возвращает None, и цикл завершается.


2. tokio::sync::oneshot - канал для одного-единственного значения (single-producer, single-consumer)

use tokio::sync::oneshot;

let (tx, rx) = oneshot::channel();

tokio::spawn(async move {
    if let Err(_) = tx.send(3) {
        println!("the receiver dropped");
    }
});

match rx.await {
    Ok(v) => println!("got = {:?}", v),
    Err(_) => println!("the sender dropped"),
}

Примечание:

  • метод send не асинхронный, его можно вызывать где угодно. В том числе отправлять между двумя рантаймами и использовать из не-async-кода

  • если Receiver закрыт до приёма уже отправленного сообщения, сообщение остаётся в канале, пока получатель не будет дропнут

  • в tokio::select! oneshot используют через &mut rx (чтобы не забирать rx по значению в цикле)

    3.tokio::sync::broadcast - несколько отправителей, каждый получатель видит каждое сообщение

use tokio::sync::broadcast;

let (tx, _) = broadcast::channel(16);

// подписаться нужно до send - получатель видит только сообщения,
// отправленные после его subscribe()
let mut rx1 = tx.subscribe();
let mut rx2 = tx.subscribe();

tx.send("всем привет")?;

assert_eq!(rx1.recv().await?, "всем привет");
assert_eq!(rx2.recv().await?, "всем привет");

Примечания:

  • T: Clone - каждому получателю достаётся клон значения, для больших структур заворачивайте в Arc

  • Если получатель отстаёт и буфер переполняется, он теряет самые старые сообщения, а его recv() вернёт Err(RecvError::Lagged(n)) (n - сколько пропущено), после чего чтение продолжится с самого старого доступного

    4.tokio::sync::watch - multi-producer, multi-consumer канал, который сохраняет последнее отправленное значение (промежуточные значения могут быть пропущены)

use tokio::sync::watch;
use tokio::time::{Duration, sleep};

let (tx, mut rx) = watch::channel("hello");

tokio::spawn(async move {
    loop {
	    // сначала обрабатываем текущее значение, потом ждём changed()
        println!("{}! ", *rx.borrow_and_update());
        if rx.changed().await.is_err() {
            break; // все отправители дропнуты
        }
    }
});

sleep(Duration::from_millis(100)).await;
tx.send("world")?;

Чтение текущего значение:

  • borrow() - ссылка на последнее значение, не помечает его увиденным. Удобен, когда нужно просто посмотреть текущее состояние

  • borrow_and_update() - то же самое, но помечает значение увиденным. В цикле с changed() предпочитай именно его, т.к. с borrow() возможна гонка, когда новое значение приходит между готовностью changed() и чтением, и цикл проработает дважды с одним значением

Ожидание изменений:

  • changed() - асинхронно ждёт нового значения и помечает его увиденным. Возвращает Err только когда канал закрыт и текущее значение уже увидено. Не сравнивает на равенство - сработает, даже если новое значение совпало со старым

  • has_changed() - синхронно проверяет, есть ли непросмотренное значение, не помечая его. Вернёт Err как только канал закрыт. - Тоже не сравнивает на равенство

  • wait_for(pred) - асинхронно ждёт, пока значение не удовлетворит предикату, возвращает ссылку на него и помечает увиденным

Способы отправки сообщения:

  • send(value) - заменить значение. Возвращает Err, если получателей не осталось

  • send_replace(value) - заменить значение с возвратом старого

  • send_modify(|&mut value| ...) - изменить значение на месте, без аллокации нового

  • send_if_modified - изменить условно: уведомляет получателей, только если замыкание вернуло true

  • subscribe() - создать нового получателя;

  • receiver_count() - получить число получателей

  • is_closed() - проверить закрыт ли канад

  • closed() - ожидание закрытия канала

  • borrow() - отправитель тоже может прочитать текущее значение

    5.tokio::sync::Notify - не совсем канал, но близкий примитив - сигнал без данных

use tokio::sync::Notify;
use std::sync::Arc;

let notify = Arc::new(Notify::new());
let notify2 = notify.clone();

tokio::spawn(async move {
    notify2.notified().await;
    println!("получили сигнал!");
});

notify.notify_one(); // Разбудить одну задачу
// или
notify.notify_waiters(); // Разбудить все ожидающие

Примечание:

  • notify_one() - будит одну ждущую задачу. Если в этот момент никто не ждёт, сохраняется одно разрешение (permit), и следующий notified().await пройдёт сразу. Permit хранится не больше одного: несколько notify_one() подряд = один permit (следующий notified() пройдёт мгновенно, а тот, что за ним, - будет ждать). То есть Notify - не счётчик

  • notify_waiters() - будит всех, кто уже ждёт, но permit не сохраняет: если в этот момент никто не ждёт, сигнал теряется

Tokio предоставляет свой макрос select!, который ждёт несколько async-операций одновременно и выполняет ветку той, что готова первой. Остальные ветки при этом отменяются. Если готовы сразу несколько - выбирается случайная

use tokio::sync::mpsc;
use tokio::time::{sleep, Duration};


let (tx, mut rx) = mpsc::channel(8);

tokio::spawn(async move {
    for i in 0..3 {
        tx.send(i).await.unwrap();
        sleep(Duration::from_millis(50)).await;
    }
    // tx дропается - канал закроется
});

loop {
    tokio::select! {
        // что готово первым - то и сработает
        maybe = rx.recv() => match maybe 
                Some(v) => println!("получено: {v}"),
                None => { println!("канал закрыт"); break; },
        _ = sleep(Duration::from_millis(200)) => {
            println!("200 мс тишины - выходим");
            break;
        }
    }
}

Примечание:

  • выполняется только первая готовая ветка - остальные отменяются, поэтому в ветки кладут только cancel-safe операции (mpsc::recv, Notify::notified безопасны, а операции с частичным прогрессом - нет, иначе данные будут потеряны)

  • в примере sleep пересоздаётся на каждой итерации, поэтому это таймаут бездействия - он сбрасывается при каждом сообщении

Чем tokio::select! отличается от select! в crossbeam:

  • Crossbeam select! синхронный - он блокирует поток, пока одна из операций не станет готова. Tokio select! асинхронный - он уступает задачу планировщику, не занимая поток. Отсюда правило, что crossbeam-select неуместен в async (заблокирует executor), а tokio-select требует рантайма и не работает в обычном sync-коде

  • В crossbeam ветки - это только операции с каналами (recv/send) плюс таймеры-каналы after/tick. В tokio ветка - любой Future: recv канала, sleep, сетевой I/O, любая async-операция

  • В tokio future проигравших веток дропаются на точке await. Если операция вела частичный прогресс, он теряется, поэтому важна cancel-safety. В crossbeam терять нечего, выбирается одна готовая операция, а невыбранные просто не выполняются (сообщение не считывается, если его ветка не выбрана) - никакого частичного прогресса и связанного с ним класса багов. crossbeam select! - для синхронного многопоточного кода (блокирует поток, только каналы, прогресс не теряется), tokio select! - для async (уступает задачу, любые future, но следи за cancel-safety).

Также стоит отдельно рассказать про cancel safety (безопасность отмены). В async объест Future может быть отменён, то есть дропнут до завершения. Это происходит не только при abort() задачи, но и штатно. Например, в tokio::select! дропаются future всех проигравших веток, внутри timeout(...) future дропается по истечении времени и т.д.

Операция cancel-safe, если её отмена на полпути ничего не теряет: данные остаются на месте и операцию можно спокойно вызвать заново. Операция не cancel-safe, если при отмене теряется частичный прогресс.

Cancel-safe:

  • mpsc::Receiver::recv/UnboundedReceiver::recv - при отмене сообщение остаётся в канале

  • mpsc::Sender::send - при отмене гарантируется, что сообщение не было отправлено

  • broadcast::Receiver::recv, watch::Receiver::changed, Notify::notified

  • oneshot через &mut rx.

Не cancel-safe:

  • AsyncReadExt::read_exact, AsyncBufReadExt::read_line и подобные - при отмене уже прочитанные байты пропадают

  • в общем случае - всё, что копит частичное состояние внутри самой future

Почему это важно: в цикле select! дропает future проигравших веток на каждой итерации. Положить туда не-cancel-safe операцию - её частичный прогресс будет теряться каждый раз. Решение - создать такую future один раз вне цикла, запиннить и обращаться к ней по &mut, чтобы состояние пережило итерации:

let op = read_one_frame(&mut socket); // future с внутренним состоянием
tokio::pin!(op);
loop {
    tokio::select! {
        res = &mut op => { /* ... */ break; } // переживёт отмену других веток
        _ = stop_rx.changed() => break,
    }
}

Практическое правило: прежде чем класть async-метод в select!, загляни в его документацию - у методов tokio есть раздел "Cancel safety", где это прямо указано.

Мост в sync - если к tokio-каналу нужно обращаться из синхронного кода, у mpsc есть блокирующие версии - blocking_send() и blocking_recv(). Они блокируют поток вместо .await, так что обычный поток может слать и читать через тот же канал. Важно, что эти методы паникуют, если вызвать их внутри async-контекста (внутри рантайма). А если, наоборот, нужно выполнить блокирующий вызов из самой async-задачи, заверните его в tokio::task::spawn_blocking.

use std::thread;
use tokio::runtime::Runtime;
use tokio::sync::mpsc;

let (tx, mut rx) = mpsc::channel::<u8>(10);

let sync_code = thread::spawn(move || {
    assert_eq!(Some(10), rx.blocking_recv());
});

Runtime::new()
    .unwrap()
    .block_on(async move {
        let _ = tx.send(10).await;
    });
sync_code.join().unwrap()

И ремарка на случай. Если вы используете не tokio рантайм, а наример async-std или smol, то стоит взять крейт async-channel. Он предоставляет асинхронный mpmc-канал с тем же базовым API: send().await/recv().await, Receiver реализует Stream, есть send_blocking()/recv_blocking() для моста в sync-код, и обе половины клонируются. По сути то же, что tokio::sync::mpsc, но без привязки к конкретному рантайму.

Этот набор асинхронных каналов - стандартный выбор для async-кода на рантайме tokio. Если пишите async вне tokio (async-std/smol), то смотрите в сторону async-channel.

flume

flume - высокопроизводительные mpmc каналы. Интересен тем, что один и тот же канал умеет работать и синхронно, и асинхронно. По API это почти полная замена std::sync::mpsc, но ещё с async, таймаутами и select-интерфейсом. Весь крейт написан без единой строчки unsafe.

Способы создания каналов:

let (tx, rx) = flume::unbounded(); // безразмерный
let (tx, rx) = flume::bounded(10); // ограниченный, буфер на 10
let (tx, rx) = flume::bounded(0);  // rendezvous (буфера нет)
let (tx, rx) = flume::bounded(10);

// Синхронный отправитель (обычный поток)
std::thread::spawn(move || {
    tx.send(42).unwrap();
});

// Асинхронный получатель (tokio-задача)
tokio::spawn(async move {
    let val = rx.recv_async().await.unwrap();
    println!("{}", val);
});

Примечания:

  • Sender и Receiver реализуют Send + Sync + Clone

  • поддержка таймаутов/дедлайнов на отправку и приём (send_timeout/send_deadline, recv_timeout/recv_deadline)

  • помимо send_async/recv_async, есть rx.stream()/rx.into_stream() и tx.sink()/tx.into_sink() - для интеграции с комбинаторами futures

  • мало зависимостей, минимальная кодовая база, быстрая компиляция

  • без unsafe-кода, мало зависимостей, быстро компилируется

Также крейт предоставляет select-подобный интерфейс:

let (tx0, rx0) = flume::unbounded();
let (tx1, rx1) = flume::unbounded();

std::thread::spawn(move || {
    tx0.send(true).unwrap();
    tx1.send(42).unwrap();
});

// реагируем на тот канал, что готов первым
flume::Selector::new()
    .recv(&rx0, |b| println!("Received {:?}", b))
    .recv(&rx1, |n| println!("Received {:?}", n))
    .wait();

Хорошо подходит, когда нужен один канал, работающий и в sync, и в async частях кода, без необходимости тащить разные крейты для разных половин приложения.

kanal

kanal - крейт каналов, вдохновлённый моделью CSP (Communicating Sequential Processes - взаимодействующие последовательные процессы), с главным упором на очень высокую производительность. Предоставляет mpmc и spsc каналы. Как и во flume, объединяет sync и async на одном канале. Цена за скорость - внутри много unsafe-кода, и крейт ещё не дошёл до стабильной версии 1.0.

Kanal применяет сильно оптимизированную составную технику передачи объектов. Когда размер данных меньше или равен размеру указателя, используется сериализация: данные кодируются как адрес указателя. Если же размер данных превышает размер указателя, протокол применяет стратегию, похожую на ту, что используется в языке Go - прямой доступ к памяти для копирования объектов со стека отправителя или записи напрямую в стек получателя. Этот составной метод не только устраняет лишние обращения по указателю, но и убирает аллокации в куче для каналов bounded(0).

Кроме того, реализация использует специально настроенный мьютекс для блокировки канала - это стало возможным благодаря предсказуемому времени внутренней блокировки канала. При этом можно задействовать стандартный мьютекс из std через фичу std-mutex.

Создание каналов

let (s, r) = kanal::bounded(8);        // синхронный ограниченный
let (s, r) = kanal::unbounded();       // синхронный безразмерный
let (s, r) = kanal::bounded_async(8);  // асинхронный ограниченный
let (s, r) = kanal::unbounded_async(); // асинхронный безразмерный
let (s, r) = kanal::bounded(0);        // rendezvous (нулевой буфер)

Sync и async конструкторы дают разные типы (Sender/Receiver против AsyncSender/AsyncReceiver), но любой из них конвертируется в другой режим.

Пример синхронного канала с мостом в async

// Создаём ограниченный синхронный канал с буфером на 8 сообщений
let (sender, receiver) = kanal::bounded(8);

let s = sender.clone();
std::thread::spawn(move || {
    s.send("hello").unwrap();
});

// Получаем сообщение в другом потоке
let msg = receiver.recv()?;
println!("I got msg: {}", msg);


// Преобразуем канал и используем его в async-контексте для связи между sync и async
tokio::spawn(async move {
    // Берём канал как асинхронный и используем в async-контексте
    // (или конвертируем в async через to_async())
    sender.as_async().send("hello").await?;
});

Пример асинхронного канала с мостом в sync

// Создаём ограниченный async-канал с буфером на 8 сообщений
let (sender, receiver) = kanal::bounded_async(8);

sender.send("hello").await?;
sender.send("hello").await?;

// Клонируем получателя и конвертируем его в синхронного
let receiver_sync = receiver.clone().to_sync();

tokio::spawn(async move {
    let msg = receiver.recv().await.unwrap();
    println!("I got msg: {}", msg);
});

// Создаём поток и используем получателя в sync-контексте
std::thread::spawn(move || {
    let msg = receiver_sync.recv().unwrap();
    println!("I got msg in sync context: {}", msg);
});

Примечание:

  • мост между sync и async:

    • as_async()/as_sync() - берут другой вид канала по ссылке

    • to_async()/to_sync() - конвертируют, потребляя исходный канал

    • clone_async()/clone_sync() - создают копию в другом режиме

  • подобно Go, kanal позволяет закрывать каналы функцией close(): можно послать сигнал закрытия из любого экземпляра канала и закрыть его сразу и для отправителей, и для получателей. Состояние проверяется через is_closed() (закрыт ли канал) и is_disconnected() (отвалилась ли отправляющая сторона)

  • поддержка таймаутов (recv_timeout()/send_timeout())

  • AsyncReceiver умеет отдавать сообщения как Stream, что подключает его к комбинаторам futures

Стоит брать, когда канал горячее место и нужна максимальная пропускная способность с sync/async API

Сравнение возможностей разных реализаций каналов

Крейт

Sync

Async

Тип каналов

Особенности

std::sync::mpsc

да

нет

mpsc

есть в стандартной библиотеке

crossbeam-channel

да

нет

mpmc

таймеры after/tick/never
макрос select

tokio::sync

через blocking_*

да

mpsc
spsc
broadcast
watch

семейство каналов под разные задачи
стандарт для async на tokio
макрос select

flume

да

да

mpmc

без unsafe в исходном коде
предоставляет мостик между sync и async
select-подобный интерфейс

async-channel

через *_blocking

да

mpmc

для экосистемы async-std/smol

kanal

да

да

mpscspsc

высокая производительность
предоставляет мостик между sync и async
много unsafe в исходниках
ещё нет мажорной 1.0

Диаграмма выобра канала
Диаграмма выобра канала

Паттерны использования

Fan-out / Fan-in

Паттерн параллельной обработки данных. Один канал раздаёт задачи (fan-out), другой собирает результаты (fan-in):

use crossbeam_channel::{bounded, unbounded};
use std::thread;

fn main() {
    let tasks = vec![2, 4, 6, 8, 10];
    let num_workers = 3;

    let (task_tx, task_rx) = bounded(100);    // Раздача задач
    let (result_tx, result_rx) = unbounded(); // Сбор результатов

    let mut final_results = Vec::new();

    // // Fan-out: воркеры тянут задачи из общего канала
    for _ in 0..num_workers {
        let task_rx = task_rx.clone();
        let result_tx = result_tx.clone();
        thread::spawn(move || {
            for task in task_rx {
                let result = task * 2;
                result_tx.send(result).unwrap();
            }
        });
    }

    // Отправляем задачи
    for task in tasks {
        task_tx.send(task).unwrap();
    }


    // Закрываем отправители
    drop(task_tx);
    drop(result_tx);

    // Fan-in: собираем результаты паралельно с воркерами
    // Цикл закончится, когда все воркеры завершатся и клоны result_tx закроются
    for result in result_rx {
        final_results.push(result);
    }

    println!("Результаты: {:?}", final_results);
}

Actor model

Отдельная задача/поток единолично владеет своим состоянием и взаимодействует с другими акторами только через обмен сообщениями. Сообщения обрабатываются последовательно, поэтому состояние не разделяется между потоками и обычно не требует дополнительной синхронизации (Mutex, RwLock и т.д.)

use std::collections::HashMap;
use tokio::{
    sync::mpsc,
    task::JoinHandle
};

// Сообщение
enum Cmd {
    Insert(String, i64),
    Increment(String),
}


// Актор
async fn counter_actor(mut rx: mpsc::Receiver<Cmd>) {
    // Локальное состояние актора
    let mut state = HashMap::new();

    // Обрабатываем сообщения
    while let Some(cmd) = rx.recv().await {
        match cmd {
            Cmd::Insert(k, v) => {
                state.insert(k, v);
            }

            Cmd::Increment(k) => {
                *state.entry(k).or_default() += 1;
            }
        }
    }

    println!("Итоговое состояние: {state:?}");
}

// Единственная точка общения с актором
#[derive(Clone)]
struct CounterHandle {
    tx: mpsc::Sender<Cmd>,
}

impl CounterHandle {
    fn spawn() -> (Self, JoinHandle<()>) {
        let (tx, rx) = mpsc::channel(32); // почтовый ящик на 32 сообщения 
        let join = tokio::spawn(counter_actor(rx));
        (Self { tx }, join)
    }

    async fn insert(&self, key: &str, value: i64) {
        self.tx.send(Cmd::Insert(key.into(), value)).await.unwrap();
    }

    async fn increment(&self, key: &str) {
        self.tx.send(Cmd::Increment(key.into())).await.unwrap();
    }
}

#[tokio::main]
async fn main() {
    let (counter, join) = CounterHandle::spawn();

    // Отправляем команды
    counter.insert("hits", 0).await;
    counter.increment("hits").await;
    counter.increment("hits").await;

    drop(counter);        // закрываем отправитель, чтобы получатель закончил работу
    join.await.unwrap();  // дожидаемся пока актор обработает всё и завершится
}

Graceful shutdown

Завершение работы приложения, когда система сначала прекращает принимать новые данные и корректно завершает уже выполняющиеся операции, освобождает ресурсы и только после этого останавливается

use tokio::{
    select,
    sync::{mpsc, watch},
};

#[tokio::main]
async fn main() {
    let (work_tx, mut work_rx) = mpsc::channel::<u32>(100);
    let (stop_tx, mut stop_rx) = watch::channel(false);

    let worker = tokio::spawn(async move {
        loop {
            select! {
                job = work_rx.recv() => {
                    match job {
                        Some(j) => println!("обработка {j}"),
                        None => break // рабочий канал закрыт

                    }
                }

                _ = stop_rx.changed() => {
                    if *stop_rx.borrow() {
                        // дочищаем очередь
                        while let Ok(j) = work_rx.try_recv() {
                            println!("дообработка {j}");
                        }

                        println!("остановлено по сигналу");
                        break;
                    }
                }
            }
        }
    });

    for n in 0..100 {
        work_tx.send(n).await.unwrap();
    }
    stop_tx.send(true).unwrap();
    worker.await.unwrap();
}

P.s. этот пример более акадимический, в реальном проекте нужно было бы использовать CancellationToken из tokio-util

Pipeline

Цепочка последовательных этапов, соединённых каналами и работающих параллельно: каждый этап преобразует элемент и передаёт дальше.

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx1, rx1) = mpsc::sync_channel(16);
    let (tx2, rx2) = mpsc::sync_channel(16);

    // Этап 1: возведение в квадрат
    let squarer = thread::spawn(move || {
        for n in rx1 {
            if tx2.send(n * n).is_err() {
                break;
            }
        }
    });

    // Этап 2: вывод результата
    let printer = thread::spawn(move || {
        for n in rx2 {
            println!("Result: {n}");
        }
    });

    for n in 1..=5 {
        tx1.send(n).unwrap();
    }
    drop(tx1);

    squarer.join().unwrap();
    printer.join().unwrap();
}

Ещё раз про возможные подводные камни

  • Deadlock при ограниченных-каналах - если два потока отправляют друг другу через ограниченные-каналы и оба буфера полны, то это взаимная блокировка

  • Утечка памяти при неограниченных-каналах - если отправитель быстрее получателя, буфер растёт без ограничений

  • Блокировка async-рантайма - не вызывайте без необходимости синхронный .send() или .recv() из async-контекста, т.к. это заблокирует весь поток, используйте spawn_blocking для синхронных каналов или переходите на async-варианты

  • Drop-семантика - когда все отправители дропаются, получатель узнаёт об этом. Но если вы случайно сохраните лишний клон отправителя, то канал никогда не закроется, и получатель зависнет

  • Механизм cooperative budget в tokio - каждой задаче tokio выдаёт бюджет. По умолчанию 128 операций на один проход планировщика, который сбрасывается, когда планировщик переключается на задачу. Когда бюджет доходит до нуля, операция возвращает Poll::Pending, заставляя задачу уступить. Операции с каналами этот бюджет тратят: при вызове recv у tokio::sync::mpsc бюджет автоматически расходуется на каждое возвращённое значение - так задачи с tokio-каналами становятся кооперативными. В 99% случаев это благо: горячий цикл loop { rx.recv().await } при всегда готовом канале не заберёт всё процессорное время потока. Но есть подводный камень - асинхронные каналы из flume или kanal этот механизм не учитывают, поэтому в некоторых ситуациях это потенциально может стать узким горлышком. Для избежания таких ситуаций можно вызывать вручную tokio::task::consume_budget().await или tokio::task::yield_now().await

Когда каналы не подходят

Каналы - это мощный инструмент, но не универсальный молоток. Прежде чем заводить Sender/Receiver стоит разобраться: это передача данных или просто общий доступ к ним?

  • Нужно разделяемое изменяемое состояние (общий счётчик, конфиг, кэш, к которому ходят много потоков) - это работа для Arc<Mutex<…>> или Arc<RwLock<…>>, а не для канала

  • Простой числовой счётчик или флаг - для этого хватит AtomicUsize/AtomicBool без локов и каналов

  • Разбудить, когда поменялось общее значение - это tokio::sync::watch или tokio::sync::Notify, а не очередь сообщений

  • Одна задача ждёт результат другой - часто достаточно просто handle.await (или join), а не oneshot-канал

Заключение

"Какой канал использовать" - вопрос не только о том, какой быстрее в синтетическом бенчмарке, сколько о том, какая у вас модель исполнения (sync или async) или по нужному количеству отправителей/получателей. Для большинства задач достаточно трёх инструментов: std::sync::mpsc для простого sync, tokio::sync для async и flume, когда нужен мостик между sync и async. Начните с базовой тройки и тянитесь к остальным только тогда, когда упрётесь в их ограничения.