Магия асинхронных операций: взгляд изнутри. Future

    Бу́дущее — гипотетическая часть линии времени, множество событий, которые ещё не произошли, но могут произойти.

    Начало

    Первое моё знакомство с объектами «будущего» произошло около 5 лет назад, во время работы над web-проектами с использованием JavaScript. Думаю, сейчас можно определённо утверждать, что асинхронные/неблокирующие подходы и языковые конструкции подтвердили свою жизнеспособность и востребованность, и заняли достойное место в наборе инструментов разработчиков на разных языках, особенно с учётом значительного развития мультипроцессорности и мультиядерности.

    С ростом популярности Flutter и Dart, в дополнение к отличной документации на сайтах этих технологий, появляется всё больше интересных статей от сообщества разработчиков по применению асинхронных конструкций, которые помогают начать эффективно использовать этот подход, отвечая на вопрос «Как и для чего это использовать?». Эта статья преследует немного другую цель: мы попробуем ответить на вопрос «Как это работает?», поэтому желательно иметь общее представление или опыт работы с асинхронным кодом.

    Итак, предлагаю сегодня заглянуть за кулисы языка Dart, и понять, кто же реально управляет нашим будущим. Поехали…

    Асинхронный код

    Как правило, значительная часть асинхронного кода приложения Flutter связана с взаимодействием с платформенными ресурсами – сетевые операции, события от пользовательского интерфейса, файловые операции или обращения к нативному коду платформы через платформенные каналы. Учитывая разнообразие таких взаимодействий, планирую выделить описание связи Flutter и Dart с платформой в отдельную статью. Сейчас же, для исследования, препарируем классический Future с отложенной операцией, который часто используют разработчики при выполнении задач, связанных с асинхронностью - это Future.delayed().

    Future.delayed(const Duration(seconds: 3), () {
    	print('Future is here!');
    });

    На самом деле, исследование этого, с виду простого, объекта будет ключом к пониманию подкапотной реализации асинхронных операций в Dart. Также это описание справедливо и для Future(), по сути, это тот же Future.delayed() с Duration.zero.

    Вне рамок темы, но всё же, хотел бы упомянуть ещё один компонент, по-моему мнению, обделённый вниманием разработчиков - Completer. С его помощью можно полностью «контролировать» Future - создавать и разрешать его в любое время, когда нам удобно. Для тех кто знаком с JavaScript - это практически совпадает по смыслу с Promise. Можно заметить, что и сам фреймворк Flutter очень активно использует Completer в своей работе, и, если вы увидите возвращаемый Future из фреймворка, например навигатора, с большой вероятностью это будет Completer, так что обратите, пожалуйста, на него внимание, он часто помогает решать «нестандартные» задачи, связанные с асинхронным кодом.

    Магия Future или кто ждёт будущее?

    Для начала взглянем на определение с сайта flutter.dev

    A Future is used to represent a potential value, or error, that will be available at some time in the future

    Будущее используется для представления потенциального значения или ошибки, которое будет доступно в какой-то момент в будущем.

    Соответственно, после создания объекта Future, наша программа может продолжить заниматься своими делами, не прерываясь и не ожидая результата, а само создание объекта происходит синхронно и быстро. Как только, в какой-то момент в будущем, Future будет разрешён, вызовутся соответствующие методы обработки результата или ошибки, которые мы задали при создании объекта.

    Для первого эксперимента будем использовать простой код на Dart, без задействования фреймворка Flutter, для целей исследования в нем нет необходимости. Код состоит из одной главной функции main() и двух Future.delayed() с задержкой выполнения. Также, во время работы программы, будут отображаться контрольные точки с помощью print. Работа со всеми примерами выполнялась в IDE VS Code.

    future_example.dart
    void main() {
      print('1. Start main');
    
      Future.delayed(
        const Duration(seconds: 3),
        () => print('3.  => Timeout Future #1'),
      );
    
      Future.delayed(
        const Duration(seconds: 10),
        () => print('4.  => Timeout Future #2'),
      );
    
      print('2. End main');
    }

    Запустим эту программу и посмотрим результат выполнения

    Программа и результат вывода в консоли
    Программа и результат вывода в консоли

    После запуска программы, и вывода в консоль всех сообщений, видно, что во 2-й строке консоли отобразился текст, вызываемый в конце нашей функции main(), т.е. программа формально завершена. Но, фактически выход из программы (exited) происходит только после разрешения всех объектов Future. Возникают вопросы которые, конечно, не мешают нам эффективно использовать Future, но, маленький исследователь который живёт внутри, не прочь бы узнать подробности.

    • Как Dart знает, что ещё не время умирать?

    • Где находятся магические часы, которые отсчитывают время?

    • А эти часы сильно нагружают процессор и насколько точно они идут?

    • Кто и как запускает код Future, когда время вышло?

    Пришло время разобраться с этим.

    Порт будущего

    Запустим простейшую программу.

    import 'dart:isolate';
    
    void main() {
      ReceivePort();
    }

    Простая магия - Выполнение этого кода заблокирует завершение. Причём, в программе нет ни Future, ни таймеров с ожиданием, вообще нет асинхронного кода, вопрос - почему программа "зависла"? На самом деле класс ReceivePort, это первая важная зацепка в нашем расследовании. Чтобы продолжить дальше, отступим немного в сторону и рассмотрим понятие Isolate в языке Dart.

    Изолят - контекст выполнения, в котором работает код приложения. Учитывая это название, неудивительно, что изоляты - максимально изолированные сущности, имеют свою отдельную кучу памяти и сборщик мусора. Как правило, создаются в отдельном потоке операционной системы, но технически могут создаваться и работать в одном. Например, для платформ Android/iOS изоляты выполняются в отдельных потоках, но, по факту, могут использовать ресурсы только одного ядра. Управление созданием изолятов в отдельных потоках, или с указанием ядра, не предусмотрено API. Создание потоков и нагрузка, создаваемая работой изолятов, распределяется автоматически runtime Dart VM и самой операционной системой.

    Пул потоков - Виртуальная машина Dart достаточно активно использует паттерн пул потоков в своей работе. Например, сообщения между изолятами также используют workers из пула потоков Dart VM для доставки сообщений. Поэтому, как правило, при создании изолята, присутствуют свободные потоки в пуле. А если их нет, тогда тратится время на создание нового, это стоит учитывать при активной работе с изолятами в вашем приложении. В случае, если у потока нет работы, через определённое время он автоматически удаляется из памяти.

    Пулл потоков VM Dart
    Пулл потоков VM Dart

    После запуска программы всегда создаётся главный изолят, в котором работает наша основная программа, например, в контексте Flutter, это так называемый RootIsolate. Он отличается от изолята создаваемого для консольного приложения, в нём, при старте ,создаётся объект для связи с платформой (window), который использует Framework Flutter при своей работе для связи с движком Flutter. Конечно, разработчик может самостоятельно создавать изоляты и запускать в них свой код.

    Порты - Поскольку изоляты не имеют доступ к памяти друг друга, связь с внешним миром и другими изолятами происходит через так называемые порты. Разработчик просто вызывает команду отправки объекта в порт. Фактически, под капотом виртуальной машины Dart, при передаче объекта в порт, данные объекта сначала сериализуются в бинарный вид (a.k.a. object snapshot), затем на другой стороне, десериализуются, создаётся новый объект, который принимает слушатель порта. Замечу, порт - однонаправленный канал, соответственно, чтобы организовать 2-х стороннюю передачу между изолятами, необходимо создать два порта, по одному на каждый изолят.

    Очередь сообщений - При отправке в порт объектов они обрабатываются только когда изолят готов их принять, пока изолят занят работой и не может обработать сообщения из порта, они накапливаются в очереди, для этого в виртуальной машине у каждого изолята существует message_handler он управляет очередью сообщений и контролирует открытые порты.

    EventLoop - понятие "очередь событий", достаточно хорошо знакомо разработчикам на Node.js. События накапливаются и обрабатываются в промежутках времени, когда участок синхронного кода, в нашем приложении, завершил свою работу. Очередь событий используется для обработки таймеров, сетевых и файловых операций, а в случае с Flutter также операций по взаимодействию с UI. Поэтому неудивительно, что большой, с точки временных затрат, объем синхронного кода, в крайнем случае, может даже заблокировать обработку сообщений и "подвесить" приложение. В Dart этот подход представлен message handler, причём дальше мы увидим, что, в случае с Future и сетевых операций, это основной, но не единственный компонент в схеме реализации очереди событий.

    Чтобы понять, как работают порты, создадим небольшое приложение, причем, даже не будем пока использовать создание изолята, чтобы не загромождать лишними деталями.

    ReceivePort() отправка сообщения
    import 'dart:isolate';
    
    void main() {
      final receivePort = ReceivePort();
      final sendPort = receivePort.sendPort;
    
      receivePort.listen((message) {
        print("Received an *$message*");
      });
    
      sendPort.send('apple');
    }

    В этом простом примере, инициализируется пара ReceivePort/SendPort, регистрируется слушатель и отправляется сообщение. Как видим пока никакой магии - отправляем/получаем - просто однонаправленная труба.

    Смысла отправлять самому себе сообщения особо нет, поэтому немного усложним пример, и добавим создание изолята в наш код.

    ReceivePort() отправка сообщения из изолята
    import 'dart:isolate';
    
    void main() {
      final receivePort = ReceivePort();
      final sendPort = receivePort.sendPort;
    
      receivePort.listen((message) {
        print("Received an *$message*");
      });
    
      Isolate.spawn(myIsolate, sendPort);
    }
    
    void myIsolate(SendPort sendPort) {
      sendPort.send('apple from isolate');
    }
    

    Как видим, пример не слишком усложнился, просто один конец "трубы" (sendPort) мы передали в изолят, и теперь можно через него отправлять родительскому изоляту сообщения.

    Создание изолята и отправка сообщения
    Создание изолята и отправка сообщения

    Если заглянуть под капот, в runtime Dart, можно сделать важный вывод - изолят не завершится пока остаются открытые порты (описание схемы завершения при ошибках выполнения, выходит за рамки этой статьи).

    // The isolate exits when it encounters an error or when it no
    // longer has live ports.
    if (status != kOK || !HasLivePorts()) {
          pool_ = NULL;
          // Decide if we have a callback before releasing the monitor.
          end_callback = end_callback_;
          callback_data = callback_data_;
          run_end_callback = end_callback_ != NULL;
          delete_me = delete_me_;
    }

    Открытый порт означает, что по нему, в любой момент, может прийти сообщение, при этом "владельцем" открытого порта может быть кто-угодно и где угодно, соответственно, изолят не знает от кого и когда могут прийти сообщения и просто ожидает. Сам процесс ожидания использует системное API при котором операционная система переводит изолят в спящий режим.

    Статус процесса Dart во время ожидания сообщений
    Статус процесса Dart во время ожидания сообщений
    Отображение открытого порта изолята в Observatory
    Отображение открытого порта изолята в Observatory

    Также Dart обладает хорошим инструментом диагностики состояния виртуальной машины. Например, запустив программу с ключом observe, можно открыть в браузере соответствующий url и посмотреть состояние виртуальной машины, в том числе, наличие открытых портов.

    Как формируется будущее

    Вернёмся к нашему первому простейшему примеру

    void main() {
      Future.delayed(const Duration(seconds: 3), () {
        print('Future is here!');
      });
    }

    Всё что он делает - создаёт Future с задержкой и, по истечении времени, выполняет вывод текста. Заглянем в кроличью нору, разберёмся, где же находится магический таймер, выясним чем занимается главный изолят пока ждёт срабатывания таймера, и кто вызывает наш код, когда вышло время. Для этого постепенно погрузимся в исходный код Dart.

    Future и Timer - Если заглянуть в код библиотеки, которая отвечает за работу с асинхронностью, видим, что сначала создается внутренний объект Future, а также Timer. При срабатывании таймера будет вызван обработчик который выполнит нашу функцию, а также "разрешит" Future. Т.е. для управления Future()  и Future.delayed() используется обычный таймер, поэтому, если вам нужна просто отложенная операция с возможностью отмены, можно использовать объект Timer вместо Future.

    VMLibraryHooks.timerFactory - Спускаемся на шаг ниже. Код создания таймера принимает во внимание зону в которой он создаётся, это связано, в том числе, с особенностями обработки ошибок в зонах. Само создание объекта таймера происходит через фабрику VMLibraryHooks.timerFactory. Механизм "хуков" позволяет применять различную реализацию таймеров для разных платформ, на которых работает наше приложение. Например, создание и работа таймеров для web-платформы отличается от работы под ОС Android/iOS. Также, в коде создания, видим, что длительность задержки Duration, которую мы задали, преобразуется в миллисекунды и дальше вся работа с таймерами будет производиться именно с миллисекундами.

    Очередь таймеров - После создания объекта таймера фабрикой, он помещается в очередь. Существуют две очереди для таймеров с нулевой задержкой, и очередь таймеров у которых задержка больше нуля. Причём для нулевых таймеров используется односвязный список, а для остальных - двоичная куча.

    Порт и первый ответ на наш вопрос - После того, как создался объект таймера и он занял своё место в очереди, происходит процесс контроля наличия и создания порта, принцип работы которого упоминался выше, при описании изолятов. Как видно из кода, создание порта для таймеров происходит всего один раз, и, через этот единственный порт, все таймеры изолята получают события о своём завершении. Соответственно пока порт "живой", и кто-то им может воспользоваться, наше приложение не завершится, только в случае завершения работы всех таймеров и пустой очереди, порт станет не активным и наше приложение со спокойной совестью закончит свою работу. Это ответ на вопрос - Почему наше приложение не завершается и как оно понимает, что можно "умирать". В случае с Flutter наличие "служебных" портов в главном изоляте и процесс "завершения" немного отличается от консольного приложения. Этот момент планирую подсветить в следующей статье - о платформенном взаимодействии движка Flutter и Dart.

    EventHandler - После создания порта мы видим различие в поведениях таймеров с нулевой длительностью и таймеров с длительностью задержки больше нуля. И если таймеры с нулевой задержкой, как бы сами себе отправляют сообщение в порт сразу, то таймеры, с длительностью задержки больше нуля, отправляют сообщение в EventHandler, причём они отправляют и свою длительность и порт по которому они ожидают событие завершения времени. Сам EventHandler является частью runtime Dart VM.

    Связь изолята и EventHandler, которые работают в разных потоках
    Связь изолята и EventHandler, которые работают в разных потоках

    Системный будильник - Итак, наш код, который работает в изоляте, как бы просит, чтобы его разбудили через определённое время. Этот "будильник" включается в библиотеке Dart отправкой команды VMLibraryHooks.eventHandlerSendData в команде мы указываем порт, на котором мы ждём события, а также время в миллисекундах через которое нас надо "разбудить". Заметим, что EventHandler работает в отдельном потоке, и на блокировку изолята в котором работает наш код не влияет. После получения данных о тайм-ауте EventHandler помещает его в очередь обработки к остальным таймерам, сама очередь обрабатывается последовательно с использованием poll механизма. В Android используется Linux epoll API, в iOS kevent. Если кратко - этот механизм просит операционную систему уведомить поток в случае получения наступления определенных событий - работы с сетью/файловой системой или при достижении таймаута. Заметим, что EventHandler также отвечает за работу с сетью/сокетами, это к вопросу об асинхронности сетевых операций. Само текущее время при обработке операций с тайм-аутом берется на уровне системы, в Android это clock_gettime, а в iOS - mach-absolute-time. Как правило, это простой однонаправленный счётчик, запускаемый при старте устройства, который реализуется на уровне железа процессора, и синхронизируется тактовым генератором, т.е. это не часы реального времени. Например, для x86 процессоров, это может быть ассемблерная инструкция RDTSC, а для arm CNTVCT. Этот счётчик достаточно точный чтобы определять и наносекунды, но в случае с операционными системами не реального времени, в частности на уровне приложения Dart, используются миллисекунды, достаточные для реализаций большинства задач.

    Пробуждение - После того как операционная система разбудила eventhandler, он отправляет сообщение в порт нашего изолята. Изолят видит сообщение и "просыпается", смотрит текущее время и анализирует очередь таймеров, извлекает из очереди таймер и выполняет код, который мы задали для Future.

    Один поток - Заметим, что весь наш код в случае работы с Future/Timer объектами, выполняется в одном потоке изолята, это означает, что при синхронных операциях, требующих значительных вычислений, этот механизм может затормозиться и "подвесить" наше приложение. Если вам нужны действительно параллельные потоки - используйте isolate или более удобный, в некоторых случаях, compute (он внутри также использует изолят). Работа и использование изолятов "в бою" интересный предмет для обсуждения, но выходит за рамки темы этой статьи.

    Специфическое будущее - Особняком стоит упомянуть Future.microtask. Этот объект использует microtask для выполнения, и работает вне обычного механизма обработки очереди. Как правило, это обработка специфичных небольших задач, которые должны быть выполнены до выполнения остальных Future из очереди. Для примера, во всем коде фреймворка Flutter, он используется всего около десятка раз.

    Насколько сладок async

    На десерт предлагаю попробовать на вкус async/await конструкцию, чтоб понять, насколько она соответствует определению синтаксический сахар, а заодно, разобраться в магии трансформации.

    Возьмём классический пример сахара в Dart - проверка на null и присвоение,

    a ??= 5;

    Легко может быть преобразован в

    if ( a == null) {
          a = 5;
    }

    Чтобы разобраться с этим вопросом для asynс/await реализуем простую задачу - отобразить последовательно во времени два текста, при этом ожидая результата предыдущей работы, т.е. шаг за шагом.

    Говорят: "Всё познаётся в сравнении", поэтому, напишем один пример с использованием классической цепочки then ,

    void main() {
      Future.delayed(const Duration(seconds: 3))
          .then((_) => print('Hello Future 1!'))
          .then((_) => Future.delayed(const Duration(seconds: 3)))
          .then((_) => print('Hello Future 2!'));
    }

    а второй, с async/await

    void main() async {
      await Future.delayed(const Duration(seconds: 3));
      print('Hello Future 1!');
    
      await Future.delayed(const Duration(seconds: 3));
      print('Hello Future 2!');
    }

    Для того чтобы продолжить дальше, напомню, что само преобразование при компиляции приложения на Dart/Flutter, от исходного кода до машинного, проходит 4 основных стадии преобразования: Исходный код(Dart) --> AST(Abstract Syntax Tree) --> IL(Intermediate Language) --> Ассемблер(машинный код). Чуть подробнее об этом можно почитать в моей предыдущей статье. Цепочку IL/Ассемблер мы пропустим, они не важны для нашего исследования.

    Абстрактное синтаксическое дерево - На этом этапе происходит синтаксический анализ и преобразование нашего исходного кода в более простой формат, подходящий для анализа и дальнейшего преобразования в IL. Как правило, Dart хранит этот формат в бинарном виде, при своей работе, но мы, для анализа, будем рассматривать его текстовый вид, который можно получить из бинарного (как его получить рассмотрено здесь).

    AST then - Сначала посмотрим на абстрактное представление для then

    AST представление then примера
    main = sug::main;
    library from "file:///~/development/dart-sdk/sdk/sugar_then.dart" as sug {
      static method main() → void {
        asy::Future::delayed<dynamic>(#C2).{asy::Future::then}
        <void>((dynamic _) → void => core::print("Hello Future 1!")).{asy::Future::then}
        <dynamic>((void _) → asy::Future<dynamic> => asy::Future::delayed<dynamic>(#C2)).{asy::Future::then}
        <void>((dynamic _) → void => core::print("Hello Future 2!"));
      }
    }

    Как видно, это представление практически не отличается от нашего исходного кода и по содержанию и по структуре.

    AST async - Теперь сделаем тоже самое для async/await

    AST представление ASYNC примера
    main = sug::main;
    library from "file:///Users/vadimlukicev/development/dart-sdk/sdk/sugar_async.dart" as sug {
    
      static method main() → void /* originally async */ {
        final asy::_Future<dynamic> :async_future = new asy::_Future::•<dynamic>();
        core::bool* :is_sync = false;
        FutureOr<dynamic>? :return_value;
        (dynamic) → dynamic :async_op_then;
        (core::Object, core::StackTrace) → dynamic :async_op_error;
        core::int :await_jump_var = 0;
        dynamic :await_ctx_var;
        dynamic :saved_try_context_var0;
        function :async_op([dynamic :result, dynamic :exception, dynamic :stack_trace]) → dynamic yielding
          try {
            #L1:
            {
              [yield] let dynamic #t1 = asy::_awaitHelper(asy::Future::delayed<dynamic>(#C2), :async_op_then, :async_op_error, :async_op) in null;
              :result;
              core::print("Hello Future 1!");
              [yield] let dynamic #t2 = asy::_awaitHelper(asy::Future::delayed<dynamic>(#C2), :async_op_then, :async_op_error, :async_op) in null;
              :result;
              core::print("Hello Future 2!");
            }
            asy::_completeOnAsyncReturn(:async_future, :return_value, :is_sync);
            return;
          }
          on dynamic catch(dynamic exception, core::StackTrace stack_trace) {
            asy::_completeOnAsyncError(:async_future, exception, stack_trace, :is_sync);
          }
        :async_op_then = asy::_asyncThenWrapperHelper(:async_op);
        :async_op_error = asy::_asyncErrorWrapperHelper(:async_op);
        [@vm.call-site-attributes.metadata=receiverType:dynamic Function([dynamic, dynamic, dart.core::StackTrace?])] :async_op.call();
        :is_sync = true;
        return :async_future;
      }
    }

    Ого! Наш код значительно потолстел после преобразования. Похоже async/await все же сахарный ;) На самом деле, для этих преобразований используются так называемые трансформеры, как он выглядит для async можно посмотреть здесь. Там же находятся и другие трансформеры, используемые на этапе преобразования нашего кода, возможно написать и использовать свой, правда пока не представляю случаи, когда в этом может возникнуть необходимость.

    Обратим внимание, что в нашу main функцию, отмеченную как async, было добавлено некоторое количество переменных, а также асинхронный код обернут в функцию. Для целей нашего исследования, обратим внимание на переменную await_jump_var и метку [yield]

    await_jump_var специфическая переменная, как видно, в нашем коде, она нигде не используется, тоже самое касается и [yield], это не совсем тот yield, который мы привыкли видеть в коде на dart. Если копнуть чуть поглубже - эти специфические элементы и переменные используется при работе приложения в runtime Dart VM. Если кратко, - await_jump_var хранит шаг, на котором остановился последний await c [yield], при следующем входе в эту функцию виртуальная машина Dart, смотрит на эту переменную и, используя подход по типу оператора switch, переходит на то место, где было ожидание Future последний раз.

    Заключение

    Видно, что для функционирования работы asyn/await используется как преобразование нашего исходного кода, так и была добавлена специальная функциональность в виртуальную машину Dart. По-моему мнению, это не совсем попадает под описание "синтаксический сахар", но, с другой стороны, это и не важно, важно то, что эта функциональность есть, скажем спасибо разработчикам Dart/Flutter и пожелаем удачи в дальнейшем развитии языка!

    Дисклеймер

    В следующей статье планирую разобрать асинхронность, уже с точки зрения работы движка Flutter и его связи с кодом Dart, платформенными каналами и специфичными для Flutter потоками, а также отличия в создании главного изолята для приложения Flutter и консольного приложения Dart.

    Liga Stavok
    We come to work and make sports brighter!

    Comments 4

      0
      Можете написать про shelf and shelf_router? Полка крутая, и в gRPC умеет!
        0
        Shelf на слуху, сам, для прототипов, обычно использовал связку — Node.js+Express.
        В следующем pet-проекте можно будет попробовать, но, думаю, написать статью желаемого уровня и с pet-проекта не получится у меня)
          0
          быть может она вам понравится)) и захочется осветить ее для ру комьюнити)) она классная))
        0
        Автор, спасибо за статью! С нетерпением жду следующую.

        Only users with full accounts can post comments. Log in, please.