Взгляд на Tokio: как устроен этот асинхронный обработчик событий

    И для чего он используется в фреймворке для приватных блокчейнов Exonum


    Tokio — это фреймворк для разработки сетевых масштабируемых приложений на Rust, использующий компоненты для работы с асинхронным вводом/выводом. Tokio часто служит основой для других библиотек и реализаций высокопроизводительных протоколов. Несмотря на то что он является довольно молодым фреймворком, ему уже удалось стать частью экосистемы межплатформенного программного обеспечения.

    И хотя Tokio критикуют за излишнюю сложность в освоении, он уже используется в продакшн-средах, поскольку код, написанный на Tokio, легче поддерживать. Например, его уже интегрировали в hyper, tower-grpc и сonduit. Мы тоже обратились к этому решению при разработке нашей платформы Exonum.

    Работа над Exonum началась в 2016 году, когда Tokio еще не существовал, поэтому сперва нами использовалась библиотека Mio v0.5. С появлением Tokio стало ясно, что используемая библиотека Mio устарела, более того, с её помощью было сложно организовывать событийную модель Exonum. Модель включала несколько типов событий (сетевые сообщения, таймауты, сообщения из REST API и др.), а также их сортировки по степени приоритетности.

    Каждое событие влечет за собой изменение состояния узла, а значит их необходимо обрабатывать в одном потоке, в определенном порядке и по одному принципу. На Mio схему обработки каждого события приходилось описывать вручную, что при поддержании кода (добавлении/изменении параметров) могло оборачиваться большим количеством ошибок. Tokio позволил упростить этот процесс за счет встроенных функций.

    Ниже мы расскажем о компонентах стека Tokio, которые позволяют эффективно организовывать обработку асинхронных задач.


    / изображение Kevin Dooley CC

    Архитектура Tokio


    По своей сути Tokio представляет собой «обертку» над Mio. Mio — это крэйт Rust, который предоставляет API для низкоуровневого ввода/вывода и не зависит от платформы — он работает с несколькими инструментами: epoll в Linux, kqueue в Mac OS или IOCP в Windows. Таким образом, архитектура Tokio может быть представлена следующим образом:



    Futures

    Как видно из схемы выше, главным функциональным компонентом Tokio, является futures — это crate Rust, который позволяет работать с асинхронным кодом в синхронной манере. Иными словами, библиотека дает возможность оперировать с кодом, который реализует еще не выполненные задачи, как будто они уже завершились.

    По сути, futures — это значения, которые будут подсчитаны в будущем, но пока неизвестны. В формате futures можно представлять разного рода события: запросы к базам данных, таймауты, длительные задачи для CPU, чтение информации с сокета и т. д., и синхронизировать их.

    Примером future в реальной жизни может служить уведомление о доставке заказного письма почтой: по завершении доставки отправителю направляется уведомление об успешном получении адресатом письма. Получив уведомление, отправитель определяет, какие действия ему предпринимать дальше.  

    Разработчик Дэвид Симмонс (David Simmons), сотрудничавший с компаниями Intel, Genuity и Sparco Media, в качестве примера организации асинхронного ввода/вывода с помощью futures приводит обмен сообщениями с HTTP-сервером.

    Представьте, что сервер каждый раз порождает новую нить (thread) для установленного соединения. При синхронном I/O система сперва считает байты по порядку, затем обработает информацию и запишет результат обратно. При этом в момент чтения/записи нить не сможет продолжать выполнение (она блокируется), пока операция не будет завершена. Это приводит к тому, что при большом числе соединений возникают трудности при масштабировании (так называемая проблема C10k).

    В случае асинхронной обработки, нить ставит в очередь запрос на I/O и продолжает выполнение (то есть не блокируется). Система осуществляет чтение/запись через какое-то время, а нить, прежде чем использовать результаты, спрашивает, был ли выполнен запрос. Таким образом, futures способны выполнять разные задачи, например, один может считывать запрос, второй — его обрабатывать, а третий — формировать ответ.

    В крэйте futures определен типаж Future, который является ядром всей библиотеки. Этот типаж определяется для объектов, которые выполняются не сразу, а спустя некоторое время. Его основная часть выражена в коде следующим образом:

    trait Future {
        type Item;
        type Error;
        fn poll(&mut self) -> Poll<Self::Item, Self::Error>;
        fn wait(self) -> Result<Self::Item, Self::Error> { ... }
    
        fn map<F, U>(self, f: F) -> Map<Self, F>
            where F: FnOnce(Self::Item) -> U { ... }
    }
    

    «Сердцем» типажа Future является метод poll(). Он отвечает за пересылку индикатора завершения работы, ожидания вызова или посчитанного значения. При этом futures запускаются в контексте задачи (task). Задача ассоциируется только с одним future, однако последний может быть составным, то есть содержать внутри себя несколько других futures, объединенных командами join_all() или and_then(). Например:

    let client_to_server = copy(client_reader, server_writer)
                        .and_then(|(n, _, server_writer)| {
                            shutdown(server_writer).map(move |_| n)
                        });
    

    За координацию task/future отвечает исполнитель (executor). Если есть несколько задач, запущенных одновременно, и часть из них ожидает результатов внешних асинхронных событий (например, чтение данных из сети/сокета), исполнитель должен эффективно распределить ресурсы процессора для оптимального их выполнения. На практике это происходит за счет «перебрасывания» мощностей процессора на задачи, которые могут быть выполнены, пока другие задачи заблокированы из-за отсутствия внешних данных.

    В случае отложенной задачи, executor получает информацию о том, что ее можно выполнять, при помощи метода notify(). Примером может служить исполнитель крэйта futures, который «просыпается» при вызове wait() — исходный код примера представлен в официальном репозитории Rust на GitHub:

        pub fn wait_future(&mut self) -> Result<F::Item, F::Error> {
            ThreadNotify::with_current(|notify| {
    
                loop {
                    match self.poll_future_notify(notify, 0)? {
                        Async::NotReady => notify.park(),
                        Async::Ready(e) => return Ok(e),
                    }
                }
            })
        }
    

    Streams

    Кроме futures, Tokio работает и с другими компонентами для асинхронного I/O — потоками (streams). В то время как future возвращает лишь один финальный результат, stream работает с серией событий и способен вернуть несколько результатов.

    Снова пример из реальной жизни: периодические оповещения от датчика измерения температуры могут быть представлены в виде stream. Датчик будет регулярно отправлять значение измерения температуры пользователю через некоторые промежутки времени.

    Типаж stream может выглядеть следующим образом:

    trait Stream {
        type Item;
        type Error;
        fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error>;
    }
    

    Механика работы со stream идентична той, что применяется к futures: используются похожие комбинаторы для преобразования и изменения деталей потока. Более того, stream легко может быть преобразован в future при помощи адаптера into_future.

    Ниже мы предметно рассмотрим применение futures и stream в нашем фреймворке Exonum.

    Tokio в Exonum


    Как уже было сказано, разработчиками Exonum было принято решение использовать библиотеку Tokio для реализации цикла событий (event loop) во фреймворке.

    Упрощенная схема организации событийной модели в Exonum может быть представлена следующим образом:


    Каждый узел сети обменивается сообщениями с другими узлами. Все входящие сообщения попадают в очередь сетевых событий, куда кроме них также попадают внутренние события (тайм-ауты и события внутреннего API). Каждый тип события формирует отдельный поток (stream). Но обработка таких событий, как было отмечено ранее, — процесс синхронный, поскольку влечет за собой изменения состояния узла. Event Agregator объединяет несколько цепочек событий в одну и отправляет их с помощью канала в event loop, где они обрабатываются в порядке установленного приоритета.

    При коммуникации между узлами Exonum выполняет следующие связанные операции на каждом из них:
     
    Подключение к узлу N (открытие сокета, настройка сокета) —> Получение сообщений узла N (получение байтов из сокета, разбиение байтов на сообщения) —> Пересылка сообщений в канал текущего узла

    let connect_handle = Retry::spawn(handle.clone(), strategy, action)
                .map_err(into_other)
                // Configure socket
                .and_then(move |sock| {
                    sock.set_nodelay(network_config.tcp_nodelay)?;
                    let duration =
                        network_config.tcp_keep_alive.map(Duration::from_millis);
                    sock.set_keepalive(duration)?;
                    Ok(sock)
                })
                // Connect socket with the outgoing channel
                .and_then(move |sock| {
                    trace!("Established connection with peer={}", peer);
    
                    let stream = sock.framed(MessagesCodec::new(max_message_len));
                    let (sink, stream) = stream.split();
    
                    let writer = conn_rx
                        .map_err(|_| other_error("Can't send data into socket"))
                        .forward(sink);
                    let reader = stream.for_each(result_ok);
    
                    reader
                        .select2(writer)
                        .map_err(|_| other_error("Socket error"))
                        .and_then(|res| match res {
                            Either::A((_, _reader)) => Ok("by reader"),
                            Either::B((_, _writer)) => Ok("by writer"),
                        })
                })
    

    Ввиду того, что futures дает возможность комбинировать различные абстракции в цепочку без потери производительности системы, программный код получается разбит на маленькие функциональные блоки, а, следовательно, его становится легче поддерживать.

    Использование сети является нетривиальной задачей. Для работы с узлом необходимо к нему подключиться, а также обеспечить логику переподключений в случае разрыва:

    .map_err(into_other)

    Помимо этого, необходимо произвести настройку сокета:

    .and_then(move |sock| {
                    sock.set_nodelay(network_config.tcp_nodelay)?;
                    let duration =
                        network_config.tcp_keep_alive.map(Duration::from_millis);
                    sock.set_keepalive(duration)?;
                    Ok(sock)
                })
    

    И парсить входящие байты как сообщения:

    let (sink, stream) = stream.split();

    Каждый из приведенных блоков кода, в свою очередь, состоит из более мелких участков. Однако благодаря тому, что futures позволяет свободно комбинировать эти блоки, нам нет необходимости задумываться о внутреннем строении каждого из них.

    В завершение хотелось бы отметить, что на данный момент Exonum в качестве API использует несколько устаревшую версию iron на базе библиотеки hyper. Однако сейчас мы рассматриваем вариант перехода на чистый hyper, который использует Tokio.



    Предлагаем вам еще несколько материалов по теме из нашего блога на Хабре:

    Bitfury Group

    69,00

    Cофтверные и хардверные решения на Blockchain

    Поделиться публикацией
    Комментарии 31
      0

      Да это же один в один asyncio из Python 3. Теперь осталось обернуть futures в полноценный псевдосинхронный async/await.

        +4

        Ты не поверишь.
        https://crates.io/crates/futures-await
        Работает, правда, только на nightly.

          +2
          В nigthly rust есть github.com/alexcrichton/futures-await
            +4
            Ровно, как и «один в один» Netty. Да и, наверное, любой NIO-фреймворк. Все мы пляшем вокруг poll.

              –2
              Лучше бы сделали аналог do в Haskell и for в Scala. Можно было бы писать на полноценных фьючах чистый и понятный код, не засоряя его async/await.
                +2

                async/await существенно удобнее for/do. Только лишь с помощью комбинаторов очень многие паттерны выражать безумно геморройно. Год назад в одном проекте на скале большие асинхронные методы сначала мы пытались писать с помощью комбинаторов и for. Получалось совершенно нечитаемое месиво. После перехода на scala-async код стал чище и понятнее на порядок.


                С async-await в Rust писать асинхронный код очень легко и приятно. А с недавними разработками в области immovable generators станет все совсем хорошо.

                  0

                  Интересно, а если оператор? Перегрузить в качестве комбинатора? Должно сейчас же на найтли сработать вместо await

                    –1
                    В Haskell это сделано оператором (>>=)
                    То есть
                    do
                       x <- future1
                       y <- genFuture2 x
                       pure y
                    

                    можно записать как
                    future1 >>= genFuture2
                    

                    Но практика показывает, что do обычно читабельнее.
                    В Scala вместо оператора (>>=) почему то сделали метод flatMap. for вызлядит симпатичнее.
                    for {
                      x <- future1
                      y <- genFuture2(x)
                    } yield {
                      y
                    }
                    

                    против
                    future1.flatMap(genFuture2)
                    
                      0
                      А теперь добавьте в код несколько циклов…
                        –1
                        Вот в циклах с async/await я бы работать не рискнул, поскольку не очень понимаю его семантику в этом случае. Использовавшие его коллеги в этих случаях сталкивались с неожиданностями.
                        Я обычно использую sequence, которая меняет местами Seq и монаду (в данном случае Future).
                          0
                          А что не так с семантикой?

                          await приостанавливает выполнение асинхронной функции (освобождая поток) пока не произойдет события. Его семантика никак не меняется в цикле.
                            0
                            То есть не гарантируется, что он не будет удерживать нить во время ожидания?
                              0
                              Гарантируется.

                              Ну, на самом деле зависит от реализации, но реализация без таких гарантий будет ошибкой.
                                0
                                Тогда я не понимаю в какой код преобразуется
                                async {
                                   for( i <- Seq(1,2,3,4)) {
                                      val x = await { f(i) }
                                      println(x)
                                   }
                                }
                                
                                  0

                                  Ну так скопмпилируйте а потом декомпилируйте обратно...


                                  Если это Scala — то скорее всего там конечный автомат с тремя состояниями будет.

                                    0
                                    В скале это не сработает (т.к. for маппится на метод foreach, а сквозь методы async/await скальный не работает), но сработает, если заменить на цикл while. И да, это преобразуется в конечный автомат, как и в большинстве реализаций async/await (например, в Rust тоже).
                                  0
                                  То есть не гарантируется, что он не будет удерживать нить во время ожидания?

                                  Для таких гарантий должны соблюдаться несколько правил:


                                  • операции должны быть неблокирующими. poll_read для fs (а иногда и для для stdin) является блокирующим вызовом.
                                  • должны быть зарегистрированы другие фьючи, чтобы дать возможность executor'у переключиться на другую задачу
                        –1
                        Я работал с кодом на Scala, активно использующим async/await. Как правило, простой способ в нем разоблаться был переписать на for — код сразу получался компактнее и структурированиее.
                        Явное использование flatMap может выглядеть громозко и страшно, но с for такой проблемы нет.
                      0

                      Да нету там ничего общего, разве что epoll and kqueue

                      +2
                      С появлением Tokio стало ясно, что используемая библиотека Mio устарела

                      mio не может устареть, так как автор tokio по совместительству является автором mio, под капотом tokio — обертка над mio, которая активно обновляется. Уж вы-то должны это знать.


                      Устарели вы, а не tokio. Мало того, что вы до сих пор сидите на tokio-core, который, вообще-то уже deprecated, у вас какое-то самописное threadpool говнище, которое жрет мои ресурсы, вы создаете кучу тредов, хотя вас об этом не просили, так у вас обработка сокетов вообще однопоточная.


                      Почитайте ветку, вам многое станет ясно.

                        0
                        Более того, stream легко может быть преобразован в future при помощи адаптера into_future.

                        Это слегка "заблуждение". Лучше продемонстрировать на примере futures v0.2. В новой версии авторы переименовали комбинатор .into_future() в .next(). Что сразу проясняет ситуацию.

                          +1

                          Единственно tokio медленней на 20% чем tokio-core, а так все хорошо

                            0

                            Медленнее в однопоточном режиме. В многопоточной обработке событий мое приложение стало на 5% быстрее. Мы будем улучшать производительность. Закрыто много багов, улучшили архитектуру, вот что главное.

                              +1

                              Моё многопоточное приложение стало на 20% медленнее. Если что tokio-core не ограничивает количество потоков. К тому же забавно смотреть как hyper откатывает tokio на старую версию tokio-core чтобы в бенчмарках выглядеть лучше.

                                0

                                Тем забавнее наблюдать за бенчмарками, ведь tokio-core сейчас является фасадом для tokio.

                                  0

                                  В этом то и прикол что hyper даунгрейдеулся на версию tokio-core="=0.1.12" в которой tokio не используется. Я сделал тоже самое, и в своих проектах сделал также. Каждому своё конечно, но как-то терять 20% не очень

                                    0

                                    Почему-то все забывают о починенных в tokio багах в погоне за скоростью.

                                      –1

                                      Я что-то не уловил про баги? Какие баги? Я довольно близко знаком с tokio, не могу припомнить никаких, по крайней мере никаких фиксов в комит логе не вижу. если для вас потеря 20% это нормальн для не меня это не приемлемо.

                                        0

                                        Вот фикс бага. Без него нет возможности возвращать ошибку из UdpCodec::encode, соответственно единственный способ обработать нестандартную ситуацию — упасть. В дальнейшем этот фикс позволил отказаться от разделения кодеков tcp и udp, используя одни и те же типажи: Encoder и Decoder.


                                        Я помню этот фикс, потому что он был сделан мной.


                                        Я довольно близко знаком с tokio

                                        Вы спорите с контрибьютером с вкладом в 3к строк кода и 11 закрытых PR. Но у вас-то наверняка больше)))

                            0

                            Сам mio то нет, но мы использовали устаревшую версию 0.5 и встала необходимость либо переписывать все под 0.6 либо сразу попробовать мигрировать на более высокоуровневый tokio.


                            По поводу остального думаю будет конструктивнее, если вы опишете свое видение проблемы в виде Github issue.

                              0
                              По поводу остального думаю будет конструктивнее, если вы опишете свое видение проблемы в виде Github issue.

                              Может вы еще хотите, чтобы я в вашу репу коммитил? Ваши HR пытались меня к вам заманить, собеседования всякие устраивали, но когда они 2 раза перенесли техническое, это был эпичный провал.


                              Так что спасибо, но нет))

                          Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

                          Самое читаемое