В первой статье мы рассказали о том, что такое SObjectizer и почему он получился именно таким. Во второй – попробуем показать, как может выглядеть более-менее реальный код на SObjectizer. С демонстрацией того, в какую сторону этот код обычно эволюционирует. Ибо первоначально, когда у разработчика появляется возможность работать с Actor Model, он начинает этой возможностью злоупотреблять, создавая проблемы и себе, и тем, кто будет эксплуатировать программный продукт, написанный в стиле «актор на каждый чих». Только спустя некоторое время и некоторое количество набитых шишек приходит понимание того, что прелесть модели акторов вовсе не в возможности создавать их десятками тысяч или даже просто тысячами. Но давайте пойдем последовательно, не опережая события.
Для демонстрации выдумаем себе такую абстрактную задачу: есть имя файла с email-ом (грубо говоря, в этот файл сохранили все, что пришло по POP3 протоколу, включая заголовки, тело письма, аттачи и пр). Нужно выдать результат оценки подозрительности содержимого этого файла: выглядит ли письмо безопасно или подозрительно, или же при попытке оценить его содержимое возникла какая-то проблема и актуальную оценку выдать нельзя. Задача абстрактная, любые совпадения с чем-либо похожим из реальной жизни являются непреднамеренной случайностью.
Естественно, таких имен с файлами email-ов у нас будет не одно и не два. Будет некий поток этих имен, с которым нужно разбираться. Желательно, используя возможности современного многоядерного железа, т. е. запуская обработку нескольких email-ов в параллель.
Схематично покажем, как эта задача может быть решена на SObjectizer-е «в лоб». После чего укажем проблемы выбранного подхода, сделаем следующую итерацию и т.д. Дабы в итоге на примерах подвести читателя к тому пониманию «удобного использования модели акторов на C++» которое у нас сложилось за десять лет работы с SObjectizer-ом в реальных проектах.
Для начала определимся с тем, как выдаются запросы на проверку файлов с email-ами и как возвращаются результаты проверок. Используем для этих целей простые сообщения:
// Избавляемся от необходимости указывать префиксы so_5 и std. // В последующих примерах эти using-и дублировать не будем, // подразумевая, что они уже выполнены. using namespace so_5; using namespace std; using namespace chrono_literals; // Сообщение для проверки одного файла с email-ом. struct check_request { // Имя проверяемого файла. string email_file_; // Кому нужно отослать результат проверки. mbox_t reply_to_; }; // Статус проверки, который будет возвращен в ответном сообщении. enum class check_status { safe, suspicious, dangerous, check_failure, check_timedout }; // Сообщение с результатом проверки одного файла с email. // Содержит не только статус проверки, но и имя проверяемого файла. // Это имя нужно лишь для того, чтобы облегчить сопоставление // получаемых результатов проверки. struct check_result { string email_file_; check_status status_; };
Получается, что когда нам нужно проверить email, мы отсылает сообщение check_request на некий mbox. В этом сообщении передается имя файла и обратный адрес, куда должен быть отослан результат проверки. Соответственно, следующим шагом нам нужно определить, куда же именно будут отсылаться сообщения check_request.
Можно, конечно же, создать одного агента, который бы получал все сообщения check_request и обрабатывал бы их самостоятельно. Но такой агент очень быстро стал бы узким местом. Поэтому мы сделаем так, что у нас будет один агент-менеджер, который получает сообщения check_request и под каждое полученное сообщение создает агента-анализатора. Именно агент-анализатор будет заниматься проверкой email-а, а агент-мендежер будет выполнять роль фабрики агентов-анализаторов.
Сходу можно написать самую простую версию агента-менеджера:
// Агент, который будет играть роль менеджера агентов email_analyzer. class analyzer_manager final : public agent_t { public : analyzer_manager( context_t ctx ) : agent_t( ctx ) { // Класс объявлен как final, поэтому подписки агента можно сделать // прямо в конструкторе. Если бы final не было, подписки лучше было // бы вынести в метод so_define_agent(), что упростило бы разработку // производных классов. so_subscribe_self() // В этом случае тип сообщения, на который идет подписка, // выводится автоматически. .event( &analyzer_manager::on_new_check_request ); } private : void on_new_check_request( const check_request & msg ) { // Создаем кооперацию с единственным агентом внутри. // Эта кооперация будет дочерней для кооперации с агентом-менеджером. // Т.е. SObjectizer Environment проконтролирует, чтобы кооперация с // агентом-анализатором завершила свою работу перед тем, как // завершит свою работу кооперация с агентом-менеджером. introduce_child_coop( *this, [&]( coop_t & coop ) { // В кооперацию будет входить всего один агент. coop.make_agent< email_analyzer >( msg.email_file_, msg.reply_to_ ); } ); } };
Для обработки email-ов нам нужно будет зарегистрировать в SObjectizer Environment экземпляр агента типа analyzer_manager и каким-то образом сделать его персональный mbox (т.н. direct_mbox) доступным для всех. Тот, кому нужно проверить email, отошлет на этот mbox сообщение check_request, сообщение дойдет до analyzer_manager, будет создан агент email_analyzer ну и дальше все, как и задумывалось...
Теперь нужно реализовать агента email_analyzer, который и будет производить анализ email-ов. Самое простое, что приходит в голову – это агент, который сам выполняет все операции: т.е. загружает содержимое из файла, парсит это содержимое на составные части (заголовки, тело, аттачи), анализирует все это и выдает заключение.
Фактически, агенту email_analyzer нужно будет определить только свою реализацию метода so_evt_start(), которая автоматически вызывается у каждого агента после того, как агент успешно регистрируется внутри SObjectizer Environment. Посему агент email_analyzer будет выглядеть очень просто:
// Агент для анализа содержимого одного email-а. // Получает все нужные ему параметры в конструкторе, // выполняет все свои действия в единственном методе so_evt_start. class email_analyzer : public agent_t { public : email_analyzer( context_t ctx, // Имя файла с email для анализа. string email_file, // Куда нужно отослать результат анализа. mbox_t reply_to ) : agent_t(ctx), email_file_(move(email_file)), reply_to_(move(reply_to)) {} virtual void so_evt_start() override { try { // Стадии обработки обозначаем лишь схематично. auto raw_data = load_email_from_file( email_file_ ); auto parsed_data = parse_email( raw_data ); auto status = check_headers( parsed_data->headers() ); if( check_status::safe == status ) status = check_body( parsed_data->body() ); if( check_status::safe == status ) status = check_attachments( parsed_data->attachments() ); send< check_result >( reply_to_, email_file_, status ); } catch( const exception & ) { // В случае какой-либо ошибки отсылаем статус о невозможности // проверки файла с email-ом по техническим причинам. send< check_result >( reply_to_, email_file_, check_status::check_failure ); } // Больше мы не нужны, поэтому дерегистрируем кооперацию, // в которой находимся. so_deregister_agent_coop_normally(); } private : const string email_file_; const mbox_t reply_to_; };
Итак, у нас есть очень тривиальные реализации агентов analyzer_manager и email_analyzer. Которые, к сожалению, имеют несколько серьезных проблем.
Первая проблема состоит в том, что агенты email_analyzer не будут работать в параллель. Дело в том, что при их создании не указывается диспетчер, к которому они должны быть привязаны. Поэтому эти агенты автоматически привязываются к дефолтному диспетчеру SObjectizer Environment, а этот дефолтный диспетчер является однопоточным: т.е. у него всего одна рабочая нить, на которой последовательно запускаются события привязанных к диспетчеру агентов.
Поэтому, если мы хотим, чтобы агенты email_analyzer работали независимо друг от друга, нам нужно явно привязывать их к соответствующему типу диспетчера. В данном случае хорошо подойдет диспетчер с пулом рабочих потоков. Соответственно, кто-то должен создать экземпляр такого диспетчера и кто-то должен привязывать email_analyzer-ов к этому экземпляру. Очевидно, что этот кто-то – это агент analyzer_manager:
class analyzer_manager final : public agent_t { public : analyzer_manager( context_t ctx ) : agent_t( ctx ) , analyzers_disp_( // Нужен приватный, т.е. видимый только нашему менеджеру // диспетчер, на котором и будут работать агенты-анализаторы. disp::thread_pool::create_private_disp( // Указываем, в рамках какого SObjectizer Environment // будет работать диспетчер. Это нужно для корректного запуска // и останова диспетчера. so_environment(), // Просто захардкодим количество рабочих потоков для диспетчера. // В реальном приложении это количество может быть вычислено // на основании, например, thread::hardware_concurrency() или // взято из конфигурации. 16 ) ) { so_subscribe_self() .event( &analyzer_manager::on_new_check_request ); } private : disp::thread_pool::private_dispatcher_handle_t analyzers_disp_; void on_new_check_request( const check_request & msg ) { introduce_child_coop( *this, // Агент из новой кооперации будет автоматически привязан к приватному // диспетчеру с пулом рабочих потоков (при привязке будут использоваться // параметры по умолчанию). analyzers_disp_->binder( disp::thread_pool::bind_params_t() ), [&]( coop_t & coop ) { // В кооперацию будет входить всего один агент. coop.make_agent< email_analyzer >( msg.email_file_, msg.reply_to_ ); } ); } };
Такая несложная модификация analyzer_manager позволила нам избавиться от первой проблемы. Но осталась еще и вторая: неконтролируемое создание неограниченного количества агентов email_analyzer.
Текущая реализация analyzer_manager работает по принципу: получил сообщение check_email с именем файла для проверки, создал агента email_analyzer и забыл про все. Но, очевидно, что для более-менее высокой нагрузки этот вариант не подходит. Если сразу создать 100500 агентов email_analyzer, которые будут работать на пуле из N рабочих потоков, то ничего хорошего кроме лишнего расхода памяти не будет. Лучше сразу ограничивать количество одновременно работающих агентов и создавать новых после того, как завершают работу предыдущие. Плюс хранить очередь заданий на обработку, из которой и будут браться элементы для новых агентов.
Поэтому еще раз модифицируем нашего analyzer_manager-а: добавим в него очередь запросов и ограничение на количество одновременно работающих агентов.
class analyzer_manager final : public agent_t { // Этот сигнал нам нужен для того, чтобы мы могли попробовать // запустить в работу очередной анализатор. struct try_create_next_analyzer : public signal_t {}; // А этот сигнал будет информировать нас о том, что очередной // анализатор завершил свою работу. struct analyzer_finished : public signal_t {}; public : analyzer_manager( context_t ctx ) : agent_t( ctx ) , analyzers_disp_( disp::thread_pool::create_private_disp( so_environment(), 16 ) ) { so_subscribe_self() .event( &analyzer_manager::on_new_check_request ) // А в этом случае метод-обработчик не имеет параметров, // поэтому тип сигнала-инцидента указывается явно. .event< try_create_next_analyzer >( &analyzer_manager::on_create_new_analyzer ) .event< analyzer_finished >( &analyzer_manager::on_analyzer_finished ); } private : const size_t max_parallel_analyzers_{ 16 }; size_t active_analyzers_{ 0 }; disp::thread_pool::private_dispatcher_handle_t analyzers_disp_; list< check_request > pending_requests_; void on_new_check_request( const check_request & msg ) { // Работаем по очень простой схеме: сперва сохраняем очередной // запрос в список ожидания, затем отсылаем себе сигнал для // попытки запустить очередного обработчика. // И создавать агента-анализатора будем уже при обработке сигнала. pending_requests_.push_back( msg ); // Отсыла��м сигнал сами себе. send< try_create_next_analyzer >( *this ); } void on_create_new_analyzer() { // Запустить новый анализатор можно только если еще не достигнут // лимит на их количество. if( active_analyzers_ >= max_parallel_analyzers_ ) return; lauch_new_analyzer(); // Если список не пуст и возможность стартовать анализаторов // сохраняется, то продолжим это делать. if( !pending_requests_.empty() && active_analyzers_ < max_parallel_analyzers_ ) send< try_create_next_analyzer >( *this ); } void on_analyzer_finished() { // Фиксируем факт, что анализаторов стало меньше. --active_analyzers_; // Если есть, что запускать на обработку, делаем это. if( !pending_requests_.empty() ) lauch_new_analyzer(); } void lauch_new_analyzer() { introduce_child_coop( *this, analyzers_disp_->binder( disp::thread_pool::bind_params_t() ), [this]( coop_t & coop ) { coop.make_agent< email_analyzer >( pending_requests_.front().email_file_, pending_requests_.front().reply_to_ ); // Нам нужно автоматически получить уведомление, когда эта кооперация // перестанет работать. Для чего мы назначаем специальный нотификатор, // задачей которого будет отсылка сигнала analyzer_finished. coop.add_dereg_notificator( // Нотификатор получает ряд параметров, но нам они сейчас не нужны. [this]( environment_t &, const string &, const coop_dereg_reason_t & ) { send< analyzer_finished >( *this ); } ); } ); // Фиксируем тот факт, что анализаторов стало больше. ++active_analyzers_; // Соответствующую заявку в списке ожидания больше хранить не нужно. pending_requests_.pop_front(); } };
В принципе, мы получили более-менее нормальное решение, которое можно было бы счесть удовлетворительным. Если бы не одно «но».
Это «но» состоит в том, что хотя у нас и есть возможность запускать в параллельную работу несколько агентов-анализаторов, распараллеливание получится так себе. Если, скажем, одновременно стартуют пять агентов, то все пятеро сразу же начнут I/O операции и пока эти операции будут выполняться, никто не сможет делать ничего другого. Потом I/O операции закончатся и все пятеро агентов начнут разбор прочитанных с диска данных. Тем самым займут процессор. Этим можно было бы воспользоваться для того, чтобы начать I/O операции для следующих нескольких агентов-анализаторов. Но мы не можем этого сделать, пока первые пять агентов заняты своей работой.
Решить эту проблему можно, если изъять из email_analyzer I/O операцию. Вместо того, чтобы загружать данные из файла самостоятельно, агент email_analyzer может делегировать эту задачу специальному IO-агенту. Т.е. агент email_analyzer стартует, отсылает сообщение IO-агенту и затем получает результат I/O операции в виде ответного сообщения. Тем самым предоставляя возможность другому email_analyzer-у выполнить свою часть работы (отослать сообщение IO-агенту или обработать ответное сообщение от IO-агента). Но разговор о том, как это будет выглядеть и насколько хорошим окажется такое решение мы продолжим в следующей статье.
Пока же можно показать одну важную возможность, которую мы получили в своей текущей реализации агента-менеджера с его списком ожидания: мы можем легко контролировать время ожидания запросов в этом списке.
Действительно, у операции проверки письма наверняка будут какие-то разумные пределы времени ожидания ответа. И если за это время оценить безопасность не удалось, то, скорее всего, и не нужно будет пытаться это делать. Исходя из этого мы можем легко модифицировать агента-менеджера так, чтобы он выбрасывал из списка ожидания те запросы, которые провели в ожидании слишком много времени (например, больше 10 секунд). Для этого задействуем периодическое сообщение, которое будет приходить к менеджеру два раза в секунду. Получив это сообщение менеджер пробежится по списку ожидания и выбросит те запросы, которые ждали больше 10 секунд. Подход, конечно, не очень точный, но зато очень простой и надежный:
class analyzer_manager final : public agent_t { struct try_create_next_analyzer : public signal_t {}; struct analyzer_finished : public signal_t {}; // Потребуется еще один сигнал для таймера проверки времени жизни // заявки в списке ожидания. struct check_lifetime : public signal_t {}; // Кроме того, нам потребуется другая структура для хранения заявки // в списке ожидания. Кроме самой заявки нужно будет хранить еще // и время поступления в список ожидания. using clock = chrono::steady_clock; struct pending_request { clock::time_point stored_at_; check_request request_; }; public : analyzer_manager( context_t ctx ) : agent_t( ctx ) , analyzers_disp_( disp::thread_pool::create_private_disp( so_environment(), 16 ) ) { so_subscribe_self() .event( &analyzer_manager::on_new_check_request ) .event< try_create_next_analyzer >( &analyzer_manager::on_create_new_analyzer ) .event< analyzer_finished >( &analyzer_manager::on_analyzer_finished ) // Для обработки таймера нам нужен еще одно событие-обработчик. .event< check_lifetime >( &analyzer_manager::on_check_lifetime ); } // Используем стартовый метод для того, чтобы запустить периодический таймер. virtual void so_evt_start() override { // Для периодических таймеров нужно сохранять возвращаемый timer_id, // иначе таймер будет автоматически отменен. check_lifetime_timer_ = send_periodic< check_lifetime >( *this, 500ms, 500ms ); } private : const size_t max_parallel_analyzers_{ 16 }; size_t active_analyzers_{ 0 }; disp::thread_pool::private_dispatcher_handle_t analyzers_disp_; // Ограничение на время пребывания заявки в списке ожидания. const chrono::seconds max_lifetime_{ 10 }; // Идентификатор таймера для периодического сигнала check_lifetime. timer_id_t check_lifetime_timer_; list< pending_request > pending_requests_; void on_new_check_request( const check_request & msg ) { // Теперь при сохранении фиксируем время. pending_requests_.push_back( pending_request{ clock::now(), msg } ); send< try_create_next_analyzer >( *this ); } void on_create_new_analyzer() { if( active_analyzers_ >= max_parallel_analyzers_ ) return; lauch_new_analyzer(); if( !pending_requests_.empty() && active_analyzers_ < max_parallel_analyzers_ ) send< try_create_next_analyzer >( *this ); } void on_analyzer_finished() { --active_analyzers_; if( !pending_requests_.empty() ) lauch_new_analyzer(); } void on_check_lifetime() { // Продолжать просмотр списка можно пока в нем есть элементы, которые // подлежат изъятию. while( !pending_requests_.empty() && pending_requests_.front().stored_at_ + max_lifetime_ < clock::now() ) { // Отсылаем неудачный результат проверки email-а самостоятельно. send< check_result >( pending_requests_.front().request_.reply_to_, pending_requests_.front().request_.email_file_, check_status::check_timedout ); pending_requests_.pop_front(); } } void lauch_new_analyzer() { introduce_child_coop( *this, analyzers_disp_->binder( disp::thread_pool::bind_params_t() ), [this]( coop_t & coop ) { coop.make_agent< email_analyzer >( pending_requests_.front().request_.email_file_, pending_requests_.front().request_.reply_to_ ); coop.add_dereg_notificator( [this]( environment_t &, const string &, const coop_dereg_reason_t & ) { send< analyzer_finished >( *this ); } ); } ); ++active_analyzers_; pending_requests_.pop_front(); } };
Пожалуй, на этом пока можно прерваться дабы сохранить разумный объем статьи. В последующих статьях мы продолжим рассматривать этот пример и опишем более сложные реализации агентов, продемонстрировав некоторые специфические особенности SObjectizer-а.
Пока же можно отметить, что в показанных примерах мы уже вплотную столкнулись с одной из самых важных проблем, с которыми сталкивается разработчик, использующий Actorl Model: защита от перегрузок.
Эта проблема возникает, например, когда в системе оказывается слишком много агентов, для того, чтобы их события можно было нормально диспетчировать. Так, если мы позволяем создавать агентов email_analyzer без ограничения их количества, то в один прекрасный момент мы может оказаться в ситуации, когда несколько тысяч таких агентов ждут своей очереди на обработку события и ждут очень долго (счет может идти на минуты и даже десятки минут в самых патологических случаях). В данной статье мы показали один из самых действенных способов решения этого проявления проблемы перегрузок: ограничение на количество агентов и создание новых агентов только по мере появления подходящих для этого возможностей (по мере уничтожения старых агентов).
У проблемы перегрузки есть и другие проявления. Например, возникновение такого количества сообщений, которое приложение не успеет обработать за разумное время. Это так же очень неприятная проблема и SObjectizer предоставляет некоторые инструменты для борьбы с ней. Но более подробно этот вопрос мы затронем в одной из следующих статей.
Кроме проблемы перегрузок есть и еще одна проблема, присущая построенным на акторах/агентах системам: сложность обозримости происходящего в приложении. Это когда в приложении есть 100500 агентов, каждый из которых, вроде бы, работает правильно, но вот понять, работает ли все приложение должным образом, оказывается непросто. Этот вопрос мы так же затронем, но в последующих статьях.
Пока же мы надеемся на то, что приведенные в данной статье примеры и доводы оказались понятными. Ну а если что-то осталось непонятным, то с удовольствием ответим на вопросы в комментариях.
Исходные коды к показанным в статье примерам можно найти в этом репозитории.
