Асинхронщина в Rust: Стандартная библиотека и async/.await

  • Tutorial


Введение


Перед вами руководство по специфике асинхронного программирования на языке Rust — точка входа в экосистему библиотек, справочник, на который можно опираться при проектировании системы и решении нетривиальных задач. К прочтению рекомендую и опытным разработчикам, и новичкам в Rust, только решивших окунуться в эту кроличью нору.


Вас ждёт целый цикл статей разного уровня сложности и погружения, затрагивающий не только асинхронное программирование, но и полезные шаблоны проектирования в Rust, такие как перенос инвариантов бизнес-логики на уровень системы типов, индуктивные вычисления на типах и декларативное программирование посредством комбинаторов.


Сегодня мы рассмотрим API стандартной библиотеки для асинхронного программирования и сам долгожданный синтаксис async/.await.




Теория


Данное пособие несёт исключительно практический характер. За теорией обращайтесь к следующим источникам (по порядку):


  1. "Programming Paradigms for Dummies: What Every Programmer Should Know"
  2. Хабр
    1. "Асинхронность: назад в будущее"
  3. Wikipedia
    1. "Asynchronous I/O"
    2. "Event Loop"
    3. "Reactor pattern" & "Proactor pattern"
  4. "The C10K problem"



std


Стандартная библиотека недавно обзавелась обобщённым интерфейсом для асинхронного программирования, а именно — трейт Future и модуль std::task. Эти сущности связывают программы и библиотеки с разными средами асинхронного исполнения (пример такой среды — Tokio, речь о которой пойдёт в следующих статьях), тем самым достигается частичная независимость пользовательского кода от конкретных асинхронных сред.


Точно также, как замыкание представляет собой комбинируемую порцию работы синхронного, последовательного кода, трейт Future представляет собой комбинируемую порцию работы кода асинхронного (одну асинхронную операцию, футуру, фьючерс). Комбинируемость означает, что точно также, как и замыкание способно вызывать другие замыкания (состоять из них, комбинироваться), асинхронная операция способна вызывать другие асинхронные операции (помимо обычных замыканий/функций).


Рассмотрим в общих чертах принцип работы футуры:



В методе poll несложно разглядеть модель кооперативной многозадачности, при которой операционная система не осуществляет переключение контекста; вместо этого задачи сами передают контроль экзекьютору (вызывающему коду, планировщику), чтобы тот, в свою очередь, смог эффективно распределять работу задач на доступные физические исполнители (например, ядра процессора). Чаще всего, футуры исполняются на пуле потоков, но возможны и другие сценарии, например, однопоточное исполнение, а в общем случае и более экзотические варианты. Подробнее об этом в следующей статье, посвящённой Tokio.



Примечания
  • Принадлежность асинхронных операций потокам ОС и потоков ОС процессорным ядрам неустойчивая, т.е. одна футура может спокойно путешествовать по потокам ОС, а потоки ОС мигрировать по ядрам.
  • Правильнее будет назвать не "футуры", а зелёные потоки, или асинхронные операции верхнего уровня, таски, задачи.

Как было сказано выше, трейт не накладывает никаких требований на вызов метода poll, следующего после вызова, вернувшего Poll::Ready(Output): он может запаниковать, войти в бесконечный цикл и создавать множество других проблем. Неопределённое поведение, тем не менее, запрещено (нарушение целостности данных, неправильное использование небезопасных функций), вне зависимости от состояния футуры, потому что сама сигнатура poll ключевым словом unsafe не помечена. Не следует полагаться на конкретные реализации, лишённые непредвиденного поведения.


Объект типа, реализующего Future, — это, прежде всего, обычный объект (значение, переменная). Его можно хранить в динамическом массиве, передавать в функции, возвращать из функций, другими словами, делать с ним всё то, что позволено делать с другими объектами. Название такому явлению — сущность (программный компонент) первого класса, а программирование с использованием асинхронных операций в Rust — программирование высшего порядка.


Иллюстрация выше также содержит Waker. Подробно об этом в следующей секции.




WriteFuture


Для лучшего понимания реализуем свою асинхронною операцию WriteFuture, которая завершится после того, как данные будут отправлены по неблокирующему TCP сокету.


[https://gist.github.com/2f2040d1639bebf723924a73aaa262e7]


use std::{
    future::Future,
    io::{self, Read, Write},
    net::{TcpListener, TcpStream},
    pin::Pin,
    task::{Context, Poll},
    thread::{self, JoinHandle},
};

use tokio::runtime::Runtime;

struct WriteFuture<'a> {
    socket: TcpStream,
    data: &'a [u8],
}

impl<'a> WriteFuture<'a> {
    #[allow(dead_code)]
    fn new(socket: TcpStream, data: &'a [u8]) -> Self {
        socket.set_nonblocking(true).unwrap();
        Self { socket, data }
    }
}

impl Future for WriteFuture<'_> {
    type Output = io::Result<usize>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        let data = self.data;

        match self.socket.write(data) {
            Ok(length) => Poll::Ready(Ok(length)),
            Err(err) if err.kind() == io::ErrorKind::WouldBlock => {
                cx.waker().wake_by_ref();
                Poll::Pending
            }
            Err(err) => Poll::Ready(Err(err)),
        }
    }
}

fn main() {
    let server = run_server();
    let client = run_client();

    server.join().unwrap();
    client.join().unwrap();
}

const ADDR: &'static str = "127.0.0.1:18373";

fn run_server() -> JoinHandle<()> {
    thread::spawn(|| {
        let listener = TcpListener::bind(ADDR).unwrap();

        let (mut client_accepted, _addr) = listener.accept().unwrap();

        let mut message = String::new();
        client_accepted.read_to_string(&mut message).unwrap();
        dbg!(message);
    })
}

fn run_client() -> JoinHandle<()> {
    thread::spawn(|| {
        let client = TcpStream::connect(ADDR).unwrap();

        let mut rt = Runtime::new().unwrap();
        rt.block_on(WriteFuture::new(client, b"Hello, world!")).unwrap();
    })
}

Примечание

Наша WriteFuture имеет один фатальный недостаток: системный вызов отправки внутри self.socket.write(data) совершается каждый раз в при входе в WriteFuture::poll. Как это исправить — в следующей статье.


Вывод:


[src/main.rs:60] message = "Hello, world!"

Внимание на метод WriteFuture::poll. Рассмотрим подробно сопоставление с образом self.socket.write(data):


  • Ветка первая. Данные успешно записаны, возвращаем Poll::Ready(Ok(length)). Асинхронная операция теперь считается завершённой.


  • Ветка вторая. Попытка записи данных вернула ошибку io::ErrorKind::WouldBlock. Это значит, что данные не могут быть записаны мгновенно, без прерывания нашего приложения на продолжительное время. В этом случае мы даём понять экзекьютору, что наша футура требует ещё хотя бы одного вызова, чтобы стать завершённой.


  • Ветка третья. Попытка записи данных вернула ошибку, отличную от io::ErrorKind::WouldBlock. Это может быть потеря соединения, недостаточные привилегии. В этом случае возвращаем эту ошибку, после чего асинхронная операция считается завершённой.



Упрощённо у планировщика задач есть множество футур, каждая из которых в данный момент либо готова сделать прогресс, либо ещё не готова. Работа планировщика заключается в том, чтобы опросить как можно большее количество асинхронных операций, готовых сделать прогресс, за единицу времени.



По-умолчанию новая футура считается готовой совершить прогресс (разморожена), но после первого вызова poll, если, конечно, асинхронная операция уже не завершилась, она "замораживается". Waker — это ручка от экзекьютора с одной обязанностью: вовремя размораживать футуру. Для этого достаточно вызвать .wake_by_ref()/.wake() на ассоциированном с футурой Waker, который можно получить, вызвав метод cx.waker().


Другими словами, экзекьютор как минимум один раз вызывает poll на каждой асинхронной операции, а дальше уже всё зависит от возвращаемого значения poll и контекста. Такой дизайн позволяет не тратить время ЦПУ на бесполезный опрос асинхронных операций, ведь, например, настоящий асинхронный сетевой сокет (а не наша пародия) будет готов принять/отправить данные лишь после того, как очередь событий операционной системы (epoll, kqueue, ...) возвратит соответствующее событие.


Без разницы кто и откуда уведомляет экзекьютора о готовности футуры совершить прогресс; достаточным условием является обладать её контекстом. Например, разумно будет в самом цикле событий (или где-то рядом с ним) контроллировать контексты асинхронных сетевых сокетов, потому что их готовность напрямую зависит от поступления события от ОС.


Внутри fn run_client мы запускаем нашу футуру WriteFuture:


let mut rt = Runtime::new().unwrap();
rt.block_on(WriteFuture::new(client, b"Hello, world!"));

В первой строчке создаётся сам рантайм Tokio, а после этого выполнение потока ОС (напомню, что тело fn run_client находится в thread::spawn) блокируется до того момента, когда завершится WriteFuture. Метод block_on — это точка входа в рантайм, главная асинхронная операция, которая запускает все другие (об этом в следующей статье).




async/.await


Как написать асинхронную операцию, которая принимает на вход три другие асинхронные операции, отображает их результаты в объекты иного типа и вычисляет их сумму? Три асинхронные операции могут быть, например, отправкой/считыванием данных на/из сервера, функция отображения переводит отправленные/считанные байты в статистику, затем три статистики складываются в одну.


Существует три способа реализовать это:


  1. Инкапсуляция футур и ручное управление ими в [Future::poll];
  2. Адапторы;
  3. Синтаксис async/.await.

Вариант №1 (ручное управление)


Показать код
use std::{
    future::Future,
    ops::AddAssign,
    pin::Pin,
    task::{Context, Poll},
};

use pin_project::pin_project;

#[pin_project]
struct CompoundFuture<Fut1, Fut2, Fut3, F, U> {
    fut1: Option<Fut1>,
    fut2: Option<Fut2>,
    fut3: Fut3,
    f: F,
    result: Option<U>,
}

impl<Fut1, Fut2, Fut3, F, U, T> CompoundFuture<Fut1, Fut2, Fut3, F, U>
where
    Fut1: Future<Output = T>,
    Fut2: Future<Output = T>,
    Fut3: Future<Output = T>,
    F: FnMut(T) -> U,
{
    #[allow(dead_code)]
    fn new(fut1: Fut1, fut2: Fut2, fut3: Fut3, f: F) -> Self {
        Self {
            fut1: Some(fut1),
            fut2: Some(fut2),
            fut3,
            f,
            result: None,
        }
    }
}

impl<Fut1, Fut2, Fut3, F, T, U> Future for CompoundFuture<Fut1, Fut2, Fut3, F, U>
where
    Fut1: Future<Output = T>,
    Fut2: Future<Output = T>,
    Fut3: Future<Output = T>,
    F: FnMut(T) -> U,
    U: AddAssign,
{
    type Output = U;

    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        let this = self.project();

        if let Some(fut1) = this.fut1 {
            // SAFETY: Pin::new_unchecked is safe because
            // we won't move fut1 (fut2, fut3)
            match unsafe { Pin::new_unchecked(fut1) }.poll(cx) {
                Poll::Pending => return Poll::Pending,
                Poll::Ready(x) => {
                    *this.result = Some((this.f)(x));
                    *this.fut1 = None;
                }
            }
        }

        if let Some(fut2) = this.fut2 {
            match unsafe { Pin::new_unchecked(fut2) }.poll(cx) {
                Poll::Pending => return Poll::Pending,
                Poll::Ready(x) => {
                    let result = this.result.as_mut().unwrap();
                    *result += (this.f)(x);
                    *this.fut2 = None;
                }
            }
        }

        match unsafe { Pin::new_unchecked(this.fut3) }.poll(cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(x) => {
                let result = this.result.as_mut().unwrap();
                *result += (this.f)(x);
                Poll::Ready(this.result.take().unwrap())
            }
        }
    }
}

Вариант №2 (адапторы)


use std::ops::AddAssign;

use futures::Future;

fn compound_future<'a, Fut1, Fut2, Fut3, T, U, F>(
    fut1: Fut1,
    fut2: Fut2,
    fut3: Fut3,
    mut f: F,
) -> impl Future<Item = U, Error = ()> + 'a
where
    Fut1: Future<Item = T, Error = ()> + 'a,
    Fut2: Future<Item = T, Error = ()> + 'a,
    Fut3: Future<Item = T, Error = ()> + 'a,
    F: FnMut(T) -> U + 'a,
    U: AddAssign,
{
        fut1.join3(fut2, fut3).map(move |(a, b, c)| {
                let mut result = f(a);
                result += f(b);
                result += f(c);
                result
        })
}

Примечание

В коде выше используется версия futures = "0.1", поэтому сигнатура трейта Future немного другая, но пониманию ситуации это никак не мешает.


Вариант №3 (async/.await)


use std::{future::Future, ops::AddAssign};

async fn compound_future<Fut1, Fut2, Fut3, T, U, F>(
    fut1: Fut1,
    fut2: Fut2,
    fut3: Fut3,
    mut f: F,
) -> U
where
    Fut1: Future<Output = T>,
    Fut2: Future<Output = T>,
    Fut3: Future<Output = T>,
    F: FnMut(T) -> U,
    U: AddAssign,
{
    let mut result = f(fut1.await);
    result += f(fut2.await);
    result += f(fut3.await);
    result
}

Анализ


В первом варианте мы просто инкапсулировали асинхронные операции в CompoundFuture и в каждом вызове poll делаем прогресс текущей операции вручную. Не сложно догадаться, что это чревато ошибками, ведь в конечном итоге придётся писать своё подобие конечного автомата из Future (что мы и сделали) и много-много повторяющегося кода. Данный подход почти невозможно встретить в пользовательском коде, но полностью исключать его не следует, т.к. он всё ещё используется во внутренностях основополагающих библиотек (futures, Tokio и других).


Второй подход уместился всего в 24 строки, что в 3.5 раза меньше решения с ручным управлением, и, как следствие, понижается риск ошибки, повышается читабельность и поддерживаемость. Он использует понятие адапторов (комбинаторов). Дадим нестрогое определение адапторам:


Адапторы в контексте асинхронных операций — это методы, определённые на типе Fut1, отображающие (возможно, с дополнительными аргументами, влияющими на поведение) Fut1 в Fut2<..., Fut1, ...>, где Fut1: Future, Fut2: Future.

Путём последовательного вызова адапторов генерируются новые асинхронные операции (форма ленивых вычислений), каждая из которых обладает собственным поведением. Например, метод futures::future::Future::join3 возвращает футуру, вычисляющую результаты трёх переданных асинхронных операций, а futures::future::Future::map возвращает футуру, результатом которой есть значение нового типа, полученное заданным отображением результата переданной футуры.


С данным подходом всё гладко ровно до тех пор, пока компилятор не встретит несовпадение типов. Вот пример вывода компилятора в ответ на ошибку в типах в продолжительной цепочки вызова адапторов:


= note: expected type ()
        found type futures::future::and_then::AndThen<futures::stream::concat::Concat2<hyper::body::body::Body>, futures::future::or_else::OrElse<futures::future::map::Map<futures::future::and_then::AndThen<futures::future::and_then::AndThen<futures::future::map_err::MapErr<futures::future::result_::FutureResult<contract::Update, serde_json::error::Error>, [closure@src\main.rs:139:22: 144:14]>, std::result::Result<(contract::User, std::string::String, i64, i64), http::response::Response<hyper::body::body::Body>>, [closure@src\main.rs:145:23: 162:14]>, futures::future::map_err::MapErr<futures::future::and_then::AndThen<impl futures::future::Future, futures::future::either::Either<futures::future::and_then::AndThen<impl futures::future::Future, futures::future::either::Either<futures::future::then::Then<impl futures::future::Future, futures::future::either::Either<impl futures::future::Future, futures::future::result_::FutureResult<(), telegram_client::TelegramClientError>>, [closure@src\main.rs:211:51: 224:46 telegram_client:_, chat_id:_, text:_]>, futures::future::result_::FutureResult<(), telegram_client::TelegramClientError>>, [closure@src\main.rs:173:90: 230:30 file_id:_, ext:_, user:_, message_id:_, dbs:_, chat_id:_, telegram_client:_]>, futures::future::result_::FutureResult<(), telegram_client::TelegramClientError>>, [closure@src\main.rs:166:31: 235:22 user:_, chat_id:_, message_id:_, telegram_client:_, file_id:_, dbs:_]>, [closure@src\main.rs:236:30: 242:22]>, [closure@src\main.rs:163:23: 243:14 telegram_client:_, dbs:_]>, [closure@src\main.rs:245:18: 248:14]>, std::result::Result<http::response::Response<hyper::body::body::Body>, hyper::error::Error>, fn(http::response::Response<hyper::body::body::Body>) -> std::result::Result<http::response::Response<hyper::body::body::Body>, hyper::error::Error> {std::result::Result<http::response::Response<hyper::body::body::Body>, hyper::error::Error>::Ok}>, [closure@src\main.rs:136:54: 250:6 telegram_client:_, dbs:_]>

[Взято отсюда]


Это происходит, потому что адапторы порождают глубоковложенные обобщённые типы: каждый новый адаптор — это новый уровень вложенности. Компилятору зачастую не остаётся ничего лучше, кроме как вывести их целиком на экран, вместе с типами сгенерированных замыканий, лайфтаймами и полными путями (что и демонстрирует сообщение об ошибке выше).


Проблемы предыдущих подходов решает синтаксис async/.await, силами которого можно писать асинхронный код в синхронном стиле. Асинхронный участок кода (async fn, async { ... } или async move { ... }) компилятором трансформируется в объект, реализующий Future, а fut.await внутри этих участков асинхронно ждёт выполнения fut. Это означает, что в итоговом методе poll код, расположенный после fut.await, выполняться не будет, пока fut не вернёт Poll::Ready(Output). Все приведённые выше реализации одинаковы по смыслу (но итоговый код может разный сгенерироваться). Асинхронный участок кода порождает объект с неявным типом в противовес комбинаторам, которые наслаивают их друг на друга.


Ещё одно неочевидное преимущество async/.await над адапторным/комбинаторным подходом — возможность занимать данные между "вызовами" .await — устраняется нужда в клонировании или использовании сырых (небезопасных) указателей.




Альтернативные взгляды на проблему


Как и с любым нетривиальным решением в дизайне языка, особенно совмещающего системное программирование со сравнительно мощной системой типов, существуют несколько противоположных мнений. Вот некоторые аргументы против async/.await:





async move {… } или async {… } ?


Наличие разницы между async move { ... } и async { ... } может сперва показаться неочевидной, ведь у версии простого блока ({ ... }) не существует move-аналога. Дело в том, что первый асинхронный вариант (с move) овладевает захваченными переменными, а второй вариант (без move) — сначала пытается заимствовать среду иммутабельно, если не получилось — заимствовать мутабельно, если и это не получилось — овладевает (вспомним, что асинхронные блоки преобразуются компилятором в анонимные типы, реализующие трейт Future).


Вот простой пример, демонстрирующий различие. Функция foo() два раза выводит строку с abc на экран:


async fn foo() {
    let string = String::from("abc");

    async { dbg!(&string); }.await;
    async { dbg!(&string); }.await;
}

То есть два асинхронных блока заняли string. Если же добавить ключевое слово move после и первого, и второго async, то получим ошибку компиляции, как и следовало ожидать, ведь async move { ... } пытается сразу овладеть переменной:


error[E0382]: use of moved value: `string`
 --> src/lib.rs:5:16
  |
2 |     let string = String::from("abc");
  |         ------ move occurs because `string` has type `std::string::String`, which does not implement the `Copy` trait
3 |     
4 |     async move { dbg!(&string); }.await;
  |                ------------------
  |                |       |
  |                |       variable moved due to use in generator
  |                value moved here
5 |     async move { dbg!(&string); }.await;
  |                ^^^^^^^^------^^^^
  |                |       |
  |                |       use occurs due to use in generator
  |                value used here after move

Рассмотрим ещё один пример:


async fn foo() -> String {
    let string = String::from("abc");

    async { string }.await
}

Здесь асинхронный блок не смог заимствовать string иммутабельно и мутабельно, и совершил свою последнюю (удачную) попытку — овладел ей. Если переписать код с использованием move, то результат останется тем же.


И наконец, рассмотрим пример с мутабельным заимствованием:


async fn foo() {
    let mut string = String::from("abc");

    async { string.push_str("def"); }.await;

    dbg!(string);
}

Заимствовать иммутабельно не вышло, ведь сигнатура у String::push_str требует &mut self, значит заимствуем мутабельно — компилируется успешно. Если же присоединить move, то, как вы уже догадались, произойдёт ошибка компиляции, ведь на последней строке функции располжен вызов макроса dbg!, требующего владения string:


error[E0382]: use of moved value: `string`
 --> src/lib.rs:6:10
  |
2 |     let mut string = String::from("abc");
  |         ---------- move occurs because `string` has type `std::string::String`, which does not implement the `Copy` trait
3 |     
4 |     async move { string.push_str("def"); }.await;
  |                ---------------------------
  |                | |
  |                | variable moved due to use in generator
  |                value moved here
5 |     
6 |     dbg!(string);
  |          ^^^^^^ value used here after move



Асинхронные замыкания?


Попробуем скомпилировать следующий код:


fn main() {
    let closure = async || {
        dbg!();
    };
}

Вывод:


error[E0658]: async closures are unstable
 --> src/main.rs:2:19
  |
2 |     let closure = async || {
  |                   ^^^^^
  |
  = note: see issue #62290 <https://github.com/rust-lang/rust/issues/62290> for more information

То есть асинхронные замыкания ещё не стабилизировались (RFC). Не путайте асинхронные замыкания и замыкания, возвращающие Future:


  • Первое — async [move] |...| { ... } (нестабилизированное)
  • Второе — |...| async [move] { ... } (стабилизированное)



Pin


Как вы уже могли заметить, в сигнатуре метода Future::poll присутствует self: Pin<&mut Self>. Что это значит? Это даёт гарантию компилятору, что асинхронная операция не переместится в памяти во время выполнения. Зачем это нужно? Представим ситуацию, где на вход компилятору подаётся такая конструкция (взято отсюда):


async {
    let mut x = [0; 128];
    let read_into_buf_fut = read_into_buf(&mut x);
    read_into_buf_fut.await;
    println!("{:?}", x);
}

Должно быть сгенерировано следующее:


struct ReadIntoBuf<'a> {
    buf: &'a mut [u8], // Указывает на `x` снизу
}

struct AsyncFuture {
    x: [u8; 128],
    read_into_buf_fut: ReadIntoBuf<'self>,
}

Примечание

Лайфтайм 'self не разрешён в пользовательском коде, но в целях демонстрации он присутствует в read_into_buf_fut: ReadIntoBuf<'self>. Это означает, что ссылка живёт столько же, сколько и сама структура AsyncFuture.


Если объект AsyncFuture переместится в памяти, то указатель в read_into_buf_fut.buf инвалидируется (станет указывать на неправильное значение), что вызовет UB. Решение простое — запретить перемещение в памяти асинхронных операций во время их выполнения, что и делает self: Pin<&mut Self>.


Pin многие считают сложной темой в Rust, которую стороной обойти не получиться, если вы собираетесь вплотную заниматься асинхронным программированием. Советую прямо сейчас внимательно прочесть соответствующую страницу в документации.




Итог


Future — трейт для асинхронных (конкурентных) вычислений с poll-based кооперативной моделью исполнения. Объекты Future являются программными компонентами первого класса: мы можем с ними обращаться как нам вздумается, а сами по себе они ничего не делают — чтобы их выполнить, необходимо вызывать метод Future::poll до того момента, когда он вернёт Poll::Ready(Output). Обычно этим занимается отдельный модуль, называемый экзекьютором (или планировщиком).


async/.await в Rust — синтаксис для построения анонимных асинхронных операций. Код которых смотрится так, будто выполнен в синхронном стиле.


Ключевое слово async, после которого следуют либо фигурные скобки, либо остальная сигнатура функции, обозначает начало асинхронного блока, т.е. нечто, вычисляющее анонимную асинхронную операцию.


Ключевое слово await, которое синтаксически ведёт себя как поле, асинхронно ожидает футуру fut (fut.await). Остальное содержимое асинхронного блока будет выполнено лишь после завершения fut. Комбинаторный аналог — futures::Future::and_then (futures 0.1.x).




Далее — про то, что позволяет безжизненные футуры превратить в работающий код: про асинхронную среду исполнения Tokio.


Благодарности


За ревью спасибо @blandger, также другим людям за поддержку в русскоязычном Телеграм-чате @rust_async, куда вы тоже можете задавать вопросы, связанные с асинхронным программированием в Rust.

AdBlock has stolen the banner, but banners are not teeth — they will be back

More
Ads

Comments 25

    +1

    Спасибо за статью, интересно.


    Другими словами, экзекьютор как минимум один раз вызывает poll на каждой асинхронной операции, а дальше уже всё зависит от возвращаемого значения poll и контекста. Такой дизайн позволяет не тратить время ЦПУ на бесполезный опрос асинхронных операций, ведь, например, настоящий асинхронный сетевой сокет (а не наша пародия) будет готов принять/отправить данные лишь после того, как очередь событий операционной системы (epoll, kqueue, ...) возвратит соответствующее событие.

    Не смог разобраться, как именно такой дизайн позволяет эффективно работать с асинхронными операциями. Правильно ли я понимаю, что Poll должен вызываться до тех пор, пока Future не завершит работу, т.е. пока очередной выов не вернет Poll::Ready? Как в таком случае осуществляется работа с асинхронными IO-bound операциями (epoll итп)?

      0

      Эффективность в том, что экзекьютор не вызывает poll, когда не нужно. Например, после того, как нам epoll вернул EPOLLOUT, можно попытаться записать данные в сокет (цикл событий уведомит об этом экзекьютора), а иначе poll у сокета вызывать бессмысленно, т.к. всё равно получим что-то вроде EAGAIN.

        +1
        Вот этот момент не очень понятен. Зачем нужен Waker, если футура уже вернула Pending? По-моему, это дублирующие функции.

        Конечно, если Waker вызывается из отдельного потока — он может узнать, что произошло некое событие, которого ожидает футура и уведомить Executor-а о том, что необходимо вызвать poll.
        Но тогда получается параллельная основой футуре еще одна, но только завязанная на Waker.

        Не объясните? Официальная документация тоже ясности не внесла — там рассматривается пример с таймером, который как раз дёргает wake в отдельном потоке.
          0

          Вот в псевдокоде:


          loop {
              let events = request_events_from_os();
          
              for event in events {
                  get_future_cx(event.future_id).waker().wake_by_ref();
          
                  // К этому моменту экзекьютор должен вызвать футуру с event.future_id.
              }
          }

          Контексты футурам назначает сама среда асинхронного исполнения, они хранятся где-то в коллекции под капотом. Совсем не обязательно, чтобы контекст дёргали в футурах, его может вообще любой код трогать, который имеет к нему доступ (в псевдокоде выше с контекстами обращается цикл событий).


          Насчёт Pending. После того, как футура вернула Pending, она дропается, вместе с её контекстом. Контекст нужен во время жизненного цикла футуры, чтобы экзекьютор не делал


          loop {
              if Poll::Ready(x) = fut.poll() {
                  break;
              }
          }

          А опрашивал только когда придёт событие записи/считывания от ОС (см. первый блок кода с циклом событий), если мы про асинхронные сокеты говорим.

            0
            Да, но откуда этот код узнаёт, что нужно пнуть соответствующий waker, чтобы тот, в свою очередь, poll-нул (через exececutor, конечно) соответствующую футуру, если, теоретически это всё может исполняться в одном потоке ОС? И футура и exececutor и этот код, который дёргает waker. В псевдокоде, который Вы привели как раз предполагается, что этот цикл и executor исполняются в разных потоках.

            И почему дропается при Pending? Судя по документации, ей можно назначать обработчики (then, map и т.д.) даже после того, как она завершится!
              0

              Ага, опечатался, она при Poll::Ready дропается.


              Да, но откуда этот код узнаёт, что нужно пнуть соответствующий waker, чтобы тот, в свою очередь, poll-нул (через exececutor, конечно) соответствующую футуру, если, теоретически это всё может исполняться в одном потоке ОС?

              Внимание на .wake_by_ref(). Откуда он берётся? Стандартная библиотека экспортирует метод RawWaker::new, туда вторым параметром мы передаём RawWakerVTable, содержащую функцию для wake_by_ref (и ещё 3 других). В своей реализации wake_by_ref мы можем просто поллить футуру, т.к. поток ОС всего один.

                +1
                Т.е. метод wake/wake_by_ref, по сути, признак того, что нужно пнуть poll футуры?
                Зачем его тогда настолько от неё отделили…
                Спасибо, стало более-менее понятно. Хотя документацию ещё поизучать надо.
      +1

      Спасибо большое за статью!
      Знаете ли что-нибудь про про smol рантайм. Вроде как в async-std он теперь используется. Я читал что он простой и быстрый (быстрее токио). Пытался разобраться в сорцах, но то ли знаний не хватило, то ли воли :) Пока не разобрался. Может быть сможете пролить свет?
      Чем смол так хорош? За счёт чего автору удалось уложить такую сложную штуку как экзекьютор асинхронных вычислений так компактно (мало кода)? Какой рантайм выбирать в каких случаях (при написании нового проекта)?




      Я правильно понимаю что в ситуации


      let x = send_http_req(url1).await;
      let y = send_http_req(url2).await;
      let z = send_http_req(url3).await;

      запросы улетят "одновременно", а не поочерёдно после принятия ответа от предыдущего?

        0

        Последовательно. Чтобы было "одновременно", нужно futures::join!.

          0
          Т.е. асинхронность тут только по отношению экзекьютера: он может заниматься другой работой, пока выполняется одна из последовательности футур?
            0

            Да, всё так. Вообще одна футура — это последовательность .await, но вместе (имеются ввиду таски, зелёные потоки), когда исполняются на экзекьюторе, создают асинхронность. Можно ещё их через futures::join! (+ FuturesUnordered и т.д.) поллить, тогда кооперативность создаётся в одной футуре.

        0
        А насколько «дорогие» await`и в Расте?

        В шарпе, например, затраты есть. Хотелось бы поменьше.
        0
        Если объект AsyncFuture переместится в памяти
        А как в принципе такое возможно в Расте? Указатели контролирует ОВ, GC нет, realloc тоже нет. Кто может перемещать объекты в памяти, если на них есть владеющий указатель(ссылка) ???
        Я не представляю возможности.
          0

          Сырой указатель компилятору ничего не говорит об указываемом объекте, поэтому компилятор может вполне переместить (move) AsyncFuture, в результате чего может скопироваться массив (если move вызвал memcpy), а указатель инвалидируется.

            0
            Но ведь &Self это не сырой указатель — я про пример под заголовком WriteFuture. И не должен соответственно, перемещаться?

            Или, как я понимаю ответ Tyranron, в вызываемой (асинхронной) функции из валидной ссылки Self, используя unsafe, получается сырой указатель и контроль пропадает?

            В принципе логично — ОВ распространить еще и на потоки м.б.нерешаемой задачей
              0

              Точнее, там не &Self, а Pin<&mut Self>. Именно Pin запрещает перемещение в памяти нашей WriteFuture.


              Или, как я понимаю ответ Tyranron, в вызываемой (асинхронной) функции из валидной ссылки Self, используя unsafe, получается сырой указатель и контроль пропадает?

              Можно из Pin достать и управлять как вздумается данными, но это будет unsafe.

            +1

            Safe Rust действительно в данный момент не даёт средств для выражения self-referential структур c помощью ссылок, потому такое в нём невозможно. Но когда задача требует действительно использования self-referential структур, то приходится расчехлять unsafe и использовать сырые указатели, что и позволяет делать вещи вида:


            struct AsyncFuture {
                x: [u8; 128],
                read_into_buf_fut: ReadIntoBuf<'self>,  // ссылается на поле `x`
            }

            А работая с подобной структурой, даже уже в safe Rust, очень легко получить UB, ведь при любом перемещении AsyncFuture (мы её только что создали, мы ею владеем, и мы решили её тут же переместить) мы получим dangling pointer в read_into_buf_fut, ибо само по себе перемещение не обновит указатель, указывающий на x, а borrow checker не отслеживает сырые указатели.


            Соответственно, за'Pin'ив указатель read_into_buf_fut, мы получаем гарантию на уровне системы типов, что данные позади указателя не будут перемещены (либо что их перемещение не нарушит инвариантов типа, см. Unpin). И если у нас код где-то мувает запиненное значение, то он просто не скомпилируется.

              0
              И если у нас код где-то мувает запиненное значение, то он просто не скомпилируется.

              К сожалению, это не так. Запиненый указатель на значение не реализующее Unpin обязывает программиста соблюдать контракт Pin. Но компилятор выполнение этого контракта не проверяет. Демонстрация на playground

                +1

                Поэтому пиннинг обычно скрывается под интерфейсами, обеспечивающими эти контракты, такими как async/await или futures::pin_mut!().

                0
                Но из запинненой переменной перемещать уже нельзя, так? Поэтому скрывать нужно только создание запинненой копии ссылки, которая ее не съедает?
                0

                mem::replace(&mut *boxed_value, new_value), например, перемещает объект из Box'а на стек.

                  0
                  mem::replace прекрасен, но он unsafe…
                  Кажется, что Pin/Unpin нужны из-за того, что компилятор локально не может определить, что нечто не перемещается нигде и никогда. И поэтому надо различать нечты, которые сломаются при перемещении в памяти и нечты, которые от места размещения в памяти не зависят. Или я фантазирую?
                    0

                    mem::replace() — safe, как раз потому, что в safe коде невозможно сконструировать структуру содержащую ссылку на своё содержимое.


                    Кажется, что Pin/Unpin нужны из-за того, что компилятор локально не может определить, что нечто не перемещается нигде и никогда.

                    Правильно. Для этого нужен статический анализ (который алгоритмически неразрешим). Поэтому всё, что можно безопасно перемещать (типы не содержащие указателей на своё содержимое), пометили как Unpin. И в сейф коде можно пинить только указатели на Unpin типы.

              Only users with full accounts can post comments. Log in, please.