Pull to refresh
0
Bitfury Group
Cофтверные и хардверные решения на Blockchain

Взгляд на Tokio: как устроен этот асинхронный обработчик событий

Reading time7 min
Views12K

И для чего он используется в фреймворке для приватных блокчейнов Exonum


Tokio — это фреймворк для разработки сетевых масштабируемых приложений на Rust, использующий компоненты для работы с асинхронным вводом/выводом. Tokio часто служит основой для других библиотек и реализаций высокопроизводительных протоколов. Несмотря на то что он является довольно молодым фреймворком, ему уже удалось стать частью экосистемы межплатформенного программного обеспечения.

И хотя Tokio критикуют за излишнюю сложность в освоении, он уже используется в продакшн-средах, поскольку код, написанный на Tokio, легче поддерживать. Например, его уже интегрировали в hyper, tower-grpc и сonduit. Мы тоже обратились к этому решению при разработке нашей платформы Exonum.

Работа над Exonum началась в 2016 году, когда Tokio еще не существовал, поэтому сперва нами использовалась библиотека Mio v0.5. С появлением Tokio стало ясно, что используемая библиотека Mio устарела, более того, с её помощью было сложно организовывать событийную модель Exonum. Модель включала несколько типов событий (сетевые сообщения, таймауты, сообщения из REST API и др.), а также их сортировки по степени приоритетности.

Каждое событие влечет за собой изменение состояния узла, а значит их необходимо обрабатывать в одном потоке, в определенном порядке и по одному принципу. На Mio схему обработки каждого события приходилось описывать вручную, что при поддержании кода (добавлении/изменении параметров) могло оборачиваться большим количеством ошибок. Tokio позволил упростить этот процесс за счет встроенных функций.

Ниже мы расскажем о компонентах стека Tokio, которые позволяют эффективно организовывать обработку асинхронных задач.


/ изображение Kevin Dooley CC

Архитектура Tokio


По своей сути Tokio представляет собой «обертку» над Mio. Mio — это крэйт Rust, который предоставляет API для низкоуровневого ввода/вывода и не зависит от платформы — он работает с несколькими инструментами: epoll в Linux, kqueue в Mac OS или IOCP в Windows. Таким образом, архитектура Tokio может быть представлена следующим образом:



Futures

Как видно из схемы выше, главным функциональным компонентом Tokio, является futures — это crate Rust, который позволяет работать с асинхронным кодом в синхронной манере. Иными словами, библиотека дает возможность оперировать с кодом, который реализует еще не выполненные задачи, как будто они уже завершились.

По сути, futures — это значения, которые будут подсчитаны в будущем, но пока неизвестны. В формате futures можно представлять разного рода события: запросы к базам данных, таймауты, длительные задачи для CPU, чтение информации с сокета и т. д., и синхронизировать их.

Примером future в реальной жизни может служить уведомление о доставке заказного письма почтой: по завершении доставки отправителю направляется уведомление об успешном получении адресатом письма. Получив уведомление, отправитель определяет, какие действия ему предпринимать дальше.  

Разработчик Дэвид Симмонс (David Simmons), сотрудничавший с компаниями Intel, Genuity и Sparco Media, в качестве примера организации асинхронного ввода/вывода с помощью futures приводит обмен сообщениями с HTTP-сервером.

Представьте, что сервер каждый раз порождает новую нить (thread) для установленного соединения. При синхронном I/O система сперва считает байты по порядку, затем обработает информацию и запишет результат обратно. При этом в момент чтения/записи нить не сможет продолжать выполнение (она блокируется), пока операция не будет завершена. Это приводит к тому, что при большом числе соединений возникают трудности при масштабировании (так называемая проблема C10k).

В случае асинхронной обработки, нить ставит в очередь запрос на I/O и продолжает выполнение (то есть не блокируется). Система осуществляет чтение/запись через какое-то время, а нить, прежде чем использовать результаты, спрашивает, был ли выполнен запрос. Таким образом, futures способны выполнять разные задачи, например, один может считывать запрос, второй — его обрабатывать, а третий — формировать ответ.

В крэйте futures определен типаж Future, который является ядром всей библиотеки. Этот типаж определяется для объектов, которые выполняются не сразу, а спустя некоторое время. Его основная часть выражена в коде следующим образом:

trait Future {
    type Item;
    type Error;
    fn poll(&mut self) -> Poll<Self::Item, Self::Error>;
    fn wait(self) -> Result<Self::Item, Self::Error> { ... }

    fn map<F, U>(self, f: F) -> Map<Self, F>
        where F: FnOnce(Self::Item) -> U { ... }
}

«Сердцем» типажа Future является метод poll(). Он отвечает за пересылку индикатора завершения работы, ожидания вызова или посчитанного значения. При этом futures запускаются в контексте задачи (task). Задача ассоциируется только с одним future, однако последний может быть составным, то есть содержать внутри себя несколько других futures, объединенных командами join_all() или and_then(). Например:

let client_to_server = copy(client_reader, server_writer)
                    .and_then(|(n, _, server_writer)| {
                        shutdown(server_writer).map(move |_| n)
                    });

За координацию task/future отвечает исполнитель (executor). Если есть несколько задач, запущенных одновременно, и часть из них ожидает результатов внешних асинхронных событий (например, чтение данных из сети/сокета), исполнитель должен эффективно распределить ресурсы процессора для оптимального их выполнения. На практике это происходит за счет «перебрасывания» мощностей процессора на задачи, которые могут быть выполнены, пока другие задачи заблокированы из-за отсутствия внешних данных.

В случае отложенной задачи, executor получает информацию о том, что ее можно выполнять, при помощи метода notify(). Примером может служить исполнитель крэйта futures, который «просыпается» при вызове wait() — исходный код примера представлен в официальном репозитории Rust на GitHub:

    pub fn wait_future(&mut self) -> Result<F::Item, F::Error> {
        ThreadNotify::with_current(|notify| {

            loop {
                match self.poll_future_notify(notify, 0)? {
                    Async::NotReady => notify.park(),
                    Async::Ready(e) => return Ok(e),
                }
            }
        })
    }

Streams

Кроме futures, Tokio работает и с другими компонентами для асинхронного I/O — потоками (streams). В то время как future возвращает лишь один финальный результат, stream работает с серией событий и способен вернуть несколько результатов.

Снова пример из реальной жизни: периодические оповещения от датчика измерения температуры могут быть представлены в виде stream. Датчик будет регулярно отправлять значение измерения температуры пользователю через некоторые промежутки времени.

Типаж stream может выглядеть следующим образом:

trait Stream {
    type Item;
    type Error;
    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error>;
}

Механика работы со stream идентична той, что применяется к futures: используются похожие комбинаторы для преобразования и изменения деталей потока. Более того, stream легко может быть преобразован в future при помощи адаптера into_future.

Ниже мы предметно рассмотрим применение futures и stream в нашем фреймворке Exonum.

Tokio в Exonum


Как уже было сказано, разработчиками Exonum было принято решение использовать библиотеку Tokio для реализации цикла событий (event loop) во фреймворке.
Упрощенная схема организации событийной модели в Exonum может быть представлена следующим образом:


Каждый узел сети обменивается сообщениями с другими узлами. Все входящие сообщения попадают в очередь сетевых событий, куда кроме них также попадают внутренние события (тайм-ауты и события внутреннего API). Каждый тип события формирует отдельный поток (stream). Но обработка таких событий, как было отмечено ранее, — процесс синхронный, поскольку влечет за собой изменения состояния узла. Event Agregator объединяет несколько цепочек событий в одну и отправляет их с помощью канала в event loop, где они обрабатываются в порядке установленного приоритета.

При коммуникации между узлами Exonum выполняет следующие связанные операции на каждом из них:
 
Подключение к узлу N (открытие сокета, настройка сокета) —> Получение сообщений узла N (получение байтов из сокета, разбиение байтов на сообщения) —> Пересылка сообщений в канал текущего узла

let connect_handle = Retry::spawn(handle.clone(), strategy, action)
            .map_err(into_other)
            // Configure socket
            .and_then(move |sock| {
                sock.set_nodelay(network_config.tcp_nodelay)?;
                let duration =
                    network_config.tcp_keep_alive.map(Duration::from_millis);
                sock.set_keepalive(duration)?;
                Ok(sock)
            })
            // Connect socket with the outgoing channel
            .and_then(move |sock| {
                trace!("Established connection with peer={}", peer);

                let stream = sock.framed(MessagesCodec::new(max_message_len));
                let (sink, stream) = stream.split();

                let writer = conn_rx
                    .map_err(|_| other_error("Can't send data into socket"))
                    .forward(sink);
                let reader = stream.for_each(result_ok);

                reader
                    .select2(writer)
                    .map_err(|_| other_error("Socket error"))
                    .and_then(|res| match res {
                        Either::A((_, _reader)) => Ok("by reader"),
                        Either::B((_, _writer)) => Ok("by writer"),
                    })
            })

Ввиду того, что futures дает возможность комбинировать различные абстракции в цепочку без потери производительности системы, программный код получается разбит на маленькие функциональные блоки, а, следовательно, его становится легче поддерживать.

Использование сети является нетривиальной задачей. Для работы с узлом необходимо к нему подключиться, а также обеспечить логику переподключений в случае разрыва:

.map_err(into_other)

Помимо этого, необходимо произвести настройку сокета:

.and_then(move |sock| {
                sock.set_nodelay(network_config.tcp_nodelay)?;
                let duration =
                    network_config.tcp_keep_alive.map(Duration::from_millis);
                sock.set_keepalive(duration)?;
                Ok(sock)
            })

И парсить входящие байты как сообщения:

let (sink, stream) = stream.split();

Каждый из приведенных блоков кода, в свою очередь, состоит из более мелких участков. Однако благодаря тому, что futures позволяет свободно комбинировать эти блоки, нам нет необходимости задумываться о внутреннем строении каждого из них.

В завершение хотелось бы отметить, что на данный момент Exonum в качестве API использует несколько устаревшую версию iron на базе библиотеки hyper. Однако сейчас мы рассматриваем вариант перехода на чистый hyper, который использует Tokio.



Предлагаем вам еще несколько материалов по теме из нашего блога на Хабре:

Tags:
Hubs:
Total votes 33: ↑32 and ↓1+31
Comments31

Articles

Information

Website
bitfury.com
Registered
Employees
201–500 employees
Location
Нидерланды