На прошлой неделе для Rust комьюнити случилось огромное событие — вышла версия компилятора 1.39, а в месте с ней и стабилизация async-await фичи. В этом посте я постараюсь резюмировать все релевантные изменения в компиляторе и экосистеме, а также предоставить инструкции по миграции на async-await парадигму. Детального разбора асинхронности в Rust я делать не буду, есть всё ещё актуальные статьи на хабре, которые помогут войти в тему:
- чат на mio: часть 1, часть 2;
- tokio + futures-rs 0.1;
- обзор tokio;
- генераторы (тут используется непринятый вариант синтаксиса с макросом
await!, он уже работать не будет, но подноготная осталась такой же).
Помимо указанных статей можно также обратиться к документациям стандартной библиотеки и нужных крейтов, а также почитать async-book (на англ.).
Все примеры, рассматриваемые в статье, работают на стабильном компиляторе 1.39 и должны работать на всех последующих версиях. Конечный код доступен на github.
Для реализации асинхронного кода использовалась библиотека futures-0.1. Она предоставляет базовые типажи futures::Future и futures::Stream для работы с отложенными вычислениями. Они оперируют с типами Result<..> и предоставляют набор комбинаторов. Помимо этого, библиотека предоставляет каналы для общения между задачами (task), различные интерфейсы для работы с экзекьютором и его системой задач и прочее.
Рассмотрим пример, который генерирует числовой ряд из старших 32 бит факториалов и отправляет их в Sink:
// futures 0.1.29 use futures::prelude::*; use futures::{stream, futures}; fn sink_fibb_series( sink: impl Sink<SinkItem = u32, SinkError = ()>, ) -> impl Future<Item = (), Error = ()> { stream::unfold((1u32, 1), |(mut fact, n)| { while fact.checked_mul(n).is_none() { fact >>= 1; } fact *= n; Some(future::ok((fact, (fact, n + 1)))) }) .forward(sink) .map(|_v| ()) }
Зам.: считать CPU-bound задачи на корутинах не самое лучшее применение, зато пример самодостаточен и прост.
Как можно заметить, код выглядит достаточно громоздко: необходимо указывать возвращаемое значение, несмотря на то, что никакого полезного значения в нем нет. В futures 0.3 код становится немного проще:
// futures 0.3.1 use futures::prelude::*; use futures::stream; async fn sink_fibb_series(sink: impl Sink<u32>) { stream::unfold((1u32, 1), |(mut fact, n)| { async move { while fact.checked_mul(n).is_none() { fact >>= 1; } fact *= n; Some((fact, (fact, n + 1))) } }) .map(Ok) .forward(sink) .map(|_v| ()) .await; }
Здесь у функции добавляется ключевое слово async, которое оборачивает возвращаемое значение функции в Future. Поскольку в нашем случае это кортеж нулевого размера, то его можно попросту опустить, как и в обычных функциях.
Для ожидания выполнения в конце цепочки вызовов используется ключевое слово await. Этот вызов приостанавливает выполнение в текущем async-контексте и передает управление планировщику до тех пор, пока ожидаемое Future значение не будет готово. Затем выполнение возобновляется с последнего await (в нашем примере завершая функцию), т.е. поток управления становится нелинейным по сравнению с аналогичным синхронным кодом.
Ещё одно значительное различие — наличие async-блока в теле замыкания внутри stream::unfold. Эта обёртка является полным аналогом объявлением новой async-функции с таким же телом и вызовом вместо async-блока.
Возможно это замыкание в скором времени можно будет написать с помощью фичи async_closure, но увы, она пока не реализована:
async |(mut fact, n)| { while fact.checked_mul(n).is_none() { fact >>= 1; } fact *= n; Some((fact, (fact, n + 1))) }
Как можно заметить, новый типаж Stream работает не только с элементами типа Result<..>, как это было ранее. Аналогичные изменения коснулись типажа Future, определения по версиям следующие:
// futures 0.1 trait Future { type Item; type Error; fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error>; } enum Async<T> { Ready(T), NotReady } // futures 0.3 trait Future { type Output; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>; } enum Poll<T> { Ready(T), Pending }
Помимо того, что тип возвращаемого значения может быть произвольным, также поменялись и входные параметры для Future::poll. Появился новый параметр Context, который предоставляет явный интерфейс для пробуждения текущей задачи. Ранее то же самое можно было достигнуть через глобальные переменные конкретного экзекьютора (например через вызов tokio::prelude::task::current().notify()).
Более фундаментальное отличие интерфейса в том, что ссылку на себя требуется оборачивать в Pin. Эта обертка над указателем гарантирует "неподвижность" данных в памяти (более подробное описание Pin есть 1.33 релизе компилятора на хабре, либо на английском, в документации стандартной библиотеки std::pin).
Попробуем теперь запустить наш пример. В качестве Sink возьмем половину канала из futures и на выходной стороне будем печатать результат с некоторой задержкой между итерациями. На futures-0.1 такой код можно написать следующим образом:
use std::time::{Duration, Instant}; // futures 0.1.29 use futures::prelude::*; use futures::sync::mpsc; // tokio 0.1.22 use tokio::runtime::Runtime; use tokio::timer::Delay; fn main() { let mut rt = Runtime::new().unwrap(); let (tx, rx) = mpsc::channel(32); rt.spawn(Box::new(sink_fibb_series(tx.sink_map_err(|_e| ())))); let fut = rx.take(100).for_each(|val| { println!("{}", val); Delay::new(Instant::now() + Duration::from_millis(50)) .map(|_| ()) .map_err(|_| ()) }); rt.spawn(Box::new(fut)); rt.shutdown_on_idle().wait().unwrap(); }
Аналогичный код с новым tokio (который на момент написания ещё alpha) и futures-0.3 может выглядеть так:
use std::time::Duration; // futures 0.3.1 use futures::channel::mpsc; use futures::prelude::*; // tokio 0.2.0-alpha.5 use tokio::timer; #[tokio::main] async fn main() { let (tx, rx) = mpsc::channel(32); tokio::spawn(sink_fibb_series(tx)); rx.take(100) .for_each(|val| { println!("{}", val); timer::delay_for(Duration::from_millis(50)) }) .await; }
Как можно заметить, код с новыми футурами стал значительно короче. По опыту автора, количество строк всегда выходит ощутимо меньше (порой даже при переписывании синхронного кода). Но как мне кажется, куда более весомое отличие в читабельности и отсутствия мешанины вызовов map/map_err, которые были необходимы из-за вариативности ошибок у стандартных типов в Result<..>.
Комбинаторы над элементами типа Result<..> тем не менее остались и находятся в отдельных типажах, некоторые со слегка обновленным названием. Теперь они разбиты по двум разным типажам; те, которые реализованы для:
- всех элементов:
futures::FuturesExtиfutures::StreamExt; - элементов типа
Result<..>:futures::TryFuturesExtиfutures::TryStreamExt.
Чуть более сложным оказывается реализация типажей Future и Stream. Для примера попробуем реализовать Stream для уже рассмотренного числового ряда. Общий тип для обеих версий футур будет следующий:
struct FactStream { fact: u32, n: u32, } impl FactStream { fn new() -> Self { Self { fact: 1, n: 1 } } }
Для futures-0.1 реализация будет следующая:
impl Stream for FactStream { type Item = u32; type Error = (); fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { while self.fact.checked_mul(self.n).is_none() { self.fact >>= 1; } self.fact *= self.n; self.n += 1; Ok(Async::Ready(Some(self.fact))) } }
В этом примере реализация Stream::poll фактически является полной копией замыкания stream::unfold. В случае с futures-0.3 реализация оказывается эквивалентной:
impl Stream for FactStream { type Item = u32; fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { while self.fact.checked_mul(self.n).is_none() { self.fact >>= 1; } self.fact *= self.n; self.n += 1; Poll::Ready(Some(self.fact)) } }
Однако, если тип какого-нибудь поля структуры не реализует Unpin, то std::ops::DerefMut не будет реализовать на Pin<&mut T> и тем самым не будет мутабельного доступа ко всем полям:
use std::marker::PhantomPinned; struct Fact { inner: u32, // маркер убирает реализацию Unpin у структуры _pin: PhantomPinned, } struct FactStream { fact: Fact, n: u32, } impl Stream for FactStream { type Item = u32; fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { while self.fact.inner.checked_mul(self.n).is_none() { self.fact.inner >>= 1; // <- ошибка компиляции // trait `DerefMut` is required to modify // through a dereference, but it is not // implemented for `std::pin::Pin<&mut FactStream>` } self.fact.inner *= self.n; // <- тут аналогично self.n += 1; // <- Poll::Ready(Some(self.fact.inner)) } }
В таком случае, в том или ином виде придется воспользоваться unsafe функциями Pin::get_unchecked_mut и Pin::map_unchecked_mut для того, чтобы получить "проекцию" !Unpin поля (в документации есть более развернутое описание). К счастью, для таких случаев существует безопасная обёртка реализованная в крейте pin_project (детали реализации можно найти в документации библиотеки).
use pin_project::pin_project; #[pin_project] struct FactStream { fact: Fact, n: u32, } impl Stream for FactStream { type Item = u32; fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { let mut this = self.project(); while this.fact.inner.checked_mul(*this.n).is_none() { this.fact.inner >>= 1; } this.fact.inner *= *this.n; *this.n += 1; Poll::Ready(Some(this.fact.inner)) } }
Последний момент, который хотелось бы осветить это интеропреабельность между типажами разных версий. Для этого существует модуль futures::compat, который позволяет конвертировать из старых типов в новые и наоборот. К примеру можно проитерироваться по Stream из futures-0.1 с помощью async-await:
use std::fmt::Display; // futures 0.3 use new_futures::compat::Compat01As03 as Compat; use new_futures::StreamExt as _; // futures 0.1 use old_futures::Stream as OldStream; async fn stream_iterate<E>( old_stream: impl OldStream<Item = impl Display, Error = E>, ) -> Result<(), E> { let stream = Compat::new(old_stream); let mut stream = Box::pin(stream); while let Some(item) = stream.as_mut().next().await.transpose()? { println!("{}", item); } Ok(()) }
Примечание: в статье рассматривается только экзекьютор tokio, как наиболее долгоживущий и распространенный. Тем не менее, на нём мир не заканчивается, например существует альтернативный async-std, который помимо этого предоставляет футурные обертки для типов стандартной библиотеки, а также ThreadPool и LocalPool из рассмотренной библиотеки futures-0.3.

