Существует много технологий для организации параллельных вычислений, одна из наиболее перспективных и простых (да-да) — модель акторов. Она позволяет частично избавится от насущных проблем параллелизма, вроде состояния гонки, блокирующих ожиданий окончания операций, бесконечных мьютексов и синхронизаций и многого иного. Так же подобный подход существенно облегчает распараллеливание кода.
Знакомится будем на примере фреймворка akka используя язык java (сам akka написан на scala).
Актор — это изолированный (нет прямых ссылок на экземпляр) объект, занимающийся асинхронной обработкой входящих сообщений. Сообщение — любой неизменяемый объект реализующий интерфейс Serializable. Они складируются в очередь, и когда система передаёт управление актору (об этом далее) — объекты извлекаются из очереди по принципу FIFO и обрабатываются согласно внутренней логике, которую мы должны явно задать. Акторы образуют древовидную иерархию, каждый из них имеет актора (или систему акторов) в качестве создателя.
Чтобы отправить сообщение актору нужно иметь на него ссылку (не путать ссылкой на объект). Она бывает двух видов — ActorRef и ActorSelection. Ссылка имеет ключевой метод .tell(message, sender) отправляющий сообщение по указанному пути. Sender — это тоже ссылка типа ActorRef, именно её актор получатель получит при вызове метода getSender() при обработке сообщения.
ActorRef — ссылка на конкретный, гарантированно существовавший (на момент создания ссылки)актор, аналог StrongReference. В явном виде ActorRef можно получить, вызвав метод getSender() при обработке сообщений, получив прямую ссылку на отправителя, или же при создании дочерних акторов. Для гарантированной доставки сообщений используется сообщение-ответ или агент, об этом далее.
ActorSelection — слабая ссылка, существенным отличием которой является возможность создать ActorSelection зная лишь имя и путь к актору, но обратной стороной медали является отсутствие гарантий того, что по указанному пути хоть что-то существует. Отправить сообщение можно, но его доставку никто не гарантирует. Поэтому если нужно отправить данные с гарантией, но не имея прямой ссылки на нужный актор, ему отправляется произвольное сообщение, из ответа на которое можно взять сильную ссылку ActorRef.
Собственно, этот базовый минимум, после которого можно продолжать изучение уже на практике. Подключаем библиотеки, их в стандартной комплектации акки достаточно много, дабы избежать ручного разрешения зависимостей проще подключить сразу все (или использовать maven/etc).
Асинхронный мир начинает с системы акторов, создадим её. Обычно нет необходимости иметь более одной системы акторов в приложении.
Система имеет множество настроек, которые нужно выставить до её запуска (а она активна сразу после создания). Это настраивается в внешнем config файле или передаётся вторым аргументом методу-конструктору, о котором подробнее можно почитать на сайте разработчика, нам пока это не нужно, просто стоит знать.
Теперь следует создать актор ядра, дочерними к которому будет вся будущая логика. Технически — актор это объект, наследующий UntypedActor. Сделаем это.
Собственно, класс актора готов. Метод unhandled(arg0) помечает пришедший объект как необработанный и пишет об этом в лог. Добавим экземпляр kernel'а системе акторов:
Мы создали по отношению к системе (она является корнем иерархического древа акторов) дочерний актор класса Kernel и дали ему название «kernel». В акке повсеместно используются строковые названия и пути. Если в конструктор Kernel'а необходимо передать какие-то объекты — это делается при помощи Props.create(Kernel.class, obj1, obj2, obj3...).
Сделали эталон программирования — программу, которая не делает ничего полезного, добавим функционал, например — обработку команд, читаемых из консоли. В контексте актора нельзя использовать блокирующие поток вызовы/операции, совсем (есть способ, как их таки можно туда засунуть, о загадочных монадах Future немного позже), а ожидание ввода команды в консоль — одна из таких. Поэтому целесообразно всячески выносить из мира акторов и избегать блокирующих вызовов, применим первое — будем вне системы акторов читать команды с консоли и отправлять их в виде сообщений ядру.
Единственное, что стоит отметить — мы посылаем из «внешнего мира» сообщение в мир акторов, ссылки на отправляющего, ясное дело, нет, поэтому применяется заглушка.
Теперь классу kernel нужно дописать обработку пришедшей строки, сделаем простое эхо.
Примерно так выглядит логика любого актора, список блоков с условием instanceof и return при окончании + unhandled(arg0) после всех блоков, авось что-то не обработалось. Мы в качестве sender'а использовали ActorRef.noSender(), теперь самое время посмотреть, как выглядит адрес:
Получаем
И видим, что адрес существует, а не какой-нибудь там null, указывает на заглушку корзины, которая принимает объекты и ничего с ними не делает. Такой тип адреса называется локальным и абсолютным, он верный только в рамках данной системы акторов. Подробнее о адресах ниже.
Но один в поле не воин, создадим ещё акторов, и придумаем простую распределённую задачу — проверка числа на простоту. Чтобы жизнь мёдом не казалась — будем их перебирать, а чтобы акторы не ленились — без оптимизаций. Вводится с консоли число, отправляется ядру, оно должно оценить количество делителей, разослать работу рабочим, где каждый перебирает свой диапазон делителей, и когда хотя бы один заявит, что число составное — написать результат, если все закончили свою работу и не нашли ничего — число простое.
Написать проще чем рассказать, сделаем работника с прозрачной логикой.
Ничего сложного быть не должно. Теперь займёмся кодом ядра, оно выполняет две функции — распределяет работу и объединяет результаты
Не останавливаемся на отсутствии проверок, не это цель. Сразу в глаза бросаются некоторая расточительность — мы создаем акторы для одной операции и потом их уничтожаем. Конечно, можно поместить ссылки на них в массив, и по очереди вызывать оттуда, но, к счастью, всё уже сделано. Попробуем улучшить код применяя готовые решения.
Роутер — специализированный объект, передающий входящие сообщения акторам, используя определённую стратегию их выбора. Бывает двух типов — пул и группа. Группа — выбирает акторы согласно стратегии по указанному пути, пул — создает их сам. Роутер редко используется как самостоятельный объект, зачастую он инкапсулируется в актор, именно с такими роутерами-акторами мы будем иметь дело. Стратегий выбора существует много, о них можно почитать на сайте авторов фреймворка, наиболее универсальные это SmallestMailboxRouter(SM) и BroadcastRouter. Первый выбирает наименее загруженный актор из набора (по размеру мэилбокса), второй — рассылает сообщение всем. Объявим роутер с пулом акторов и стратегией SM.
В данном случае мы статически задаём размер пула, есть возможность сделать динамически изменяющиеся пулы, и даже написать свою логику изменения, но это выходит за рамки статьи.
Модифицируем код отправки заданий ядра (и убираем отправку сообщения для убиения актора):
Всё, готово. Теперь роутер сам позаботится о распределении задач. Базовые вещи продемонстрированы, можно переходить к более тонким моментам.
Выше говорилось, что акторам передаётся управление, но не было уточнения — как. За это отвечает контекст выполнения, он явно указывается в файле конфигурации, по умолчанию стоит fork-join executor, универсальный и наиболее производительный на общих задачах. Создаётся некоторое количество потоков (неявно указано в конфиге) и они передают управление акторам, выбирая их в n потоков из списка. Критерием переключения к другому актору является или пустая очередь сообщений или обработка k сообщений подряд, если не указано другое. Очевидно, что происходит при попытке актора вызвать блокирующую операцию — поток блокируется, и остальные акторы начинают обрабатываться куда медленнее. А если количество заблокированных акторов становится = количеству потоков то система замирает.
Даже из такой неловкой ситуации есть выход — Future. Это совсем не те Future, к которым привыкли в java, а концентрированное функциональное добро прямиком из scala. Сначала о Future в java. Это объект, который может содержать результат выполнения асинхронной операции. Или нет. Главное отличие этого фьючера — колбэк, возможность совершить некоторые действия после завершения задачи, причём абсолютно асинхронно. Именно в этих фьючерах заключается львиная доля асинхронной «мощи» фреймворка. Как это выглядит?
При создании мы помимо Callable (аналог Runnable с возвращаемым значением) передаём некий dispatcher. Это то, что выполняет актор. Очевидное замечание — если мы заставляем контекст актора выполнить блокирующее действие — а не повиснет ли оно? Тут дело в хитрой акке, которая имеет два пула потоков, один — для акторов, который нельзя блокировать, второй — для всякой ерунды, вроде Future, у этого пула потоков переменный размер. Они оба ограничены, но вызывая блокирующую операцию таким образом мы практически (в рамках разумного) не рискуем замедлить основной набор акторов.
Вернувшись к коду выше — видны эти невероятные Callback. На автора они произвели неизгладимое впечатление. Те, кто знакомы с функциональными языками могут заметить, что Future ведёт себя как монада (ей и является). Значит, можно устраивать их композиции, частично применять функции — всё это есть, модуль Futures.*
Но даже если не вдаваться в далёкие от классической явы вещи — это возможность без создания потоков, продумывания архитектуры, просто создавать асинхронные операции посреди кода. А ещё их можно вкладывать, организуя цепочки логики вида «прочитай файл, спроси пользователя и в это же время обрабатывай данные, в зависимости от ответа сохрани результат». И это быстро, просто, компактно.
Конечно, и тут не без ложки дёгтя — отладка. Код реактивнее некуда, и executor акки всячески помогает вам — исключительные ситуации внутри фьючера не выводятся. Совсем. Если там что-то случилось — вы это узнаете только по отсутствию вызова OnComplete, а что именно — вообще никак. Для более удобной отладки есть один грязный хак — recover. Акка известна своей failsafe политикой, фьючеры — не исключения, поэтому они имеют штатные средства для устранения некорректного результата future'а. Как это выглядит:
Видно, что в данном примере recover ошибку не исправляет, возвращая null в onComplete, зато хотя бы пишет о ней. Как итог — ясно, что Future являются невероятно мощным инструментом, позволяющий реализовывать асинхронные ветвления прямо посреди синхронного кода и не боятся блокировок, но требуют некоторой осторожности.
Для мира акторов есть полезный и распространённый шаблон на основании Future — ask.
Он отправляет сообщение Object актору ActorRef/ActorSelection и ждёт timeout ответа. Если не дождался — будет эксепшин, дождался — вызван метод OnComplete. После выполнения временный актор удаляется. Future по прежнему не использует пул потоков для акторов, так что блокирующих операций можно не опасаться. Как и обычные future ask'и могут будут неограниченно вложенными. Подобный шаблон позволяет не плодить лишних акторов для выполнения простых задач и организовывать event-driven последовательности действий.
Другим полезным производным от Future является агент — переносчик асинхронной смены состояния.
Агент — сериализуемый объект, все копии которого связаны последовательной шиной данных. Состояние каждого экземпляра агента доступно мгновенно, и, вместе с тем, есть возможность асинхронно обновить состояние всех агентов. Как это работает:
Создадим агент
Можно его послать в виде сообщения, даже передать на другую машину (об этом позже, немного терпения). В любой момент времени из него можно извлечь экземпляр Config'а методом .get(). Но самое полезное свойство — метод send(config) асинхронно передаст аргумент в качестве состояния всем экземплярам агента, независимо от их местоположения. На этом основные инструменты организации базовой архитектуры закончились, самое время переходить в сеть.
Если нырнуть не очень глубоко под капот (достаточно посмотреть список библиотек в поставке) становится ясно, что сеть работает на netty. Но в фреймворке приходится иметь дело с куда большим уровнем абстракции.
Рассмотрим реальный пример разбора входящих данных на вполне реальном примере — разбор произвольных TCP пакетов. Хорошим тоном считается разделять разбор пакетов от их логики — поэтому нам понадобится кодек и хэндлер. Сырые данные в виде ByteString (об этом дальше) приходят в актор-кодек, где из них формируются готовые пакеты и отправляются в актор-хэндлер, где с ними творится логика. Создадим актор, принимающий входящие соединения и создающий для каждого цепочку из кодека + хэндлера.
Теперь рассмотрим логику кодека, логика хэндлера тривиальна и зависит от того, какие данные вы именно разбираете и что именно требуется.
Собственно, опять получился некий «дзен-код», который похож на настоящий но ничего не делает. Мы получили входящие данные, дописали их к предыдущим. Очевидны вопрос — хорошо, входящий поток байт складывается в буфер, как же извлечь оттуда реальные строки, числа и массивы. Акка не знает тип данных, которые вы будете передавать, поэтому методы работы придётся писать самим. Например так
Вот и всё, выглядит достаточно… низкоуровнево, так как рассматривал самый общий случай. Аналогично примеру выше можно написать чтение из ByteString данных произвольной структуры, показано только самое простое. Если нужно отправить данные клиенту:
Вот так, в общих чертах, работает сеть в akka. Когда соединение закрывается — в кодек приходит сообщение Tcp.PeerClosed$. Акторы в таком варианте реализации не погибнут после закрытия соединения, поэтому их нужно убить, отправив сообщение-убийцу (после его получения актор завершается) PoisonPill.
Вот и добрались до основного преимущества и конька фреймворка — эффективной кластеризации из коробки, без каких-либо усилий со стороны программиста. Но для этого нужно разобраться с адресацией акторов, о которой мы упоминали вскользь. Существует 3 типа адресации, с примерами:
Локальная относительная: «kernel/core/worker»;
Локальная абсолютная: "/learning2hard/user/kernel/core/worker" — адрес начинается с слеша;
Сетевая абсолютная: «akka.tcp://learning2hard@testhost.com:100500/user/kernel/core/worker».
Зная сетевой адрес можно отправить сообщение удалённому актору, если он входит в тот же кластер, что и текущая система акторов.
Кластеризация требует правки конфига, однако это можно сделать и без него, не покидая кода программы.
Хорошо видно, что используется некий модуль remote, о котором ничего не упоминалось — это именно то, что обеспечивает возможность коммуникации между акторами посредством сети, кластеризация — надстройка над remote, но мы на этом останавливаться не будем. Параметр «port=0» будет заменён на любой
На этом этапе кластер уже готов и может выполнять свои функции, о которых стоит рассказать подробнее.
Все важные события (подключение новых нод, отключение, проблемы с связью, метрика) доступны подписчикам шины кластера. Как это работает посмотрим на примере:
Класс таких шин есть в акке — EventBus. Удобные, но ничего уникального сравнимо с им подобными не делающие — 3 основных метода, publish, subscribe, unsubscribe.
Для балансировки нагрузки с каждой машины снимается метрика, данные о памяти, процессоре, загруженности сети. akka поддерживает несколько сборщиков информации, наиболее продвинутым и точным (и рекомендуемым разработчиками) является sigar. Чтобы akka начала его использовать — достаточно подключить в список импортируемых библиотек sigar.jar и добавить необходимые нативки, всё есть на гуглабельном сайте разработчиков. Сообщения метрики регулярно отправляются в шину кластера.
Как бы это странно не звучало — с теорией кластера всё. Безусловно, ещё есть много нюансов, роли ноды, сообщения о достижении определённого размера, циклы жизни нод (и циклы жизни акторов, этот вопрос мы вообще не рассматривали), но с ними можно ознакомится самостоятельно, тем более, что для начала они не критически необходимы. В качестве примера работы с шиной рассмотрим типичную ситуацию — отправка работы вновь подключившимся нодам.
Хорошо видно, как адресуются удалённые ноды, есть нечто новое — роль, в данном случае — «lazyslave» это просто параметр конкретной ноды, точнее — список строк, задаётся в конфиге akka.cluster.roles.
Нельзя не упомянуть о специфических роутерах для кластеров. Они фундаментально ничем не отличаются от вышеописанных роутеров, кроме того, что размещают (или ищут) акторы не только на локальной машине, а на всех (или с определённой ролью) нодах кластера. Типичный пример:
Вот и всё. Акка это хороший, производительный фреймворк для параллельных вычислений, я бы его назвал лучшим в своём классе, при этом обладающий низким порогом вхождения (сравнимо с «ручным» параллелизмом, синхронизациями, мьютексами, условиями гонки и прочими чудесами).
Вопросы, которые показались автору интересными и/или нетривиальными были рассмотрены, как и основные моменты использования данного фреймворка. В качестве послесловия добавлю небольшой список ответов на возникающие вопросы (или те, которые мне задавали ранее). Буду рад дополнением и пожеланиям, если что-то из не упомянутого интересует — допишу.
— Было упомянуто о failsafe, однако нигде, кроме «молчащих» о эксепшинах future'ов это не встречалось. В чём дело ?
Стратегии поведения акторов в случае возникновения эксепшина намеренно не рассматривались, экономя время читателя. В двух словах — у актора есть стратегия супервайзинга, что он будет делать, если кто-то из его дочерних акторов бросит exception. Их 4 основных: убить дочерний актор, перезапустить дочерний актор, ничего не делать, бросить эксепшин самому. Если интереснее подробнее — на сайт фреймворка.
— Предлагалось создавать даже целые цепочки из Future'ов, полученных посредством Patterns.ask, которые для каждого экземпляра создают временный актор, а потом он уничтожается — насколько это ресурсозатратно? Ведь немногим ранее способ, когда для задачи создавались акторы и уничтожались после неё, назван неэффективным.
Акторы акки очень легки и расходуют мало ресурсов, результаты бенчей можно нагуглить. А временные акторы, создаваемые ask'ом — расходуют их ещё менее, устранясь сразу после окончания таймаута. Они легче, потом как не несут стратегий супервайзинга, их жизненный цикл не контролируется и всё такое. Так что избыточного создания акторов стоит избегать, но не опасаться.
— Какое оптимальное количество акторов в системе?
Какое нужно. Для бизнес-решений предлагается создавать актор на каждый набор хранимых состояний, и иногда их количество переваливает за 4-5 млн/ноду. И ничего, всё успешно работает.
— Насколько производителен поиск акторов по имени?
Он хорошо оптимизирован и использует чёрно-красные деревья, так что вполне производителен. Так же очевидно то, что по относительным путям скорость поиска значительно выше.
Стоит отметить, что акторы с поддержкой лямбда-счисления являются экспериментальными и не рекомендуются для использования в проектах.
Лямбда-акторы могут использоваться совместно с обычными, фундаментальное отличие — наследуемый класс, в данном случае он AbstractActor. Аргумент метода receive имеет тип PartialFunction<Object,BoxedUnit>, показывая, что и как актор может обрабатыват. Но не стоит опасаться громоздких типов, для этого есть конструктор частичной функции ReceiveBuilder. Типичное объявление актора выглядит как
Знакомится будем на примере фреймворка akka используя язык java (сам akka написан на scala).
Теория и принципы
Актор — это изолированный (нет прямых ссылок на экземпляр) объект, занимающийся асинхронной обработкой входящих сообщений. Сообщение — любой неизменяемый объект реализующий интерфейс Serializable. Они складируются в очередь, и когда система передаёт управление актору (об этом далее) — объекты извлекаются из очереди по принципу FIFO и обрабатываются согласно внутренней логике, которую мы должны явно задать. Акторы образуют древовидную иерархию, каждый из них имеет актора (или систему акторов) в качестве создателя.
Чтобы отправить сообщение актору нужно иметь на него ссылку (не путать ссылкой на объект). Она бывает двух видов — ActorRef и ActorSelection. Ссылка имеет ключевой метод .tell(message, sender) отправляющий сообщение по указанному пути. Sender — это тоже ссылка типа ActorRef, именно её актор получатель получит при вызове метода getSender() при обработке сообщения.
ActorRef — ссылка на конкретный, гарантированно существовавший (на момент создания ссылки)актор, аналог StrongReference. В явном виде ActorRef можно получить, вызвав метод getSender() при обработке сообщений, получив прямую ссылку на отправителя, или же при создании дочерних акторов. Для гарантированной доставки сообщений используется сообщение-ответ или агент, об этом далее.
ActorSelection — слабая ссылка, существенным отличием которой является возможность создать ActorSelection зная лишь имя и путь к актору, но обратной стороной медали является отсутствие гарантий того, что по указанному пути хоть что-то существует. Отправить сообщение можно, но его доставку никто не гарантирует. Поэтому если нужно отправить данные с гарантией, но не имея прямой ссылки на нужный актор, ему отправляется произвольное сообщение, из ответа на которое можно взять сильную ссылку ActorRef.
Собственно, этот базовый минимум, после которого можно продолжать изучение уже на практике. Подключаем библиотеки, их в стандартной комплектации акки достаточно много, дабы избежать ручного разрешения зависимостей проще подключить сразу все (или использовать maven/etc).
Немного практики
Асинхронный мир начинает с системы акторов, создадим её. Обычно нет необходимости иметь более одной системы акторов в приложении.
ActorSystem system = ActorSystem.create("learning2hard");
Система имеет множество настроек, которые нужно выставить до её запуска (а она активна сразу после создания). Это настраивается в внешнем config файле или передаётся вторым аргументом методу-конструктору, о котором подробнее можно почитать на сайте разработчика, нам пока это не нужно, просто стоит знать.
Теперь следует создать актор ядра, дочерними к которому будет вся будущая логика. Технически — актор это объект, наследующий UntypedActor. Сделаем это.
public class Kernel extends UntypedActor {
@Override
public void onReceive(Object arg0) throws Exception {
unhandled(arg0);
}
}
Собственно, класс актора готов. Метод unhandled(arg0) помечает пришедший объект как необработанный и пишет об этом в лог. Добавим экземпляр kernel'а системе акторов:
final ActorRef kernel = system.actorOf(Props.create(Kernel.class), "kernel");
Мы создали по отношению к системе (она является корнем иерархического древа акторов) дочерний актор класса Kernel и дали ему название «kernel». В акке повсеместно используются строковые названия и пути. Если в конструктор Kernel'а необходимо передать какие-то объекты — это делается при помощи Props.create(Kernel.class, obj1, obj2, obj3...).
Сделали эталон программирования — программу, которая не делает ничего полезного, добавим функционал, например — обработку команд, читаемых из консоли. В контексте актора нельзя использовать блокирующие поток вызовы/операции, совсем (есть способ, как их таки можно туда засунуть, о загадочных монадах Future немного позже), а ожидание ввода команды в консоль — одна из таких. Поэтому целесообразно всячески выносить из мира акторов и избегать блокирующих вызовов, применим первое — будем вне системы акторов читать команды с консоли и отправлять их в виде сообщений ядру.
Scanner sc = new Scanner(System.in);
while (sc.hasNext()) {
String f = sc.nextLine();
if (f.equals("exit")) break;
kernel.tell(f, ActorRef.noSender());
}
system.shutdown();
Единственное, что стоит отметить — мы посылаем из «внешнего мира» сообщение в мир акторов, ссылки на отправляющего, ясное дело, нет, поэтому применяется заглушка.
Теперь классу kernel нужно дописать обработку пришедшей строки, сделаем простое эхо.
@Override
public void onReceive(Object arg0) throws Exception {
if (arg0 instanceof String) {
String s = (String) arg0;
System.out.println(s);
return;
}
unhandled(arg0);
}
Примерно так выглядит логика любого актора, список блоков с условием instanceof и return при окончании + unhandled(arg0) после всех блоков, авось что-то не обработалось. Мы в качестве sender'а использовали ActorRef.noSender(), теперь самое время посмотреть, как выглядит адрес:
System.out.println("Адрес отправителя: " + getSender());
Получаем
Actor[akka://learning2hard/deadLetters]
И видим, что адрес существует, а не какой-нибудь там null, указывает на заглушку корзины, которая принимает объекты и ничего с ними не делает. Такой тип адреса называется локальным и абсолютным, он верный только в рамках данной системы акторов. Подробнее о адресах ниже.
Но один в поле не воин, создадим ещё акторов, и придумаем простую распределённую задачу — проверка числа на простоту. Чтобы жизнь мёдом не казалась — будем их перебирать, а чтобы акторы не ленились — без оптимизаций. Вводится с консоли число, отправляется ядру, оно должно оценить количество делителей, разослать работу рабочим, где каждый перебирает свой диапазон делителей, и когда хотя бы один заявит, что число составное — написать результат, если все закончили свою работу и не нашли ничего — число простое.
Написать проще чем рассказать, сделаем работника с прозрачной логикой.
public class PrimeWorker extends UntypedActor {
@Override
public void onReceive(Object arg0) throws Exception {
if (arg0 instanceof Job) {
Job task = (Job) arg0;
for (int i = task.from; i < task.to; i++)
if (task.number % i == 0) { //Число составное
getSender().tell(new JobResult(task.jobID,false), getSelf());
return;
}
getSender().tell(new JobResult(task.jobID,true), getSelf());
}
unhandled(arg0);
}
public static class Job implements Serializable {
private static final long serialVersionUID = 5095931000566324969L;
public final int jobID; //Идентификатор задачи, используется для аггрегации результатов
public final int number; //Исследуемое число
public final int from; //Нижняя граница диапазона перебора
public final int to; //Верхняя граница диапазона перебора
public Job(int jobID, int number, int from, int to) {
this.jobID = jobID; this.number = number; this.from = from; this.to = to;
}
}
public static class JobResult implements Serializable {
private static final long serialVersionUID = -1788069759380966076L;
public final int jobID;
public final boolean isPrime;
public JobResult(int jobID, boolean isPrime) {
this.jobID = jobID; this.isPrime = isPrime;
}
}
}
Ничего сложного быть не должно. Теперь займёмся кодом ядра, оно выполняет две функции — распределяет работу и объединяет результаты
public class Kernel extends UntypedActor {
//work ID -> (currentWorkerCount, isPrime)
private TreeMap<Integer, Pair<Integer, Boolean>> jobs = new TreeMap<Integer, Pair<Integer, Boolean>>();
private int job_id_counter = 0;
@Override
public void onReceive(Object arg0) throws Exception {
if (arg0 instanceof String) {
int i = Integer.valueOf((String) arg0);
for (int j = 2; j < i; j++)
getContext().actorOf(Props.create(PrimeWorker.class)).tell(new PrimeWorker.Job(job_id_counter, i, j, j+1), getSelf());
jobs.put(job_id_counter, Pair.get(i - 2, true));
job_id_counter++;
return;
}
if (arg0 instanceof JobResult) {
JobResult jr = (JobResult) arg0;
Pair<Integer, Boolean> task = jobs.get(jr.jobID);
if (!jr.isPrime) task.second = false;
task.first--;
if (task.first < 1){
System.out.println("Число " + jr.number + (task.second ? " простое" : " составное"));
jobs.remove(jr.jobID);
}
getSender().tell(PoisonPill.getInstance(), getSelf()); //Актор сделал свою работу, отправляем ему команду уничтожения
}
unhandled(arg0);
}
public static class Pair<A,B> {
public A first;
public B second;
public static <C,D> Pair<C,D> get(C a, D b) {
Pair<C,D> p = new Pair();
p.first = a;
p.second = b;
return p;
}
}
}
Не останавливаемся на отсутствии проверок, не это цель. Сразу в глаза бросаются некоторая расточительность — мы создаем акторы для одной операции и потом их уничтожаем. Конечно, можно поместить ссылки на них в массив, и по очереди вызывать оттуда, но, к счастью, всё уже сделано. Попробуем улучшить код применяя готовые решения.
Роутер — специализированный объект, передающий входящие сообщения акторам, используя определённую стратегию их выбора. Бывает двух типов — пул и группа. Группа — выбирает акторы согласно стратегии по указанному пути, пул — создает их сам. Роутер редко используется как самостоятельный объект, зачастую он инкапсулируется в актор, именно с такими роутерами-акторами мы будем иметь дело. Стратегий выбора существует много, о них можно почитать на сайте авторов фреймворка, наиболее универсальные это SmallestMailboxRouter(SM) и BroadcastRouter. Первый выбирает наименее загруженный актор из набора (по размеру мэилбокса), второй — рассылает сообщение всем. Объявим роутер с пулом акторов и стратегией SM.
ActorRef router = getContext().actorOf(new SmallestMailboxPool(5).props(Props.create(PrimeWorker.class)), "workers");
В данном случае мы статически задаём размер пула, есть возможность сделать динамически изменяющиеся пулы, и даже написать свою логику изменения, но это выходит за рамки статьи.
Модифицируем код отправки заданий ядра (и убираем отправку сообщения для убиения актора):
if (arg0 instanceof String) {
int i = Integer.valueOf((String) arg0);
for (int j = 2; j < i; j++)
router.tell(new PrimeWorker.Job(job_id_counter, i, j, j+1), getSelf());
jobs.put(job_id_counter, Pair.get(i - 2, true));
job_id_counter++;
return;
}
Всё, готово. Теперь роутер сам позаботится о распределении задач. Базовые вещи продемонстрированы, можно переходить к более тонким моментам.
Контекст выполнения. Блокирующие операции. Передача состояния
Выше говорилось, что акторам передаётся управление, но не было уточнения — как. За это отвечает контекст выполнения, он явно указывается в файле конфигурации, по умолчанию стоит fork-join executor, универсальный и наиболее производительный на общих задачах. Создаётся некоторое количество потоков (неявно указано в конфиге) и они передают управление акторам, выбирая их в n потоков из списка. Критерием переключения к другому актору является или пустая очередь сообщений или обработка k сообщений подряд, если не указано другое. Очевидно, что происходит при попытке актора вызвать блокирующую операцию — поток блокируется, и остальные акторы начинают обрабатываться куда медленнее. А если количество заблокированных акторов становится = количеству потоков то система замирает.
Даже из такой неловкой ситуации есть выход — Future. Это совсем не те Future, к которым привыкли в java, а концентрированное функциональное добро прямиком из scala. Сначала о Future в java. Это объект, который может содержать результат выполнения асинхронной операции. Или нет. Главное отличие этого фьючера — колбэк, возможность совершить некоторые действия после завершения задачи, причём абсолютно асинхронно. Именно в этих фьючерах заключается львиная доля асинхронной «мощи» фреймворка. Как это выглядит?
Future<String> f = future(new Callable<String>() {
@Override
public String call() throws Exception {
//Some blocking stuff
return "hello habr";
}
}, getContext().dispatcher());
f.onComplete(new OnComplete<String>(){
@Override
public void onComplete(Throwable arg0, String arg1) throws Throwable {
System.out.println(arg0 != null ? arg0 : arg1);
}
}, getContext().dispatcher());
При создании мы помимо Callable (аналог Runnable с возвращаемым значением) передаём некий dispatcher. Это то, что выполняет актор. Очевидное замечание — если мы заставляем контекст актора выполнить блокирующее действие — а не повиснет ли оно? Тут дело в хитрой акке, которая имеет два пула потоков, один — для акторов, который нельзя блокировать, второй — для всякой ерунды, вроде Future, у этого пула потоков переменный размер. Они оба ограничены, но вызывая блокирующую операцию таким образом мы практически (в рамках разумного) не рискуем замедлить основной набор акторов.
Вернувшись к коду выше — видны эти невероятные Callback. На автора они произвели неизгладимое впечатление. Те, кто знакомы с функциональными языками могут заметить, что Future ведёт себя как монада (ей и является). Значит, можно устраивать их композиции, частично применять функции — всё это есть, модуль Futures.*
Но даже если не вдаваться в далёкие от классической явы вещи — это возможность без создания потоков, продумывания архитектуры, просто создавать асинхронные операции посреди кода. А ещё их можно вкладывать, организуя цепочки логики вида «прочитай файл, спроси пользователя и в это же время обрабатывай данные, в зависимости от ответа сохрани результат». И это быстро, просто, компактно.
Конечно, и тут не без ложки дёгтя — отладка. Код реактивнее некуда, и executor акки всячески помогает вам — исключительные ситуации внутри фьючера не выводятся. Совсем. Если там что-то случилось — вы это узнаете только по отсутствию вызова OnComplete, а что именно — вообще никак. Для более удобной отладки есть один грязный хак — recover. Акка известна своей failsafe политикой, фьючеры — не исключения, поэтому они имеют штатные средства для устранения некорректного результата future'а. Как это выглядит:
Future<String> f = future(new Callable<String>() {
@Override
public String call() throws Exception {
//Some blocking stuff
return "hello habr";
}
}, getContext().dispatcher());
f.recover(new Recover<String>() {
@Override
public String recover(Throwable arg0) throws Throwable {
arg0.printStackTrace();
return null;
}
}, getContext().dispatcher());
f.onComplete(new OnComplete<String>(){
@Override
public void onComplete(Throwable arg0, String arg1) throws Throwable {
System.out.println(arg0 != null ? arg0 : arg1);
}
}, getContext().dispatcher());
Видно, что в данном примере recover ошибку не исправляет, возвращая null в onComplete, зато хотя бы пишет о ней. Как итог — ясно, что Future являются невероятно мощным инструментом, позволяющий реализовывать асинхронные ветвления прямо посреди синхронного кода и не боятся блокировок, но требуют некоторой осторожности.
Для мира акторов есть полезный и распространённый шаблон на основании Future — ask.
Future<Object> ask = Patterns.ask(ActorRef/ActorSelection, Object, timeout)
Он отправляет сообщение Object актору ActorRef/ActorSelection и ждёт timeout ответа. Если не дождался — будет эксепшин, дождался — вызван метод OnComplete. После выполнения временный актор удаляется. Future по прежнему не использует пул потоков для акторов, так что блокирующих операций можно не опасаться. Как и обычные future ask'и могут будут неограниченно вложенными. Подобный шаблон позволяет не плодить лишних акторов для выполнения простых задач и организовывать event-driven последовательности действий.
Другим полезным производным от Future является агент — переносчик асинхронной смены состояния.
Агент — сериализуемый объект, все копии которого связаны последовательной шиной данных. Состояние каждого экземпляра агента доступно мгновенно, и, вместе с тем, есть возможность асинхронно обновить состояние всех агентов. Как это работает:
Создадим агент
Agent<Config> agent = Agent.create(new Config(), getContext())
Можно его послать в виде сообщения, даже передать на другую машину (об этом позже, немного терпения). В любой момент времени из него можно извлечь экземпляр Config'а методом .get(). Но самое полезное свойство — метод send(config) асинхронно передаст аргумент в качестве состояния всем экземплярам агента, независимо от их местоположения. На этом основные инструменты организации базовой архитектуры закончились, самое время переходить в сеть.
TCP/IP
Если нырнуть не очень глубоко под капот (достаточно посмотреть список библиотек в поставке) становится ясно, что сеть работает на netty. Но в фреймворке приходится иметь дело с куда большим уровнем абстракции.
Рассмотрим реальный пример разбора входящих данных на вполне реальном примере — разбор произвольных TCP пакетов. Хорошим тоном считается разделять разбор пакетов от их логики — поэтому нам понадобится кодек и хэндлер. Сырые данные в виде ByteString (об этом дальше) приходят в актор-кодек, где из них формируются готовые пакеты и отправляются в актор-хэндлер, где с ними творится логика. Создадим актор, принимающий входящие соединения и создающий для каждого цепочку из кодека + хэндлера.
public class TCPListener extends UntypedActor {
private ActorRef net;
@Override
public void preStart() throws Exception {
net = Tcp.get(getContext().system()).manager();
net.tell(TcpMessage.bind(getSelf(), InetSocketAddress.createUnresolved("127.0.0.1", 90), 100), getSelf());
}
@Override
public void onReceive(Object arg0) throws Exception {
if (arg0 instanceof Connected) {
Connected msg = (Connected) arg0;
ActorRef handler = getContext().actorOf(Props.create(PacketHandler.class));
ActorRef codec = getContext().actorOf(Props.create(TCPCodec.class, getSender(), handler)); //Актор кодек принимает аргументом констурктора хэндлер
getSender().tell(TcpMessage.register(codec), getSelf()); //Сырые данные пойдут в кодек
net.tell(msg, getSelf());
}
unhandled(arg0);
}
}
Теперь рассмотрим логику кодека, логика хэндлера тривиальна и зависит от того, какие данные вы именно разбираете и что именно требуется.
public class TCPCodec extends UntypedActor {
private ActorRef connection, target;
private ByteString buffer = ByteStrings.empty();
public TCPCodec(ActorRef conn, ActorRef target) {
this.connection = conn;
this.target = target;
}
@Override
public void onReceive(Object arg0) throws Exception {
if (arg0 instanceof Received) {
buffer = buffer.concat(decrypt(((Received) arg0).data()));
return;
}
}
Собственно, опять получился некий «дзен-код», который похож на настоящий но ничего не делает. Мы получили входящие данные, дописали их к предыдущим. Очевидны вопрос — хорошо, входящий поток байт складывается в буфер, как же извлечь оттуда реальные строки, числа и массивы. Акка не знает тип данных, которые вы будете передавать, поэтому методы работы придётся писать самим. Например так
public class ReadableProtocolBuffer implements DataInput {
public static final Charset CHARSET = StandardCharsets.UTF_8;
private ByteString string;
private int index;
public ReadableProtocolBuffer(ByteString string) {
this.string = string;
this.index = 0;
}
@Override
public void readFully(byte[] b) {
readFully(b, 0, b.length);
}
@Override
public boolean readBoolean() {
return readByte() != 0;
}
@Override
public byte readByte() {
return string.apply(index++);
}
@Override
public int readUnsignedByte() {
return readByte() & 0xFF;
}
@Override
public short readShort() {
return (short) readUnsignedShort();
}
@Override
public int readUnsignedShort() {
int h = readUnsignedByte(),
l = readUnsignedByte();
return (h<<8)+l;
}
Вот и всё, выглядит достаточно… низкоуровнево, так как рассматривал самый общий случай. Аналогично примеру выше можно написать чтение из ByteString данных произвольной структуры, показано только самое простое. Если нужно отправить данные клиенту:
connection.tell(TcpMessage.write(ByteString data, getSelf());
Вот так, в общих чертах, работает сеть в akka. Когда соединение закрывается — в кодек приходит сообщение Tcp.PeerClosed$. Акторы в таком варианте реализации не погибнут после закрытия соединения, поэтому их нужно убить, отправив сообщение-убийцу (после его получения актор завершается) PoisonPill.
Кластеризация и адресация
Вот и добрались до основного преимущества и конька фреймворка — эффективной кластеризации из коробки, без каких-либо усилий со стороны программиста. Но для этого нужно разобраться с адресацией акторов, о которой мы упоминали вскользь. Существует 3 типа адресации, с примерами:
Локальная относительная: «kernel/core/worker»;
Локальная абсолютная: "/learning2hard/user/kernel/core/worker" — адрес начинается с слеша;
Сетевая абсолютная: «akka.tcp://learning2hard@testhost.com:100500/user/kernel/core/worker».
Зная сетевой адрес можно отправить сообщение удалённому актору, если он входит в тот же кластер, что и текущая система акторов.
ActorSelection remove = getContext().actorSelection( "akka.tcp://learning2hard@testhost.com/user/kernel/core/worker");
Кластеризация требует правки конфига, однако это можно сделать и без него, не покидая кода программы.
ActorSystem system = ActorSystem.create("learning2hard", ConfigFactory.parseString(
"akka {\n" +
" actor.provider = \"akka.cluster.ClusterActorRefProvider\"\n"+
" remote.netty.tcp {\n"+
" hostname = \"127.0.0.1\"\n"+
" port = 0" +
" }\n"+
" cluster.seed-nodes = [\"akka.tcp://learning2hardt@testhost.com+\"]\n"+
"}"));
Хорошо видно, что используется некий модуль remote, о котором ничего не упоминалось — это именно то, что обеспечивает возможность коммуникации между акторами посредством сети, кластеризация — надстройка над remote, но мы на этом останавливаться не будем. Параметр «port=0» будет заменён на любой
На этом этапе кластер уже готов и может выполнять свои функции, о которых стоит рассказать подробнее.
Все важные события (подключение новых нод, отключение, проблемы с связью, метрика) доступны подписчикам шины кластера. Как это работает посмотрим на примере:
Cluster cluster = Cluster.get(getContext().system()); //инициализировали шину
cluster.subscribe(getSelf(), ClusterEvent.initialStateAsEvents(), MemberEvent.class, UnreachableMember.class); //подписались на получение интересующих событий
Класс таких шин есть в акке — EventBus. Удобные, но ничего уникального сравнимо с им подобными не делающие — 3 основных метода, publish, subscribe, unsubscribe.
Для балансировки нагрузки с каждой машины снимается метрика, данные о памяти, процессоре, загруженности сети. akka поддерживает несколько сборщиков информации, наиболее продвинутым и точным (и рекомендуемым разработчиками) является sigar. Чтобы akka начала его использовать — достаточно подключить в список импортируемых библиотек sigar.jar и добавить необходимые нативки, всё есть на гуглабельном сайте разработчиков. Сообщения метрики регулярно отправляются в шину кластера.
Как бы это странно не звучало — с теорией кластера всё. Безусловно, ещё есть много нюансов, роли ноды, сообщения о достижении определённого размера, циклы жизни нод (и циклы жизни акторов, этот вопрос мы вообще не рассматривали), но с ними можно ознакомится самостоятельно, тем более, что для начала они не критически необходимы. В качестве примера работы с шиной рассмотрим типичную ситуацию — отправка работы вновь подключившимся нодам.
Cluster cluster = Cluster.get(getContext().system());
cluster.subscribe(getSelf(), MemberUp.class);
@Override
public void onReceive(Object message) {
if (message instanceof MemberUp) {
if (member.hasRole("lazyslave")) getContext().actorSelection(member.address() + "/user/kernel").tell("hey, slave", getSelf());
return;
}
unhandled(message);
}
Хорошо видно, как адресуются удалённые ноды, есть нечто новое — роль, в данном случае — «lazyslave» это просто параметр конкретной ноды, точнее — список строк, задаётся в конфиге akka.cluster.roles.
Нельзя не упомянуть о специфических роутерах для кластеров. Они фундаментально ничем не отличаются от вышеописанных роутеров, кроме того, что размещают (или ищут) акторы не только на локальной машине, а на всех (или с определённой ролью) нодах кластера. Типичный пример:
int totalInstances = 100;
int maxInstancesPerNode = 3;
boolean allowLocalRoutees = false;
String useRole = "compute";
ActorRef workerRouter = getContext().actorOf(new ClusterRouterPool(new ConsistentHashingPool(0),
new ClusterRouterPoolSettings(totalInstances, maxInstancesPerNode,
allowLocalRoutees, useRole)).props(Props.create(Worker.class)), "workerRouter");
Вот и всё. Акка это хороший, производительный фреймворк для параллельных вычислений, я бы его назвал лучшим в своём классе, при этом обладающий низким порогом вхождения (сравнимо с «ручным» параллелизмом, синхронизациями, мьютексами, условиями гонки и прочими чудесами).
Вопросы, которые показались автору интересными и/или нетривиальными были рассмотрены, как и основные моменты использования данного фреймворка. В качестве послесловия добавлю небольшой список ответов на возникающие вопросы (или те, которые мне задавали ранее). Буду рад дополнением и пожеланиям, если что-то из не упомянутого интересует — допишу.
Вопросник
— Было упомянуто о failsafe, однако нигде, кроме «молчащих» о эксепшинах future'ов это не встречалось. В чём дело ?
Стратегии поведения акторов в случае возникновения эксепшина намеренно не рассматривались, экономя время читателя. В двух словах — у актора есть стратегия супервайзинга, что он будет делать, если кто-то из его дочерних акторов бросит exception. Их 4 основных: убить дочерний актор, перезапустить дочерний актор, ничего не делать, бросить эксепшин самому. Если интереснее подробнее — на сайт фреймворка.
— Предлагалось создавать даже целые цепочки из Future'ов, полученных посредством Patterns.ask, которые для каждого экземпляра создают временный актор, а потом он уничтожается — насколько это ресурсозатратно? Ведь немногим ранее способ, когда для задачи создавались акторы и уничтожались после неё, назван неэффективным.
Акторы акки очень легки и расходуют мало ресурсов, результаты бенчей можно нагуглить. А временные акторы, создаваемые ask'ом — расходуют их ещё менее, устранясь сразу после окончания таймаута. Они легче, потом как не несут стратегий супервайзинга, их жизненный цикл не контролируется и всё такое. Так что избыточного создания акторов стоит избегать, но не опасаться.
— Какое оптимальное количество акторов в системе?
Какое нужно. Для бизнес-решений предлагается создавать актор на каждый набор хранимых состояний, и иногда их количество переваливает за 4-5 млн/ноду. И ничего, всё успешно работает.
— Насколько производителен поиск акторов по имени?
Он хорошо оптимизирован и использует чёрно-красные деревья, так что вполне производителен. Так же очевидно то, что по относительным путям скорость поиска значительно выше.
Акторы и java 8
Стоит отметить, что акторы с поддержкой лямбда-счисления являются экспериментальными и не рекомендуются для использования в проектах.
Лямбда-акторы могут использоваться совместно с обычными, фундаментальное отличие — наследуемый класс, в данном случае он AbstractActor. Аргумент метода receive имеет тип PartialFunction<Object,BoxedUnit>, показывая, что и как актор может обрабатыват. Но не стоит опасаться громоздких типов, для этого есть конструктор частичной функции ReceiveBuilder. Типичное объявление актора выглядит как
public class EchoActor extends AbstractActor {
private final LoggingAdapter log = Logging.getLogger(context().system(), this);
public EchoActor() {
receive(ReceiveBuilder.
match(String.class, s -> {
log.info("Echo string: {}", s);
}).
match(Integer.class, i -> {
log.info("Input integer: {}",i);
}).
matchAny(o -> log.info("Unknown messsage")).build()
);
}
Это обеспечивает более удобный (если привыкнуть) синтаксис, но, как известно, лямбда счисление в яве плохо оптимизированно и является (если лезть под капот) синтаксическим сахаром, что негативно сказывается на производительности, <a href="http://doc.akka.io/docs/akka/2.3.1/java/lambda-actors.html#Lambdas_and_Performance">о чём предупреждают разработчики акки</a>.
}