Комментарии 23
Честно говоря, возникает желание заняться этой темой. Это важно и актуально. Но, кажется, что эта тема хорошо проработана, потому что про «мультиагентный подход» я слышу уже дано, а, значит, разработчиками уже выработаны какие-то оптимальные решения. Или, всё же, не совсем? Но разобраться в основах было бы очень любопытно.
Это важно и актуально.Что именно важно и актуально?
а, значит, разработчиками уже выработаны какие-то оптимальные решения. Или, всё же, не совсем?Не знаю, что творится в этой теме с точки зрения computer science. На практике же есть несколько широко известных реализаций: Erlang и Akka. Erlang — это отдельный язык и виртуальная машина (т.е. настоящая платформа). Akka — это просто фреймворк для JVM (для Java и Scala). Кроме них существуют и другие реализации для разных языков и платформ (например, Elixir для Erlang VM, Quasar для Java, Orleans для .NET, Celluloid для Ruby и т.д.) Для C++ полный разброд и шатание (немного подробнее здесь). Впрочем, для C++ это нормально, т.к. C++ применяется для совершенно разных задач.
Вы часто упоминаете сложности организации back-pressure в модели акторов, но не вдавались в подробности, в чём эти сложности заключаются. Было бы интересно почитать про это.
Наивно, кажется что с этим не должно быть проблем: получатель должен отправлять сообщение «перегружен» всем отправителям, когда его очередь входящих сообщений забита более чем на 50-80% и сообщение «свободен», когда очередь освободилась до уровня 10-20%. Соответственно все отправители, если недавно было получено сообщение «перегружен» от получателя должны остановить отправку до получения сообщения «свободен». Сообщение «перегружен» также может содержать таймаут, когда оно истекает если не обновлено. Если получатель не знает, кто ему отправит сообщение — он его узнает после получения первого же сообщения. Остаётся опасность того, что каждое сообщение приходит от нового актора, но большое число акторов и так несет опасность, как Вы уже писали, да и получатель часто знает своих получателей на этапе компиляции. Теоретически, кажется, что это всё может работать без особо участия пользователя фреймворка так как необходимая информация и так доступна системе… Было бы интересно почитать, где тут возникают сложности.
P.S. Очень интересные статьи, спасибо!
Наивно, кажется что с этим не должно быть проблем: получатель должен отправлять сообщение «перегружен» всем отправителям, когда его очередь входящих сообщений забита более чем на 50-80% и сообщение «свободен», когда очередь освободилась до уровня 10-20%. Соответственно все отправители, если недавно было получено сообщение «перегружен» от получателя должны остановить отправку до получения сообщения «свободен». Сообщение «перегружен» также может содержать таймаут, когда оно истекает если не обновлено. Если получатель не знает, кто ему отправит сообщение — он его узнает после получения первого же сообщения. Остаётся опасность того, что каждое сообщение приходит от нового актора, но большое число акторов и так несет опасность, как Вы уже писали, да и получатель часто знает своих получателей на этапе компиляции. Теоретически, кажется, что это всё может работать без особо участия пользователя фреймворка так как необходимая информация и так доступна системе… Было бы интересно почитать, где тут возникают сложности.
P.S. Очень интересные статьи, спасибо!
Один из главных факторов — это то, что доставка сообщения до получателя не является мгновенной операцией. Т.е. перегруженный агент A отсылает агенту B сообщение о том, что A перегружен. Но пройдет какое-то время, в течении которого агент B будет отсылать сообщения агенту A. За это время агент A может быть загружен «по самое не хочу». Опять же, сообщение о том, что агент А освободился и готов принимать данные вновь, придет к B не сразу. В течении этого времени агент B будет простаивать, т.е. мы будет терять время.
Кроме того, далеко не всегда есть жесткая связь 1:1 между отправителем и получателем. Т.е. может быть получатель A, которому отсылают сообщения агенты B1, B2, B3 и т.д. И все с разным темпом, и все в своем собственном порядке. Так что если перегрузка A возникает при сообщении от B2, то это не значит, что блокировать нужно именно B2.
Кроме того, агент A может отсылать сообщения самому себе (такое регулярно используется). Не будет же A блокировать самого себя :)
Кроме того, есть еще отложенные и периодические сообщения, которые летят от таймера. Таймер вообще нельзя блокировать, т.к. он обслуживает толпу агентов.
Кроме того, далеко не факт, что очередь, заполненная на 80% будет разгребаться дольше, чем очередь, заполненная на 20%. Тут зависит от того, что за сообщения стоят в очереди и сколько стоит обработка каждого из них. Есть сообщения, обработка которых стоит очень дешево, есть сообщения, на обработке которых агент может тратить минуты в буквальном смысле.
Кроме того, далеко не факт, что агенту будет принадлежать собственная очередь. Это от фреймворка зависит. У нас в SObjectizer сообщения идут не в очередь агента, а в очередь диспетчера, на котором работают агенты. Для каких-то диспетчеров у каждого агента будет своя очередь, а для каких-то — будет общая очередь, разделяемая с другими агентами. Кроме того, это все unbound-очереди, так что процент их наполнения не подсчитать. AFAIK, в CAF и в Akka у каждого актора своя очередь.
Так что, по хорошему, для того, чтобы понять, насколько загружен актор, нужно понимать сколько «тяжелых» сообщений уже стоят в его очереди. Но даже при этом, когда агенты взаимодействуют в режиме 1:N или N:1, или M:N, не понятно кого именно приостанавливать.
PS. Понятно, что в каких-то конкретных случаях можно сделать какое-то частное решение, которое будет прекрасно работать на конкретной задаче. Но если смотреть с точки зрения реализации универсального фреймворка, все не так тривиально, как хотелось бы.
Кроме того, далеко не всегда есть жесткая связь 1:1 между отправителем и получателем. Т.е. может быть получатель A, которому отсылают сообщения агенты B1, B2, B3 и т.д. И все с разным темпом, и все в своем собственном порядке. Так что если перегрузка A возникает при сообщении от B2, то это не значит, что блокировать нужно именно B2.
Кроме того, агент A может отсылать сообщения самому себе (такое регулярно используется). Не будет же A блокировать самого себя :)
Кроме того, есть еще отложенные и периодические сообщения, которые летят от таймера. Таймер вообще нельзя блокировать, т.к. он обслуживает толпу агентов.
Кроме того, далеко не факт, что очередь, заполненная на 80% будет разгребаться дольше, чем очередь, заполненная на 20%. Тут зависит от того, что за сообщения стоят в очереди и сколько стоит обработка каждого из них. Есть сообщения, обработка которых стоит очень дешево, есть сообщения, на обработке которых агент может тратить минуты в буквальном смысле.
Кроме того, далеко не факт, что агенту будет принадлежать собственная очередь. Это от фреймворка зависит. У нас в SObjectizer сообщения идут не в очередь агента, а в очередь диспетчера, на котором работают агенты. Для каких-то диспетчеров у каждого агента будет своя очередь, а для каких-то — будет общая очередь, разделяемая с другими агентами. Кроме того, это все unbound-очереди, так что процент их наполнения не подсчитать. AFAIK, в CAF и в Akka у каждого актора своя очередь.
Так что, по хорошему, для того, чтобы понять, насколько загружен актор, нужно понимать сколько «тяжелых» сообщений уже стоят в его очереди. Но даже при этом, когда агенты взаимодействуют в режиме 1:N или N:1, или M:N, не понятно кого именно приостанавливать.
PS. Понятно, что в каких-то конкретных случаях можно сделать какое-то частное решение, которое будет прекрасно работать на конкретной задаче. Но если смотреть с точки зрения реализации универсального фреймворка, все не так тривиально, как хотелось бы.
А почему нельзя реализовать протокол подобный ReactiveStreams в таком случае?
По-моему весьма удобный инструмент для работы с backpressure, у grpc тоже очень похожее внутреннее API.
По-моему весьма удобный инструмент для работы с backpressure, у grpc тоже очень похожее внутреннее API.
А почему нельзя реализовать протокол подобный ReactiveStreams в таком случае?А где можно посмотреть на описание этого протокола?
Основная документация с TCK: http://www.reactive-streams.org/
TCK он для джавы, но одним из вдохновителей лежат авторы akka как раз.
API даже войдёт в JDK9.
https://community.oracle.com/docs/DOC-1006738
TCK он для джавы, но одним из вдохновителей лежат авторы akka как раз.
API даже войдёт в JDK9.
https://community.oracle.com/docs/DOC-1006738
Ну вот как раз про backpressure информация что-то с ходу не обнаруживатся. В описании изменений для JDK9 вообще вот что сказано:
Об этом же говорят и авторы reactive manifesto (раздел Basic Semantics):
В каких-то задачах это вряд ли возможно. Например, у нас может быть датчик температуры воздуха, который опрашивается раз в секунду. Этот датчик должен отдавать информацию во внешним мир именно с таким темпом. Если на информацию от датчика подписано 10 получателей, то датчик не должен озадачиваться тем, что один из 10 получателей сейчас не готов принять текущее значение.
The Flow API does not provide any APIs to signal or deal with back pressure as such, but there could be various strategies one could implement by oneself to deal with back pressure.Т.е. стандартного API для этого нет, нужно делать что-то по месту бедствия.
Об этом же говорят и авторы reactive manifesto (раздел Basic Semantics):
How elements are transmitted, their representation during transfer, or how back-pressure is signaled is not part of this specification.Из описания API для Reactive Streams следует, что основная идея в периодическом запросе N новых сообщений Subscriber-ом после того, как он разобрался со своей текущей нагрузкой. Для каких-то задач это вполне себе нормально. И может быть без особых усилий реализовано поверх асинхронного общения акторов.
В каких-то задачах это вряд ли возможно. Например, у нас может быть датчик температуры воздуха, который опрашивается раз в секунду. Этот датчик должен отдавать информацию во внешним мир именно с таким темпом. Если на информацию от датчика подписано 10 получателей, то датчик не должен озадачиваться тем, что один из 10 получателей сейчас не готов принять текущее значение.
Да, для вашего юзкейса возможно правильнее дропать какие-то сообщения на получателях если они не успевают обрабатывать весь поток.
ReactiveStreams — это такой минимальный API/протокол для реализации async non-blocking backpressure. Например необязательно посылать следующий сигнал когда обработаны все текущие сообщения, можно сделать это когда есть какое-то место в буфере обработчике например.
Akka Streams например поверх протокола ReactiveStreams реализует много чего. И отбрасывание лишних сообщений, и группировку, и непосылание новых запросов.
ReactiveStreams — это такой минимальный API/протокол для реализации async non-blocking backpressure. Например необязательно посылать следующий сигнал когда обработаны все текущие сообщения, можно сделать это когда есть какое-то место в буфере обработчике например.
Akka Streams например поверх протокола ReactiveStreams реализует много чего. И отбрасывание лишних сообщений, и группировку, и непосылание новых запросов.
Ну так ведь речь шла вот о чем:
Вы часто упоминаете сложности организации back-pressure в модели акторов, но не вдавались в подробности, в чём эти сложности заключаются.Я попытался объяснить, с чем приходится сталкиваться в общем случае. Понятно, что бывают частные случаи, в которых реализация back-pressure более очевидно (как, например, в ограниченном контексте с Reactive Streams). Вообще, вот здесь я специально подчеркивал, что хороший механизм защиты от перегрузки должен быть заточен под конкретную задачу.
Akka Streams например поверх протокола ReactiveStreams реализует много чего. И отбрасывание лишних сообщений, и группировку, и непосылание новых запросов.А можно ссылочку попросить, дабы не перелопачивать всю документацию по Akka Streams?
Штуки, про которые я рассказывал реализованы вместе с буферизацией на асинхронных границах http://doc.akka.io/docs/akka/2.4.14/scala/stream/stream-rate.html
Правда akka-stream штука достаточно сложная для понимания с наскока, и возможно будет лучше именно что познакомиться получше с документацией и может почитать исходники. Мне в akka-stream очень нравятся примитивы для написания собственной логики обработчиков (GraphStage), которую нельзя описать встроенными комбинаторами. Для обработки потоковых данных на мой взгляд akka-stream гораздо проще и удобнее, чем обычные акторы.
Правда akka-stream штука достаточно сложная для понимания с наскока, и возможно будет лучше именно что познакомиться получше с документацией и может почитать исходники. Мне в akka-stream очень нравятся примитивы для написания собственной логики обработчиков (GraphStage), которую нельзя описать встроенными комбинаторами. Для обработки потоковых данных на мой взгляд akka-stream гораздо проще и удобнее, чем обычные акторы.
Мне кажется, что обработка потоков однотипных данных гораздо лучше ложиться на модель CSP, чем на Модель Акторов. И хотя, как говорят, одна из них спокойно выражается через другую, дополнительные усилия в реализации CSP посредством акторов дает о себе знать при обработке потоков данных.
Я бы не сказал, что akka-stream это про акторы. Оно просто использует их для реализации процесса обработки однотипных данных и не более того.
Я на следил пристально за развитием Akka и ее инфраструктуры, но мне казалось, что Akka Streams появились именно потому, что просто на акторах делать обработку потоков данных неудобно. И для упрощения этой задачи сделали прослойку поверх акторов.
Частично, пожалуй, то, что вы говорите, частично на мой взгляд — это создание API для работы с типизированными данными.
akka-stream — это ведь не только про обработку линейных потоков данных. В akka-stream модель вычислений — граф, с циклами и прочим весельем при желании. Поэтому чисто на потоках например реализован akka-http.
Сами акторы в akka-stream используются только для разруливания проблем с concurrency (по сути единственный актор там — это GraphInterpreter, который исполняет действия над пайплайном обработки).
akka-stream — это ведь не только про обработку линейных потоков данных. В akka-stream модель вычислений — граф, с циклами и прочим весельем при желании. Поэтому чисто на потоках например реализован akka-http.
Сами акторы в akka-stream используются только для разруливания проблем с concurrency (по сути единственный актор там — это GraphInterpreter, который исполняет действия над пайплайном обработки).
Штуки, про которые я рассказывал реализованы вместе с буферизацией на асинхронных границах http://doc.akka.io/docs/akka/2.4.14/scala/stream/stream-rate.htmlПохоже, что задачи, возлагаемые на Akka Streams, в мире C++ решаются с помощью Data Flow and Dependency Graphs из Threading Building Blocks. Это вообще отдельная ниша со своими особенностями. Решения для нее можно строить на базе Модели Акторов (например, акторами могут быть отдельные вычислительные узлы графа, так же акторами могут быть координаторы, которые решают, какому вычислительному узлу выделить время на рабочей нити в пуле). Можно строить на базе CSP и короутин. Это может быть даже проще и удобнее: связи между узлами представляются в виде CSP-каналов, вычислительные узлы графа — в виде короутин. Когда короутина-поставщик пытается записать очередной элемент данных в заполненный канал, то она просто приостанавливается. Когда короутина-потребитель пытается прочитать очередную порцию данных из пустого канала, она так же просто приостанавливается. Тем самым получается относительно несложное отображение большого количества короутин на небольшое количество рабочих потоков.
В любом случае, Akka Streams производит впечатление штуки, которая может быть построена на базе акторов, но не наоборот. Соответственно, механизмы backpressure, которые хорошо подходят к Akka Streams не обязательно должны хорошо работать в случае с просто акторами в задачах, не похожих на задачи из области data flow programming.
Я могу ошибаться, но TBB умеет представлять вычисления только в виде орграфа, те без циклов. akka-stream умеет в циклы в том числе.
Кроме этого, если использовать много исполнителей, делающих мелкие вычисления, то смен контекста в таком вычислительном графе будет много и ничего хорошего не выйдет. Нужно иметь какой-то баланс между проведением вычислений и передачей данных.
Корутины и continuations — это хорошо, я не спорю, оно мне самому нравится.
Задачи — они разные, решения тоже есть разные, поэтому да, нужно смотреть что лучше подходит в каждом конкретном случае. Но мне кажется, что протокол, когда обработчик говорит, сколько сообщений он может принять и не умереть (плюс возможно что делать с другими сообщениями) вполне имеет право на жизнь почти везде.
Кроме этого, если использовать много исполнителей, делающих мелкие вычисления, то смен контекста в таком вычислительном графе будет много и ничего хорошего не выйдет. Нужно иметь какой-то баланс между проведением вычислений и передачей данных.
Корутины и continuations — это хорошо, я не спорю, оно мне самому нравится.
Задачи — они разные, решения тоже есть разные, поэтому да, нужно смотреть что лучше подходит в каждом конкретном случае. Но мне кажется, что протокол, когда обработчик говорит, сколько сообщений он может принять и не умереть (плюс возможно что делать с другими сообщениями) вполне имеет право на жизнь почти везде.
Нужно иметь какой-то баланс между проведением вычислений и передачей данных.При использовании CSP-каналов и короутин здесь неоткуда ожидать больших накладных расходов, чем при использовании акторов.
Но мне кажется, что протокол, когда обработчик говорит, сколько сообщений он может принять и не умереть (плюс возможно что делать с другими сообщениями) вполне имеет право на жизнь почти везде.Имеет. Но, во-первых, как один из набора возможных. Во-вторых, с ним тоже не все так просто, примеры чего можно найти даже по приведенной вами ссылке. Так что изначальный тезис о том, что backpressure для асинхронных агентов — это не так просто, как хотелось бы, на мой взгляд, пока сохраняет свою актуальность.
А по поводу протоколов для доставки можете посмотреть на https://github.com/real-logic/Aeron где есть настраиваемая надёжность доставки с приоритетами поверх UDP.
Какая же интересная у Вас работа…
Мы в свое время (15 лет назад) в качестве транспорта использовали named pipe. Преимущество в том, что мы передаем сообщения вместе с их длиной (как в UDP), но имеем надежную передачу (как в TCP).
Но… по результатам разработки я бы сейчас ограничился просто TCP/IP, а разделение на сообщения написал бы сам. Как минимум, тогда могло бы ходить не только по локальной сети.
В целом вам есть смысл посмотреть на Skype и другие голосовые мессенджеры. Там тоже несколько типов трафика (чат с обязательной доставкой, основная часть голоса, обертона, которые если не дошли — так и не надо).
Но… по результатам разработки я бы сейчас ограничился просто TCP/IP, а разделение на сообщения написал бы сам. Как минимум, тогда могло бы ходить не только по локальной сети.
В целом вам есть смысл посмотреть на Skype и другие голосовые мессенджеры. Там тоже несколько типов трафика (чат с обязательной доставкой, основная часть голоса, обертона, которые если не дошли — так и не надо).
Преимущество в том, что мы передаем сообщения вместе с их длиной (как в UDP), но имеем надежную передачу (как в TCP)Не очень понял вашу мысль. Сообщение при передаче по TCP точно так же должно предваряться длиной. Ну или придется искать в потоке данных специальные разделители (как это сделано в HTTP), что совсем неудобно при передаче BLOB-ов.
Зарегистрируйтесь на Хабре, чтобы оставить комментарий
Подводные камни для самодельной распределенности «из коробки» в С++ном акторном фреймворке