Каналы - механизм передачи данных между потоками или асинхронными задачами. Идея простая: один конец отправляет сообщения (отправитель), другой их получает (получатель), а вся синхронизация спрятана внутри. За этим стоит целый подход: вместо того чтобы разделять память и вручную её блокировать, данные передают сообщением. В Rust это ложится прямо на модель владения, ведь значение передаётся от отправителя к получателю без алиасинга и гонок данных. Но не везде каналы первоклассный инструмент, например в стандартной библиотеке C++ их нет вовсе, в Python есть очередь-аналог (queue.Queue), а идиомой этот стиль сделал Go.
На практике же стоит копнуть глубже, и оказывается, что каналов в экосистеме Rust целый зоопарк: std::sync::mpsc, crossbeam-channel, tokio::sync, flume, async-channel, kanal и ещё несколько менее известных. Они отличаются моделью, поддержкой асинхронности, скоростью и кучей мелких деталей, о которые легко споткнуться. В этой статье разберёмся, какие каналы бывают, чем они отличаются и какой брать под конкретную задачу.
Классификация каналов
Прежде чем нырять в конкретные реализации, нужно обозначить ключевые характеристики, по которым каналы различаются.
По количеству отправителей и получателей:
SPSC (single producer, single consumer) - ровно один отправитель и один получатель
MPSC (multiple producers, single consumer) - много отправителей, один получатель
MPMC (multiple producers, multiple consumers) - много отправителей и получателей
Broadcast - много получателей, каждый получает копию каждого сообщения
Watch - частный случай broadcast, хранит только последнее значение (промежуточные состояния могут быть пропущены)
По ёмкости буфера:
Unbounded (неограниченный) - буфер растёт по мере необходимости. Отправка никогда не блокирует, но может привести к неконтролируемому росту памяти
Bounded (ограниченный) - буфер фиксированного размера. Когда буфер полон, отправитель блокируется или получает ошибку
Rendezvous - буфер нулевого размера. Отправитель и получатель должны встретиться одновременно
По модели исполнения:
Синхронные - блокируют текущий поток: на приёме пока нет данных, а в ограниченном-канале ещё и на отправке, пока буфер полон
Асинхронные - интегрируются с async-рантаймом и возвращают объект
Futureпри отправке/получении
Все эти классификации независимы и комбинируются в зависимости от реализации.
std::sync::mpsc
Это канал, доступный в стандартной библиотеке. Его название говорит само за себя: mpsc - multi-producer, single-consumer
Модуль предоставляет 3 типа:
Sender- отправитель для неограниченного каналаSyncSender- отправитель для ограниченного каналаReceiver- получатель, общий для обоих видов отправителей
Способы создания каналов:
use std::sync::mpsc; // Неограниченный-канал let (tx, rx) = mpsc::channel(); // Ограниченный-канал, буфер на 16 элементов let (tx, rx) = mpsc::sync_channel(16); // Rendezvous-канал let (tx, rx) = mpsc::sync_channel(0);
mpsc::channel() - создаёт неограниченный канал. Вызов tx.send(value) никогда не блокирует - значение помещается в растущий буфер. Это удобно, но если получатель не успевает, память будет расти без предела.
mpsc::sync_channel(n) - создаёт канал с буфером размера n. Если буфер полон, tx.send() заблокирует поток до тех пор, пока получатель не заберёт хотя бы одно значение. При n = 0 получаем "rendezvous" канал (каждый вызов send() блокирует текущий поток, пока другой поток не прочитает сообщение).
Пример
use std::sync::mpsc; use std::thread; let (tx, rx) = mpsc::channel(); // Создание канала thread::spawn(move || { // Создание нового потока tx.send(10).unwrap(); // Отправка сообщения }); assert_eq!(rx.recv()?, 10) // Получаем значение и сравниваем
use std::sync::mpsc; use std::thread; let (tx, rx) = mpsc::channel(); // tx - сокращённо от transmitter (передатчик), а rx от receiver (приёмник) for i in 0..3 { let tx = tx.clone(); // клонируем отправитель thread::spawn(move || { tx.send(i).unwrap(); }); } drop(tx); // вручную дропаем отправитель while let Ok(received) = rx.recv() { println!("получено: {received}"); }
Примечание:
пока жив хоть один
Sender,recvбудет ждать. В первом примере отправитель перемещается в поток и дропается при завершении его работы. Во втором примере клоны дропаются в своих потоках, а исходныйSenderприходится дропнуть вручную, т.к. иначеrecvбудет ждать вечно и основной поток не завершитсяSenderиSyncSenderреализуют трейтClone, поэтому их можно клонировать (multiple producers), а вотReceiverклонировать нельзя - получатель всегда один (single-consumer)send()иrecv()возвращаютResult.Errозначает, что другой конец канала уже закрыт: дляsend()- уничтоженReceiver, дляrecv()- уничтожены всеSender. При этомsend()возвращает неотправленное значение обратно внутриSendError, чтобы оно не потерялосьReceiverреализует трейтIntoIterator, так что вместоwhile let Ok(received) = rx.recv()можно было написатьfor received in rxтонкость с потокобезопасностью:
SenderиSyncSenderреализует трейт-маркерыSendиSync, а вотReceiverтожеSend, но!Sync(читать из канала можно только из одного потока)
Receiver поддерживает несколько режимов чтения:
// Блокирующее чтение - ждёт сообщения let val = rx.recv()?; // Неблокирующее - возвращает Err, если канал пуст match rx.try_recv() { Ok(val) => println!("Получили: {}", val), Err(mpsc::TryRecvError::Empty) => println!("Пока пусто"), Err(mpsc::TryRecvError::Disconnected) => println!("Канал закрыт"), } // С таймаутом match rx.recv_timeout(Duration::from_secs(5)) { Ok(val) => println!("Получили: {}", val), Err(mpsc::RecvTimeoutError::Timeout) => println!("Не дождались"), Err(mpsc::RecvTimeoutError::Disconnected) => println!("Канал закрыт"), }
У SyncSender есть неблокирующая отправка - try_send(), которая не ждёт освобождения буфера, а сразу возвращает управление:
match tx.try_send(value) { Ok(()) => println!("отправлено"), Err(mpsc::TrySendError::Full(value)) => println!("буфер полон, значение вернулось: {value}"), Err(mpsc::TrySendError::Disconnected(value)) => println!("канал закрыт"), }
Этот тип каналов стоит брать для простых синхронных сценариев, когда несколько потоков пишут - один читает.
crossbeam-channel
Крейт crossbeam-channel - альтернатива std::sync::mpsc, которая предоставляет mpmc-каналы и некоторые дополнительные возможности. Его реализацию портировали в стандартную библиотеку при переписывании mpsc в Rust 1.67, так что для простого mpsc-случая std теперь по скорости ему не уступает, и главный довод за crossbeam именно фичи, которых нет в std.
Способы создания каналов:
use crossbeam_channel::{unbounded, bounded, select}; // Неограниченный-канал let (tx, rx) = unbounded(); // Ограниченный-канал let (tx, rx) = bounded(100); // Rendezvous-канал let (tx, rx) = bounded(0);
Пример
use crossbeam_channel::unbounded; use std::thread; let (s, r) = unbounded(); // несколько отправителей for i in 0..6 { let s = s.clone(); thread::spawn(move || s.send(i).unwrap()); } drop(s); // дропаем исходный отправитель, иначе получатели будут ждать вечно let mut handles = Vec::new(); for id in 0..2 { let r = r.clone(); // теперь получателей тоже можно клонировать handles.push(thread::spawn(move || { for msg in r { println!("получатель {id}: {msg}"); } })); } for h in handles { h.join()?; }
И
Sender, иReceiverреализуютClone(multiple producers, multiple consumers)Каждое сообщение достаётся ровно одному получателю - это очередь с конкуренцией за элементы
Канал закрывается, когда уничтожены все отправители или все получатели.
recv()вернётErr(RecvError), когда исчезнут всеSender'ы;send()вернётErr(SendError)(с возвратом значения внутри), когда исчезнут всеReceiver'ы.Обе половины -
Send + Sync, так что их можно и перемещать в другие потоки, и шарить по ссылке.
Помимо обычных каналов, crossbeam предоставляет несколько специальнов каналов помощников:
after(duration)- через заданное время срабатывает один разtick(duration)- периодически выдаёт сообщения через указанный интервалnever()- канал, который никогда не становится готовым к чтению (удобно как заглушка вselect!)
use crossbeam_channel::{after, tick, never, Receiver}; use std::time::Duration; // Одноразовый таймер - пришлёт одно значение через 1 секунду let timeout = after(Duration::from_secs(1)); // Периодический - присылает значение каждые 200 мс let ticker = tick(Duration::from_millis(200)); // never: канал, по которому ничего не придёт let maybe_rx: Option<Receiver<i32>> = None; let opt_rx = maybe_rx.unwrap_or_else(never);
Также крейт crossbeam-channel предоставляет макрос select!, аналог Go-шного select: он позволяет ждать сразу несколько каналов и реагировать на тот, что готов первым.
use crossbeam_channel::{after, bounded, select, tick}; use std::{thread, time::Duration}; let (s1, r1) = bounded::<i32>(1); let (s2, r2) = bounded::<i32>(1); // Отправляет число каждые 250 мс thread::spawn(move || { for i in 0.. { if s1.send(i).is_err() { break; } thread::sleep(Duration::from_millis(250)); } }); // Отправляет число каждые 400 мс thread::spawn(move || { for i in 0.. { if s2.send(i).is_err() { break; } thread::sleep(Duration::from_millis(400)); } }); // Таймеры let ticker = tick(Duration::from_millis(100)); let timeout = after(Duration::from_secs(2)); loop { // select! опрашивает все ветки и выбирает ту, которая готова. Если готовы несколько - выбирает случайную select! { recv(r1) -> msg => println!("канал 1: {:?}", msg), recv(r2) -> msg => println!("канал 2: {:?}", msg), recv(ticker) -> _ => println!("тик"), recv(timeout) -> _ => { println!("таймаут!"); break; } } }
У crossbeam нет поддержки async: все его операции блокирующие, и в async-коде он сам по себе неуместен (хотя его иногда используют внутри выделенного пула блокирующих потоков). Если нужен async - смотрите в сторону tokio::sync или flume.
Этот тип каналов стоит брать для многопоточного синхронного кода, где нужны MPMC каналы.
tokio::sync
Если вы пишете async-код на рантайме tokio, вам понадобятся каналы, которые умеют в .await вместо блокировки потока. tokio::sync предоставляет не один канал, а целый набор.
tokio::sync::mpsc- async-аналогstd::sync::mpsc
use tokio::sync::mpsc; let (tx, mut rx) = mpsc::channel(32); // буфер на 32 tokio::spawn(async move { for i in 0..5 { tx.send(i).await.unwrap(); // .await, а не блокировка } }); while let Some(v) = rx.recv().await { println!("получено: {v}"); }
tx.send().await при полном буфере приостановит текущую задачу, а не поток. Когда все Sender дропнуты и буфер вычитан, rx.recv().await возвращает None, и цикл завершается.
2. tokio::sync::oneshot - канал для одного-единственного значения (single-producer, single-consumer)
use tokio::sync::oneshot; let (tx, rx) = oneshot::channel(); tokio::spawn(async move { if let Err(_) = tx.send(3) { println!("the receiver dropped"); } }); match rx.await { Ok(v) => println!("got = {:?}", v), Err(_) => println!("the sender dropped"), }
Примечание:
метод
sendне асинхронный, его можно вызывать где угодно. В том числе отправлять между двумя рантаймами и использовать из не-async-кодаесли
Receiverзакрыт до приёма уже отправленного сообщения, сообщение остаётся в канале, пока получатель не будет дропнутв
tokio::select!oneshotиспользуют через&mut rx(чтобы не забиратьrxпо значению в цикле)3.
tokio::sync::broadcast- несколько отправителей, каждый получатель видит каждое сообщение
use tokio::sync::broadcast; let (tx, _) = broadcast::channel(16); // подписаться нужно до send - получатель видит только сообщения, // отправленные после его subscribe() let mut rx1 = tx.subscribe(); let mut rx2 = tx.subscribe(); tx.send("всем привет")?; assert_eq!(rx1.recv().await?, "всем привет"); assert_eq!(rx2.recv().await?, "всем привет");
Примечания:
T: Clone- каждому получателю достаётся клон значения, для больших структур заворачивайте вArcЕсли получатель отстаёт и буфер переполняется, он теряет самые старые сообщения, а его
recv()вернётErr(RecvError::Lagged(n))(n - сколько пропущено), после чего чтение продолжится с самого старого доступного4.
tokio::sync::watch- multi-producer, multi-consumer канал, который сохраняет последнее отправленное значение (промежуточные значения могут быть пропущены)
use tokio::sync::watch; use tokio::time::{Duration, sleep}; let (tx, mut rx) = watch::channel("hello"); tokio::spawn(async move { loop { // сначала обрабатываем текущее значение, потом ждём changed() println!("{}! ", *rx.borrow_and_update()); if rx.changed().await.is_err() { break; // все отправители дропнуты } } }); sleep(Duration::from_millis(100)).await; tx.send("world")?;
Чтение текущего значение:
borrow()- ссылка на последнее значение, не помечает его увиденным. Удобен, когда нужно просто посмотреть текущее состояниеborrow_and_update()- то же самое, но помечает значение увиденным. В цикле сchanged()предпочитай именно его, т.к. сborrow()возможна гонка, когда новое значение приходит между готовностьюchanged()и чтением, и цикл проработает дважды с одним значением
Ожидание изменений:
changed()- асинхронно ждёт нового значения и помечает его увиденным. ВозвращаетErrтолько когда канал закрыт и текущее значение уже увидено. Не сравнивает на равенство - сработает, даже если новое значение совпало со старымhas_changed()- синхронно проверяет, есть ли непросмотренное значение, не помечая его. ВернётErrкак только канал закрыт. - Тоже не сравнивает на равенствоwait_for(pred)- асинхронно ждёт, пока значение не удовлетворит предикату, возвращает ссылку на него и помечает увиденным
Способы отправки сообщения:
send(value)- заменить значение. ВозвращаетErr, если получателей не осталосьsend_replace(value)- заменить значение с возвратом старогоsend_modify(|&mut value| ...)- изменить значение на месте, без аллокации новогоsend_if_modified- изменить условно: уведомляет получателей, только если замыкание вернулоtruesubscribe()- создать нового получателя;receiver_count()- получить число получателейis_closed()- проверить закрыт ли канадclosed()- ожидание закрытия каналаborrow()- отправитель тоже может прочитать текущее значение5.
tokio::sync::Notify- не совсем канал, но близкий примитив - сигнал без данных
use tokio::sync::Notify; use std::sync::Arc; let notify = Arc::new(Notify::new()); let notify2 = notify.clone(); tokio::spawn(async move { notify2.notified().await; println!("получили сигнал!"); }); notify.notify_one(); // Разбудить одну задачу // или notify.notify_waiters(); // Разбудить все ожидающие
Примечание:
notify_one()- будит одну ждущую задачу. Если в этот момент никто не ждёт, сохраняется одно разрешение (permit), и следующийnotified().awaitпройдёт сразу. Permit хранится не больше одного: несколькоnotify_one()подряд = один permit (следующийnotified()пройдёт мгновенно, а тот, что за ним, - будет ждать). То естьNotify- не счётчикnotify_waiters()- будит всех, кто уже ждёт, но permit не сохраняет: если в этот момент никто не ждёт, сигнал теряется
Tokio предоставляет свой макрос select!, который ждёт несколько async-операций одновременно и выполняет ветку той, что готова первой. Остальные ветки при этом отменяются. Если готовы сразу несколько - выбирается случайная
use tokio::sync::mpsc; use tokio::time::{sleep, Duration}; let (tx, mut rx) = mpsc::channel(8); tokio::spawn(async move { for i in 0..3 { tx.send(i).await.unwrap(); sleep(Duration::from_millis(50)).await; } // tx дропается - канал закроется }); loop { tokio::select! { // что готово первым - то и сработает maybe = rx.recv() => match maybe Some(v) => println!("получено: {v}"), None => { println!("канал закрыт"); break; }, _ = sleep(Duration::from_millis(200)) => { println!("200 мс тишины - выходим"); break; } } }
Примечание:
выполняется только первая готовая ветка - остальные отменяются, поэтому в ветки кладут только cancel-safe операции (
mpsc::recv,Notify::notifiedбезопасны, а операции с частичным прогрессом - нет, иначе данные будут потеряны)в примере
sleepпересоздаётся на каждой итерации, поэтому это таймаут бездействия - он сбрасывается при каждом сообщении
Чем tokio::select! отличается от select! в crossbeam:
Crossbeam
select!синхронный - он блокирует поток, пока одна из операций не станет готова. Tokioselect!асинхронный - он уступает задачу планировщику, не занимая поток. Отсюда правило, что crossbeam-select неуместен в async (заблокирует executor), а tokio-select требует рантайма и не работает в обычном sync-кодеВ crossbeam ветки - это только операции с каналами (
recv/send) плюс таймеры-каналыafter/tick. В tokio ветка - любойFuture:recvканала,sleep, сетевой I/O, любая async-операцияВ tokio future проигравших веток дропаются на точке
await. Если операция вела частичный прогресс, он теряется, поэтому важна cancel-safety. В crossbeam терять нечего, выбирается одна готовая операция, а невыбранные просто не выполняются (сообщение не считывается, если его ветка не выбрана) - никакого частичного прогресса и связанного с ним класса багов. crossbeamselect!- для синхронного многопоточного кода (блокирует поток, только каналы, прогресс не теряется), tokioselect!- для async (уступает задачу, любые future, но следи за cancel-safety).
Также стоит отдельно рассказать про cancel safety (безопасность отмены). В async объест Future может быть отменён, то есть дропнут до завершения. Это происходит не только при abort() задачи, но и штатно. Например, в tokio::select! дропаются future всех проигравших веток, внутри timeout(...) future дропается по истечении времени и т.д.
Операция cancel-safe, если её отмена на полпути ничего не теряет: данные остаются на месте и операцию можно спокойно вызвать заново. Операция не cancel-safe, если при отмене теряется частичный прогресс.
Cancel-safe:
mpsc::Receiver::recv/UnboundedReceiver::recv- при отмене сообщение остаётся в каналеmpsc::Sender::send- при отмене гарантируется, что сообщение не было отправленоbroadcast::Receiver::recv,watch::Receiver::changed,Notify::notifiedoneshotчерез&mut rx.
Не cancel-safe:
AsyncReadExt::read_exact,AsyncBufReadExt::read_lineи подобные - при отмене уже прочитанные байты пропадаютв общем случае - всё, что копит частичное состояние внутри самой future
Почему это важно: в цикле select! дропает future проигравших веток на каждой итерации. Положить туда не-cancel-safe операцию - её частичный прогресс будет теряться каждый раз. Решение - создать такую future один раз вне цикла, запиннить и обращаться к ней по &mut, чтобы состояние пережило итерации:
let op = read_one_frame(&mut socket); // future с внутренним состоянием tokio::pin!(op); loop { tokio::select! { res = &mut op => { /* ... */ break; } // переживёт отмену других веток _ = stop_rx.changed() => break, } }
Практическое правило: прежде чем класть async-метод в select!, загляни в его документацию - у методов tokio есть раздел "Cancel safety", где это прямо указано.
Мост в sync - если к tokio-каналу нужно обращаться из синхронного кода, у mpsc есть блокирующие версии - blocking_send() и blocking_recv(). Они блокируют поток вместо .await, так что обычный поток может слать и читать через тот же канал. Важно, что эти методы паникуют, если вызвать их внутри async-контекста (внутри рантайма). А если, наоборот, нужно выполнить блокирующий вызов из самой async-задачи, заверните его в tokio::task::spawn_blocking.
use std::thread; use tokio::runtime::Runtime; use tokio::sync::mpsc; let (tx, mut rx) = mpsc::channel::<u8>(10); let sync_code = thread::spawn(move || { assert_eq!(Some(10), rx.blocking_recv()); }); Runtime::new() .unwrap() .block_on(async move { let _ = tx.send(10).await; }); sync_code.join().unwrap()
И ремарка на случай. Если вы используете не tokio рантайм, а наример async-std или smol, то стоит взять крейт async-channel. Он предоставляет асинхронный mpmc-канал с тем же базовым API: send().await/recv().await, Receiver реализует Stream, есть send_blocking()/recv_blocking() для моста в sync-код, и обе половины клонируются. По сути то же, что tokio::sync::mpsc, но без привязки к конкретному рантайму.
Этот набор асинхронных каналов - стандартный выбор для async-кода на рантайме tokio. Если пишите async вне tokio (async-std/smol), то смотрите в сторону async-channel.
flume
flume - высокопроизводительные mpmc каналы. Интересен тем, что один и тот же канал умеет работать и синхронно, и асинхронно. По API это почти полная замена std::sync::mpsc, но ещё с async, таймаутами и select-интерфейсом. Весь крейт написан без единой строчки unsafe.
Способы создания каналов:
let (tx, rx) = flume::unbounded(); // безразмерный let (tx, rx) = flume::bounded(10); // ограниченный, буфер на 10 let (tx, rx) = flume::bounded(0); // rendezvous (буфера нет)
let (tx, rx) = flume::bounded(10); // Синхронный отправитель (обычный поток) std::thread::spawn(move || { tx.send(42).unwrap(); }); // Асинхронный получатель (tokio-задача) tokio::spawn(async move { let val = rx.recv_async().await.unwrap(); println!("{}", val); });
Примечания:
SenderиReceiverреализуютSend + Sync + Cloneподдержка таймаутов/дедлайнов на отправку и приём (
send_timeout/send_deadline,recv_timeout/recv_deadline)помимо
send_async/recv_async, естьrx.stream()/rx.into_stream()иtx.sink()/tx.into_sink()- для интеграции с комбинаторамиfuturesмало зависимостей, минимальная кодовая база, быстрая компиляция
без
unsafe-кода, мало зависимостей, быстро компилируется
Также крейт предоставляет select-подобный интерфейс:
let (tx0, rx0) = flume::unbounded(); let (tx1, rx1) = flume::unbounded(); std::thread::spawn(move || { tx0.send(true).unwrap(); tx1.send(42).unwrap(); }); // реагируем на тот канал, что готов первым flume::Selector::new() .recv(&rx0, |b| println!("Received {:?}", b)) .recv(&rx1, |n| println!("Received {:?}", n)) .wait();
Хорошо подходит, когда нужен один канал, работающий и в sync, и в async частях кода, без необходимости тащить разные крейты для разных половин приложения.
kanal
kanal - крейт каналов, вдохновлённый моделью CSP (Communicating Sequential Processes - взаимодействующие последовательные процессы), с главным упором на очень высокую производительность. Предоставляет mpmc и spsc каналы. Как и во flume, объединяет sync и async на одном канале. Цена за скорость - внутри много unsafe-кода, и крейт ещё не дошёл до стабильной версии 1.0.
Kanal применяет сильно оптимизированную составную технику передачи объектов. Когда размер данных меньше или равен размеру указателя, используется сериализация: данные кодируются как адрес указателя. Если же размер данных превышает размер указателя, протокол применяет стратегию, похожую на ту, что используется в языке Go - прямой доступ к памяти для копирования объектов со стека отправителя или записи напрямую в стек получателя. Этот составной метод не только устраняет лишние обращения по указателю, но и убирает аллокации в куче для каналов bounded(0).
Кроме того, реализация использует специально настроенный мьютекс для блокировки канала - это стало возможным благодаря предсказуемому времени внутренней блокировки канала. При этом можно задействовать стандартный мьютекс из std через фичу std-mutex.
Создание каналов
let (s, r) = kanal::bounded(8); // синхронный ограниченный let (s, r) = kanal::unbounded(); // синхронный безразмерный let (s, r) = kanal::bounded_async(8); // асинхронный ограниченный let (s, r) = kanal::unbounded_async(); // асинхронный безразмерный let (s, r) = kanal::bounded(0); // rendezvous (нулевой буфер)
Sync и async конструкторы дают разные типы (Sender/Receiver против AsyncSender/AsyncReceiver), но любой из них конвертируется в другой режим.
Пример синхронного канала с мостом в async
// Создаём ограниченный синхронный канал с буфером на 8 сообщений let (sender, receiver) = kanal::bounded(8); let s = sender.clone(); std::thread::spawn(move || { s.send("hello").unwrap(); }); // Получаем сообщение в другом потоке let msg = receiver.recv()?; println!("I got msg: {}", msg); // Преобразуем канал и используем его в async-контексте для связи между sync и async tokio::spawn(async move { // Берём канал как асинхронный и используем в async-контексте // (или конвертируем в async через to_async()) sender.as_async().send("hello").await?; });
Пример асинхронного канала с мостом в sync
// Создаём ограниченный async-канал с буфером на 8 сообщений let (sender, receiver) = kanal::bounded_async(8); sender.send("hello").await?; sender.send("hello").await?; // Клонируем получателя и конвертируем его в синхронного let receiver_sync = receiver.clone().to_sync(); tokio::spawn(async move { let msg = receiver.recv().await.unwrap(); println!("I got msg: {}", msg); }); // Создаём поток и используем получателя в sync-контексте std::thread::spawn(move || { let msg = receiver_sync.recv().unwrap(); println!("I got msg in sync context: {}", msg); });
Примечание:
мост между sync и async:
as_async()/as_sync()- берут другой вид канала по ссылкеto_async()/to_sync()- конвертируют, потребляя исходный каналclone_async()/clone_sync()- создают копию в другом режиме
подобно Go, kanal позволяет закрывать каналы функцией
close(): можно послать сигнал закрытия из любого экземпляра канала и закрыть его сразу и для отправителей, и для получателей. Состояние проверяется черезis_closed()(закрыт ли канал) иis_disconnected()(отвалилась ли отправляющая сторона)поддержка таймаутов (
recv_timeout()/send_timeout())AsyncReceiverумеет отдавать сообщения какStream, что подключает его к комбинаторамfutures
Стоит брать, когда канал горячее место и нужна максимальная пропускная способность с sync/async API
Сравнение возможностей разных реализаций каналов
Крейт | Sync | Async | Тип каналов | Особенности |
|---|---|---|---|---|
| да | нет | mpsc | есть в стандартной библиотеке |
| да | нет | mpmc | таймеры |
| через | да | mpsc | семейство каналов под разные задачи |
| да | да | mpmc | без |
| через | да | mpmc | для экосистемы async-std/smol |
| да | да | mpscspsc | высокая производительность |

Паттерны использования
Fan-out / Fan-in
Паттерн параллельной обработки данных. Один канал раздаёт задачи (fan-out), другой собирает результаты (fan-in):
use crossbeam_channel::{bounded, unbounded}; use std::thread; fn main() { let tasks = vec![2, 4, 6, 8, 10]; let num_workers = 3; let (task_tx, task_rx) = bounded(100); // Раздача задач let (result_tx, result_rx) = unbounded(); // Сбор результатов let mut final_results = Vec::new(); // // Fan-out: воркеры тянут задачи из общего канала for _ in 0..num_workers { let task_rx = task_rx.clone(); let result_tx = result_tx.clone(); thread::spawn(move || { for task in task_rx { let result = task * 2; result_tx.send(result).unwrap(); } }); } // Отправляем задачи for task in tasks { task_tx.send(task).unwrap(); } // Закрываем отправители drop(task_tx); drop(result_tx); // Fan-in: собираем результаты паралельно с воркерами // Цикл закончится, когда все воркеры завершатся и клоны result_tx закроются for result in result_rx { final_results.push(result); } println!("Результаты: {:?}", final_results); }
Actor model
Отдельная задача/поток единолично владеет своим состоянием и взаимодействует с другими акторами только через обмен сообщениями. Сообщения обрабатываются последовательно, поэтому состояние не разделяется между потоками и обычно не требует дополнительной синхронизации (Mutex, RwLock и т.д.)
use std::collections::HashMap; use tokio::{ sync::mpsc, task::JoinHandle }; // Сообщение enum Cmd { Insert(String, i64), Increment(String), } // Актор async fn counter_actor(mut rx: mpsc::Receiver<Cmd>) { // Локальное состояние актора let mut state = HashMap::new(); // Обрабатываем сообщения while let Some(cmd) = rx.recv().await { match cmd { Cmd::Insert(k, v) => { state.insert(k, v); } Cmd::Increment(k) => { *state.entry(k).or_default() += 1; } } } println!("Итоговое состояние: {state:?}"); } // Единственная точка общения с актором #[derive(Clone)] struct CounterHandle { tx: mpsc::Sender<Cmd>, } impl CounterHandle { fn spawn() -> (Self, JoinHandle<()>) { let (tx, rx) = mpsc::channel(32); // почтовый ящик на 32 сообщения let join = tokio::spawn(counter_actor(rx)); (Self { tx }, join) } async fn insert(&self, key: &str, value: i64) { self.tx.send(Cmd::Insert(key.into(), value)).await.unwrap(); } async fn increment(&self, key: &str) { self.tx.send(Cmd::Increment(key.into())).await.unwrap(); } } #[tokio::main] async fn main() { let (counter, join) = CounterHandle::spawn(); // Отправляем команды counter.insert("hits", 0).await; counter.increment("hits").await; counter.increment("hits").await; drop(counter); // закрываем отправитель, чтобы получатель закончил работу join.await.unwrap(); // дожидаемся пока актор обработает всё и завершится }
Graceful shutdown
Завершение работы приложения, когда система сначала прекращает принимать новые данные и корректно завершает уже выполняющиеся операции, освобождает ресурсы и только после этого останавливается
use tokio::{ select, sync::{mpsc, watch}, }; #[tokio::main] async fn main() { let (work_tx, mut work_rx) = mpsc::channel::<u32>(100); let (stop_tx, mut stop_rx) = watch::channel(false); let worker = tokio::spawn(async move { loop { select! { job = work_rx.recv() => { match job { Some(j) => println!("обработка {j}"), None => break // рабочий канал закрыт } } _ = stop_rx.changed() => { if *stop_rx.borrow() { // дочищаем очередь while let Ok(j) = work_rx.try_recv() { println!("дообработка {j}"); } println!("остановлено по сигналу"); break; } } } } }); for n in 0..100 { work_tx.send(n).await.unwrap(); } stop_tx.send(true).unwrap(); worker.await.unwrap(); }
P.s. этот пример более акадимический, в реальном проекте нужно было бы использовать CancellationToken из tokio-util
Pipeline
Цепочка последовательных этапов, соединённых каналами и работающих параллельно: каждый этап преобразует элемент и передаёт дальше.
use std::sync::mpsc; use std::thread; fn main() { let (tx1, rx1) = mpsc::sync_channel(16); let (tx2, rx2) = mpsc::sync_channel(16); // Этап 1: возведение в квадрат let squarer = thread::spawn(move || { for n in rx1 { if tx2.send(n * n).is_err() { break; } } }); // Этап 2: вывод результата let printer = thread::spawn(move || { for n in rx2 { println!("Result: {n}"); } }); for n in 1..=5 { tx1.send(n).unwrap(); } drop(tx1); squarer.join().unwrap(); printer.join().unwrap(); }
Ещё раз про возможные подводные камни
Deadlock при ограниченных-каналах - если два потока отправляют друг другу через ограниченные-каналы и оба буфера полны, то это взаимная блокировка
Утечка памяти при неограниченных-каналах - если отправитель быстрее получателя, буфер растёт без ограничений
Блокировка async-рантайма - не вызывайте без необходимости синхронный
.send()или.recv()из async-контекста, т.к. это заблокирует весь поток, используйтеspawn_blockingдля синхронных каналов или переходите на async-вариантыDrop-семантика - когда все отправители дропаются, получатель узнаёт об этом. Но если вы случайно сохраните лишний клон отправителя, то канал никогда не закроется, и получатель зависнет
Механизм cooperative budget в tokio - каждой задаче tokio выдаёт бюджет. По умолчанию 128 операций на один проход планировщика, который сбрасывается, когда планировщик переключается на задачу. Когда бюджет доходит до нуля, операция возвращает
Poll::Pending, заставляя задачу уступить. Операции с каналами этот бюджет тратят: при вызовеrecvуtokio::sync::mpscбюджет автоматически расходуется на каждое возвращённое значение - так задачи с tokio-каналами становятся кооперативными. В 99% случаев это благо: горячий циклloop { rx.recv().await }при всегда готовом канале не заберёт всё процессорное время потока. Но есть подводный камень - асинхронные каналы изflumeилиkanalэтот механизм не учитывают, поэтому в некоторых ситуациях это потенциально может стать узким горлышком. Для избежания таких ситуаций можно вызывать вручнуюtokio::task::consume_budget().awaitилиtokio::task::yield_now().await
Когда каналы не подходят
Каналы - это мощный инструмент, но не универсальный молоток. Прежде чем заводить Sender/Receiver стоит разобраться: это передача данных или просто общий доступ к ним?
Нужно разделяемое изменяемое состояние (общий счётчик, конфиг, кэш, к которому ходят много потоков) - это работа для
Arc<Mutex<…>>илиArc<RwLock<…>>, а не для каналаПростой числовой счётчик или флаг - для этого хватит
AtomicUsize/AtomicBoolбез локов и каналовРазбудить, когда поменялось общее значение - это
tokio::sync::watchилиtokio::sync::Notify, а не очередь сообщенийОдна задача ждёт результат другой - часто достаточно просто
handle.await(илиjoin), а неoneshot-канал
Заключение
"Какой канал использовать" - вопрос не только о том, какой быстрее в синтетическом бенчмарке, сколько о том, какая у вас модель исполнения (sync или async) или по нужному количеству отправителей/получателей. Для большинства задач достаточно трёх инструментов: std::sync::mpsc для простого sync, tokio::sync для async и flume, когда нужен мостик между sync и async. Начните с базовой тройки и тянитесь к остальным только тогда, когда упрётесь в их ограничения.
