Асинхронный Rust в трех частях
Во введении мы посмотрели на пример асинхронного Rust без какого‑либо объяснения, как он работает. Это дало нам несколько вопросов для размышления: Что такое асинхронные функции и возвращаемые ими «future»? Что делает join_all? Чем отличается tokio::time::sleep от std::thread::sleep?
Чтобы ответить на эти вопросы нам потребуется преобразовать каждую из частей в обычный не асинхронный код Rust. Вскоре мы обнаружим, что воспроизвести foo
и join_all
достаточно просто, а вот со sleep
ситуация чуть сложнее. Начнем же.
Foo
Напомню, что асинхронно функция foo
выглядела вот так:
async fn foo(n: u64) {
println!("start {n}");
tokio::time::sleep(Duration::from_secs(1)).await;
println!("end {n}");
}
А вот так выглядит foo
в формате обычной, не асинхронной функции. Ниже будет код всей функции, после чего мы разобьем ее на части и разберем каждую из них. Это полная замена прошлой функции, функцию main
изменять не требуется. Вы можете выполнить ее, перейдя в Playground.
fn foo(n: u64) -> Foo {
let started = false;
let duration = Duration::from_secs(1);
let sleep = Box::pin(tokio::time::sleep(duration));
Foo { n, started, sleep }
}
struct Foo {
n: u64,
started: bool,
sleep: Pin<Box<tokio::time::Sleep>>,
}
impl Future for Foo {
type Output = ();
fn poll(mut self: Pin<&mut Self>, context: &mut Context) -> Poll<()> {
if !self.started {
println!("start {}", self.n);
self.started = true;
}
if self.sleep.as_mut().poll(context).is_pending() {
return Poll::Pending;
}
println!("end {}", self.n);
Poll::Ready(())
}
}
Пойдем сверху вниз. Функция foo
— обычная функция и возвращает структуру Foo
. Она вызывает tokio::time::sleep, но не применяет .await
к future Sleep, возвращаемому sleep
. Вместо этого future сохраняется в структуре Foo
. Мы поговорим о Box::pin и Pin<Box<_>> чуть дальше.
Foo
становится future благодаря реализации трейта Future. Ниже код трейта из стандартной библиотеки:
pub trait Future {
type Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>;
}
Сам трейт предствляет собой пару строк кода, но содержит 3 новых типа данных: Pin, Context и Poll. Мы сфокусируемся на Poll
, так что сначала пару слов о Context
и Pin
, после чего оставим их на потом.
Каждый вызов Future::poll
получает Context
из вызывающей функции. Когда одна функция poll
вызывает другую, например когда Foo::poll
вызывает Sleep::poll
, она передает ей Context
. На данный момент это все, что нам нужно знать, пока мы не доберемся до раздела Wake ниже.
Pin
это обертка, используемая для оборачивания указателей. Пока что, если позволите, я сделаю вид что Pin
ничего не делает. Я сделаю вид, что Box::pin
это просто Box::new
, Pin<Box<T>>
это Box<T>
, Pin<&mut T>
это &mut T
, а Pin<Box<T>>::as_mut
это просто Box::as_mut. Pin
на самом деле решает крайне важную задачу в асинхронном Rust, но она будет более понятна после того, как мы попрактикуемся в написании future. Мы вернемся к этому в разделе Pin.
Итак, сфокусируемся на Poll
. Это enum
и выглядит он вот так:
pub enum Poll<T> {
Ready(T),
Pending,
}
Первая задача функции poll
- возвращать либо Poll::Ready
, либо Poll::Pending
. Возвращение Ready
значит, что future закончил работу и включает значение Output
, если оно есть. В таком случае poll
не будет вызываться повторно. Возвращение Pending
означает, что future на завершил работу и poll
будет вызвана повторно.
У вас мог возникнуть вопрос: а когда она будет вызвана повторно? Если коротко - нужно готовиться к любой ситуации. Функция poll может вызываться раз за разом в “нагруженном цикле” и нам нужно, чтобы она работала корректно. Длинный ответ на этот вопрос будет в разделе Wake.
Давайте взглянем на реализацию трейта Future
у Foo
и функцию poll
. Код:
impl Future for Foo {
type Output = ();
fn poll(mut self: Pin<&mut Self>, context: &mut Context) -> Poll<()> {
if !self.started {
println!("start {}", self.n);
self.started = true;
}
if self.sleep.as_mut().poll(context).is_pending() {
return Poll::Pending;
}
println!("end {}", self.n);
Poll::Ready(())
}
}
Мы видели, что первой задачей poll
было возвращение Ready
или Pending
и теперь можем наблюдать, что у poll
есть вторая задача, а именно сама задача future. Задача Foo
— вывод нескольких сообщений на экран и уход в сон, что и происходит в функции poll
.
Тут есть важный компромисс: poll
должна выполнять все, что можно сделать быстро, но не должна заставлять вызывающую функцию ждать ответа. Она должна сразу возвращать либо Ready
, либо Pending
. Этот компромисс и позволяет обрабатывать больше одного future за раз. Это дает им работать, не блокируя друг друга.
Чтобы следовать этому правилу, Foo::poll
приходится полагаться на то, что Sleep::poll
быстро вернет ответ. Если мы добавим отслеживание времени и вывод, то сможем отследить происходящее. В «важной ошибке», которую мы совершили во введении, thread::sleep
нарушила это правило, что привело к последовательному выполнению наших future. Если мы сделаем то же самое в Foo::poll, мы увидим тот же результат. Использование блокирующего сна в poll
заставляет вызывающую функцию ждать ответа, блокируя все остальные future.
Foo
использует флаг started
, чтобы выводить сообщение о начале лишь раз вне зависимости от того, сколько раз вызывалась функция poll
. Флаг ended
в то же время не требуется, поскольку poll
не будет вызвана повторно после возвращения Ready
. Флаг started
превращает Foo
в машину состояния с двумя возможными состояниями. В целом асинхронная функция требует какого‑то начального состояния, а также еще по состоянию на каждую из точек .await
, чтобы функция poll
могла определить, с какой точки «продолжить выполнение». Если бы у нас было больше двух состояний, мы могли бы использовать enum
вместо bool
. При написании async fn
компилятор делает это за нас и подобное удобство это основная причина наличия async/await
как фичи языка.
Join
Теперь, когда мы разобрались с тем, как реализовать простой future, взглянем на join_all. Может сложиться впечатление, что join_all
за кулисами использует какую‑то магию по сравнению с foo
, но на самом деле оказывается, что у нас есть все нужное для ее реализации. Вот join_all
в качестве обычной не асинхронной функции:
fn join_all<F: Future>(futures: Vec<F>) -> JoinAll<F> {
JoinAll {
futures: futures.into_iter().map(Box::pin).collect(),
}
}
struct JoinAll<F> {
futures: Vec<Pin<Box<F>>>,
}
impl<F: Future> Future for JoinAll<F> {
type Output = ();
fn poll(mut self: Pin<&mut Self>, context: &mut Context) -> Poll<()> {
let is_pending = |future: &mut Pin<Box<F>>| {
future.as_mut().poll(context).is_pending()
};
self.futures.retain_mut(is_pending);
if self.futures.is_empty() {
Poll::Ready(())
} else {
Poll::Pending
}
}
}
Опять же, наша не асинхронная join_all
возвращает структуру, реализующую трейт Future
, где работа функции происходит в Future::poll
. Здесь опять есть Box::pin
, но давайте продолжим ее игнорировать.
Внутри poll
всю сложную работу делает Vec::retain_mut. Это обычный метод Vec
, который принимает замыкание в качестве аргумента, вызывает его для каждого элемента и убирает элементы, возвращающие false
. Это убирает future, вернувшие Ready
, следуя правилу «не вызывать их повторно».
Здесь нет ничего нового. Со стороны запуск всех future одновременно выглядел как магия, но по сути, все что происходит внутри это вызов poll
для всех элементов Vec
. Это обратная сторона компромисса, упомянутого ранее. Если мы можем доверять, что каждый вызов poll быстро вернет ответ, мы можем обрабатывать много future одним циклом.
Обратите внимание, что здесь есть шорткат — мы игнорируем возвращаемое значение вызванных future. В данном случае это работает, потому что мы используем join_all
только с foo
, не возвращающей чего‑либо. Настоящая join_all
возвращает Vec<F::Output>
, что требует больше действий для отслеживания состояния. Оставим это в качестве упражнения для читателя, как говорится
Sleep
Мы на верном пути! Кажется, у нас уже есть все, чтобы реализовать свой собственный sleep
:
fn sleep(duration: Duration) -> Sleep {
let wake_time = Instant::now() + duration;
Sleep { wake_time }
}
struct Sleep {
wake_time: Instant,
}
impl Future for Sleep {
type Output = ();
fn poll(self: Pin<&mut Self>, _: &mut Context) -> Poll<()> {
if Instant::now() >= self.wake_time {
Poll::Ready(())
} else {
Poll::Pending
}
}
}
Хм. Код компилируется без ошибок, и логика в функции poll
выглядит верно, но при запуске он выводит сообщение «start» и затем зависает. Если добавить дополнительные операторы вывода, можно увидеть, что каждый Sleep
опрашивается один раз в начале и больше никогда. Что мы упускаем?
Оказывается, у poll
три задачи, и пока мы разобрались только с двумя. Во‑первых, poll
выполняет всю возможную работу, не блокируясь. Затем, poll
возвращает Ready
, если он завершен, или Pending
, если нет. Но, наконец, каждый раз, когда poll
собирается вернуть Pending
, ему нужно «запланировать пробуждение». Ах, вот что мы забыли.
Причина, по которой мы раньше не сталкивались с этим, заключается в том, что Foo
и JoinAll
возвращают Pending
только тогда, когда другой «future» уже вернул им Pending
, что означает, что пробуждение уже запланировано. Но Sleep
— это то, что мы называем «leaf future». У него нет других future ниже по иерархии, и ему нужно пробудить себя.
Wake
Пора более внимательно рассмотреть Context. Если мы вызовем context.waker()
, он вернет Waker. Вызывая любой из методов waker.wake() или waker.wake_by_ref() функция может попросить опросить себя повторно. Эти два метода делают одно и тоже, и мы будем использовать тот, который удобнее.
Самое простое, что мы можем попробовать, это запрашивать повторный опрос при возврате Pending
каждый раз, когда возвращаем Pending
:
fn poll(self: Pin<&mut Self>, context: &mut Context) -> Poll<()> {
if Instant::now() >= self.wake_time {
Poll::Ready(())
} else {
context.waker().wake_by_ref();
Poll::Pending
}
}
Этот код выводит правильный результат и завершается в нужное время, поэтому проблема с «бесконечным сном» решена, но мы заменили ее на проблему «занятого цикла». Этот код вызывает poll
снова и снова так быстро, как может, прожигая при этом 100% CPU пока не завершится. Мы можем увидеть это косвенно, подсчитывая количество вызовов poll или можем измерить напрямую с помощью инструментов, например perf в Linux.
Нам необходимо вызывать Waker
позже, когда наступит время проснуться, но мы не можем использовать thread::sleep
в poll
. Одно из возможных решений — запустить другой поток, который будет выполнять thread::sleep
за нас и затем вызывать wake
. Если бы мы делали это в каждом вызове poll, мы столкнулись бы с проблемой слишком большого количества потоков, о которой говорилось во введении. Но мы могли бы обойти это, запустив общий поток и используя канал для передачи Wakers в него. Это действительно жизнеспособная реализация, но в ней кое‑что не так. Главный поток нашей программы уже проводит большую часть времени в состоянии сна. Почему нам нужно два спящих потока? Почему нет способа пробудить наш главный поток в определенное время?
Что ж, справедливости ради, такой способ есть — именно для этого и существует tokio::time::sleep
. Но если мы действительно хотим написать свой собственный sleep
и не хотим создавать дополнительный поток для его него, то нам нужно также написать свой собственный main
.
Main
Для вызова poll
из main
нам понадобится Context
для передачи в него. Мы можем создать его с помощью Context::from_waker, но для этого нам нужен Waker
. Существует несколько способов создать его, но пока нам нужна лишь заглушка, поэтому мы используем вспомогательную функцию под названием noop_waker («Noop», «no‑op» и «nop» — это сокращения от «no operation»). Как только мы создадим Context
, мы сможем вызывать poll
в цикле:
fn main() {
let mut futures = Vec::new();
for n in 1..=10 {
futures.push(foo(n));
}
let mut joined_future = Box::pin(future::join_all(futures));
let waker = futures::task::noop_waker();
let mut context = Context::from_waker(&waker);
while joined_future.as_mut().poll(&mut context).is_pending() {
// Busy loop!
}
}
Работает! Хотя у нас все еще остается проблема «занятого цикла», как и выше. Но прежде чем решать ее, нам нужно совершить еще одну важную ошибку:
Поскольку эта версия нашего главного цикла никогда не прекращает опрос, и поскольку наш Waker
ничего не делает, может возникнуть вопрос: нужно ли вообще вызывать Waker
в Sleep::poll
? Удивительно, но это действительно необходимо. Если удалить эту строку, сначала код будет работать нормально. Но если увеличить количество заданий с десяти до ста, наши «future» никогда не «проснутся». Мы видим, что, хотя наш Waker
ничего не делает, в программе есть другие Waker
. Когда futures::future::JoinAll имеет много дочерних «future» (в версии futures v0.3.30 точный порог равен 31), он создает свои собственные Waker
для того, чтобы избегать повторного опроса дочерних элементов, которые не запросили «пробуждения». Это более эффективно, чем каждый раз опрашивать их все, но также означает, что дочерние элементы, которые никогда не вызывают свой собственный Waker
, больше не будут опрашиваться. Это и является причиной, почему «future», находящийся в состоянии Pending
, всегда должен организовать вызов своего Waker
.
Хорошо, возвращаемся к main
. Исправим проблему «занятого цикла». Нам нужно, чтобы main
использовал thread::sleep
до следующего времени пробуждения, что означает, что нам нужен способ, чтобы Sleep::poll
передавал Waker
и время пробуждения в main
. Мы воспользуемся глобальной переменной и обернем ее в Mutex
, чтобы безопасный код мог ее модифицировать.
static WAKE_TIMES: Mutex<BTreeMap<Instant, Vec<Waker>>> =
Mutex::new(BTreeMap::new());
Это отсортированный map
от времени пробуждения до Waker
. Обратите внимание, что тип значения здесь — Vec<Waker>
, а не просто Waker
, потому что для данного времени Instant
может быть несколько Waker
. Это маловероятно на Linux и macOS, где разрешение Instant::now()
измеряется в наносекундах, но на Windows разрешение составляет 15,6 миллисекунд.
Sleep::poll
может вставить свой Waker
в этот map
, используя BTreeMap::entry:
fn poll(self: Pin<&mut Self>, context: &mut Context) -> Poll<()> {
if Instant::now() >= self.wake_time {
Poll::Ready(())
} else {
let mut wake_times = WAKE_TIMES.lock().unwrap();
let wakers_vec = wake_times.entry(self.wake_time).or_default();
wakers_vec.push(context.waker().clone());
Poll::Pending
}
}
После опроса наш главный цикл может считать первый ключ из map
, чтобы получить ближайшее время пробуждения. Затем он может выполнить thread::sleep
до этого времени, устраняя проблему «занятого цикла». Далее он вызывает все Waker
, время пробуждения которых наступило, прежде чем повторно запустить цикл и опросить снова:
fn main() {
let mut futures = Vec::new();
for n in 1..=10 {
futures.push(foo(n));
}
let mut joined_future = Box::pin(future::join_all(futures));
let waker = futures::task::noop_waker();
let mut context = Context::from_waker(&waker);
while joined_future.as_mut().poll(&mut context).is_pending() {
// The joined future is Pending. Sleep until the next wake time.
let mut wake_times = WAKE_TIMES.lock().unwrap();
let next_wake = wake_times.keys().next().expect("sleep forever?");
thread::sleep(next_wake.saturating_duration_since(Instant::now()));
// We just woke up. Invoke all the Wakers whose time has come.
while let Some(entry) = wake_times.first_entry() {
if *entry.key() <= Instant::now() {
entry.remove().into_iter().for_each(Waker::wake);
} else {
break;
}
}
// Loop and poll again.
}
}
Работает! Мы решили проблему «занятого цикла» и не понадобилось создавать дополнительные потоки. Это то, что нужно, чтобы написать собственный sleep
.
Это своего рода завершение первой части. Во второй части мы расширим наш главный цикл для реализации «задач». Теперь, когда мы понимаем, как работают «future», у нас есть возможность рассмотреть несколько дополнительных тем, которые пригодятся для следующих частей, хотя они не являются обязательными для их понимания.
Бонус: Pin
Теперь, когда мы знаем, как преобразовать async fn
в структуру Future
, мы можем немного больше рассказать о Pin
и о проблеме, которую он решает. Представим, что наша async fn foo
по какой-то причине использует ссылку внутри:
async fn foo(n: u64) {
let n_ref = &n;
println!("start {n_ref}");
tokio::time::sleep(Duration::from_secs(1)).await;
println!("end {n_ref}");
}
Этот код компилируется и выполняется нормально, и выглядит как вполне обычный код на Rust. Но как бы выглядело то же самое изменение в нашей структуре future Foo
?
struct Foo {
n: u64,
n_ref: &u64,
started: bool,
sleep: Pin<Box<tokio::time::Sleep>>,
}
Код не компилируется:
error[E0106]: missing lifetime specifier
--> src/main.rs:3:12
|
3 | n_ref: &u64,
| ^ expected named lifetime parameter
Каким должно быть время жизни n_ref
? Короткий ответ: хорошего ответа нет. Само‑ссылающиеся заимствования в целом не разрешены в структурах Rust, и для того, что пытается сделать n_ref
, нет синтаксиса. Если бы он был, то нам пришлось бы ответить на несколько сложных вопросов о том, когда мы можем изменять n
и когда можем перемещать Foo
.
Но тогда, какой тип Future
сгенерировал компилятор для async fn foo
выше? Оказывается, Rust выполняет некоторые небезопасные вещи внутри, чтобы устранить невыраженные лайфтаймы, как в случае с n_ref
. Задача обертки‑указателя Pin
состоит в том, чтобы инкапсулировать эту небезопасность, чтобы мы могли писать пользовательские futures, такие как JoinAll
, в безопасном коде. Структура Pin
работает с авто‑трейтом Unpin, который реализован для большинства конкретных типов, но не для сгенерированных компилятором futures, возвращаемых async
функциями. Операции, которые могут позволить нам перемещать зафиксированные объекты либо ограничены Unpin
(как DerefMut), либо помечены как unsafe
(например get_unchecked_mut). Оказывается, наше широкое использование Box::pin
в примерах выше означало, что все наши futures автоматически были Unpin
, так что DerefMut
работал для наших ссылок Pin<&mut Self>
, и мы могли изменять поля, такие как self.started
и self.futures
, не задумываясь об этом.
На этом мы закончим обсуждение Pin, так как доскональные детали не нужны для задач (вторая часть) или ввода‑вывода (третья часть). Но если вы хотите узнать все подробности, начните с этого поста от автора Pin, а затем прочтите официальную документацию Pin.
Бонус: Отмена
Асинхронные функции выглядят и ощущаются как обычные функции, но с особыми суперсилами. Есть еще одна суперсила, о которой мы не упомянули.
Эта суперсила — отмена. Когда мы вызываем обычную функцию в блокирующем коде, у нас нет какого‑то способа отменить вызов через некоторое время. Но мы можем отменить любой future просто…не вызывая его повторно. Для этого в tokio
есть tokio::time::timeout, а также у нас есть все нужное, чтобы сделать собственную версию:
struct Timeout<F> {
sleep: Pin<Box<tokio::time::Sleep>>,
inner: Pin<Box<F>>,
}
impl<F: Future> Future for Timeout<F> {
type Output = Option<F::Output>;
fn poll(
mut self: Pin<&mut Self>,
context: &mut Context,
) -> Poll<Self::Output> {
// Check whether the inner future is finished.
if let Poll::Ready(output) = self.inner.as_mut().poll(context) {
return Poll::Ready(Some(output));
}
// Check whether time is up.
if self.sleep.as_mut().poll(context).is_ready() {
return Poll::Ready(None);
}
// Still waiting.
Poll::Pending
}
}
fn timeout<F: Future>(duration: Duration, inner: F) -> Timeout<F> {
Timeout {
sleep: Box::pin(tokio::time::sleep(duration)),
inner: Box::pin(inner),
}
}
Эта обертка работает с любой асинхронной функцией. У обычных функций подобного функционала нет.
Бонус: Рекурсия
Единственная суперсила, которой нам не хватает — рекурсия. При попытке вызвать саму себя в асинхронной функции:
async fn factorial(n: u64) -> u64 {
if n == 0 {
1
} else {
n * factorial(n - 1).await
}
}
Мы получим ошибку компиляции:
error[E0733]: recursion in an async fn requires boxing
--> recursion.rs:1:1
|
1 | async fn factorial(n: u64) -> u64 {
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
…
5 | n * factorial(n - 1).await
| ---------------------- recursive call here
|
= note: a recursive `async fn` must introduce indirection such as `Box::pin` to avoid an infinitely sized future
Когда обычные функции вызывают друг друга, они динамически выделяют память на стеке вызовов. Но когда асинхронные функции выполняют .await
друг друга, они компилируются в структуры, содержащие другие структуры, размеры которых статичны. Если асинхронная функция вызывает саму себя, ей приходится помещать рекуррентный future в Box
перед вызовом .await
:
async fn factorial(n: u64) -> u64 {
if n == 0 {
1
} else {
let recurse = Box::pin(factorial(n - 1));
n * recurse.await
}
}
Это работает, но требует выделения памяти на куче.
Итак, на этом закончим и перейдем к «задачам».