В очередной статье про SObjectizer продолжим следить за эволюцией простого поначалу агента, который все более и более усложняется по мере своего развития. Рассмотрим, как быть с отложенными сообщениями, в которых мы больше не заинтересованы. И воспользуемся некоторой функциональностью иерархических конечных автоматов.
В предыдущей статье мы остановились на том, что у нас появился агент email_analyzer, который можно считать более-менее надежно решающим свою задачу. Однако, он сам, последовательно, выполняет три стадии проверки email-а: сперва проверяет заголовки, затем содержимое, затем аттачи.
Скорее всего, каждая из этих операций не будет исключительно CPU-bound. Намного вероятнее, что вычленив какие-то значения из проверяемого фрагмента (например, из заголовков письма), потребуется сделать куда-то запрос для проверки допустимости этого значения. Например, запрос в БД дабы проверить, нет ли имени хоста-отправителя в черном списке. Пока будет выполняться данный запрос можно было бы выполнить еще какую-то операцию, например, разобрать содержимое текста письма на отдельные ключевые фразы, дабы их можно было проверить по какому-то словарю спам-маркеров. Или проверить, есть ли в аттачах архивы, и инициировать их проверку антивирусом. В общем, имеет смысл распараллелить операции анализа email-а.
Давайте попробуем задействовать отдельных агентов на каждую операцию. Т.е. можно написать агентов вида:
class email_headers_checker : public agent_t { public : struct result { check_status status_ }; /* Сообщение с результатом */ email_headers_checker( context_t ctx, ... /* Какие-то параметры */ ) {...} virtual void so_evt_start() override { ... /* Иницирование операций по проверке заголовков */ } ... /* Какие-то детали реализации */ }; class email_body_checker : public agent_t {...}; class email_attachment_checker : public agent_t {...};
Каждый такой агент будет выполнять специфические для своей операции действия, а затем отошлет результат email_analyzer в виде сообщения. Нашему email_analyzer потребуется создать экземпляры этих агентов у себя и дождаться от них сообщений с результатами анализа:
void on_load_succeed( const load_email_succeed & msg ) { try { auto parsed_data = parse_email( msg.content_ ); introduce_child_coop( *this, // Агенты-checker-ы будут работать на своем собственном // thread-pool-диспетчере, который был создан заранее // под специальным именем. disp::thread_pool::create_disp_binder( "checkers", disp::thread_pool::bind_params_t{} ), [&]( coop_t & coop ) { coop.make_agent< email_headers_checker >( so_direct_mbox(), parsed_data->headers() ); coop.make_agent< email_body_checker >( so_direct_mbox(), parsed_data->body() ); coop.make_agent< email_attach_checker >( so_direct_mbox(), parsed_data->attachments() ); } ); } catch( const exception & ) {...} }
Тех, кто внимательно читал предыдущие статьи, фраза «дождаться от них сообщений» должна была бы насторожить. Ждать без ограничения времени не есть хорошо, это прямой путь получить зря болтающегося в системе и ничего не делающего агента. Поэтому при ожидании ответов от checker-ов нам имеет смысл поступить так же, как и при ожидании результата IO-операции: отослать самим себе какой-то отложенный сигнал, получив который мы поймем, что дальше ждать бессмысленно. Т.е. нам надо было бы написать что-то вроде:
// Попытка представить агента email_analyzer с двумя отложенными сигналами. class email_analyzer : public agent_t { // Этот сигнал потребуется для того, чтобы отслеживать отсутствие // ответа от IO-агента в течении разумного времени. struct io_agent_response_timeout : public signal_t {}; // Этот сигнал потребуется для того, чтобы отслеживать отсутствие // результатов проверки отдельных частей email-а. struct checkers_responses_timeout : public signal_t {}; ... virtual void so_evt_start() override { ... /* Отсылка запроса IO-агенту */ // И сразу же начинаем отсчет тайм-аута для ответа от IO-агента. send_delayed< io_agent_response_timeout >( *this, 1500ms ); } ... void on_load_succeed( const load_succeed & msg ) { ... /* Создание коопераций с агентами checker-ами */ // Сразу же начинаем отсчет тайм-аута для ответов от агентов-checker-ов. send_delayed< checkers_responses_timeout >( *this, 750ms ); } ... void on_checkers_responses_timeout() { ... /* Отсылка отрицательного ответа. */ } };
Однако, пойдя по этому пути мы наступим на грабли: ожидая ответа от checker-ов мы запросто можем получить отложенный сигнал io_agent_response_timeout. Ведь его же никто не отменял. И когда это сигнал придет, мы сгенерируем отрицательный ответ из-за якобы имеющегося тайм-аута ввода-вывода, которого-то и нет. Давайте попробуем обойти эти грабли.
Зачастую разработчики, не привыкшие к асинхронному обмену сообщениями, пытаются отменить отложенный сигнал. Это можно сделать, если сохранить идентификатор таймера при обращении к send_periodic:
// Попытка представить агент email_analyzer с отменой отложенного // сигнала io_agent_response_timeout. class email_analyzer : public agent_t { struct io_agent_response_timeout : public signal_t {}; ... virtual void so_evt_start() override { ... /* Отсылка запроса IO-агенту */ // Для того, чтобы получить идентификатор таймера используем // send_periodic вместо send_delayed, но параметр period // выставляем в 0, что делает отсылаемый сигнал отложен��ым, // но не периодическим. io_response_timer_ = send_periodic< io_agent_response_timeout >( *this, 1500ms, 0ms ); } ... void on_load_succeed( const load_succeed & msg ) { // Отменяем отложенный сигнал. io_response_timer_.reset(); ... /* Создание коопераций с агентами checker-ами */ // Сразу же начинаем отсчет тайм-аута для ответов от агентов-checker-ов. send_delayed< checkers_responses_timeout >( *this, 750ms ); } ... // Идентификатор таймера для отложенного сигнала о тайм-ауте для IO-операции. timer_id_t io_response_timer_; };
К сожалению, этот простой способ не всегда работает. Проблема в том, что отложенный сигнал может быть отослан агенту email_analyzer буквально за мгновение до того, как агент email_analyzer выполнит сброс таймера для этого отложенного сигнала. Тут уж ничего не поделать – чудеса многопоточности, они такие.
Агент email_analyzer может зайти в on_load_succeed на контексте своей рабочей нити, может даже успеть войти в вызов reset() для таймера… Но тут его нить вытеснят, управление получит нить таймера SObjectizer-а, на которой произойдет отсылка отложенного сигнала. После чего управление опять получит рабочая нить агента email_analyzer() и метод reset() для таймера сделает отмену уже отосланного сигнала. Однако, сигнал уже находится в очереди сообщений агента, откуда его уже никто не выбросит – раз уж сообщение попало в очередь к агенту, то изъять его оттуда нельзя.
Самое плохое в этой ситуации то, что подобная ошибка будет возникать эпизодически. Из-за чего понять, что именно происходит и в чем именно ошибка, будет сложно. Так что нужно помнить, что отмена отложенного сообщения – это вовсе не гарантия того, что оно не будет отослано.
Итак, если просто отменять отложенное сообщение неправильно, то что же делать?
Например, можно использовать состояния агента. Когда email_analyzer ждет ответа от IO-агента, он находится в одном состоянии. Когда ответ от IO-агента приходит, агент email_analyzer переходит в другое состояние, в котором он будет ждать ответов от checker-ов. Т.к. во втором состоянии email_analyzer на сигнал io_agent_response_timeout не подписан, то этот сигнал будет просто проигнорирован.
С введением состояний в агент email_analyzer мы могли бы получить что-то вроде:
// Попытка представить агент email_analyzer с использованием // нескольких состояний. class email_analyzer : public agent_t { struct io_agent_response_timeout : public signal_t {}; struct checkers_responses_timeout : public signal_t {}; // Состояние, в котором агент будет ждать результата IO-операции. state_t st_wait_io{ this }; // Состояние, в котором агент будет ждать ответов от checker-ов. state_t st_wait_checkers{ this }; ... virtual void so_define_agent() override { // Подписываем агента на разные события в разных состояниях. // Для того, чтобы это было наглядно, используем вторую способ // подписки агентов – через методы класса state_t. st_wait_io .event( &email_analyzer::on_load_succeed ) .event( &email_analyzer::on_load_failed ) .event< io_agent_response_timeout >( &email_analyzer::on_io_timeout ); st_wait_checkers .event( &email_analyzer::on_header_check_result ) .event( &email_analyzer::on_body_check_result ) .event( &email_analyzer::on_attach_check_result ) .event< checkers_responses_timeout >( &email_analyzer::on_checkers_timeout ); } ... };
Однако, в SObjectizer можно поступить еще проще: можно назначить временной лимит на пребывание агента в конкретном состоянии. Когда этот лимит истечет, агент будет принудительно переведен в другое состояние. Т.е. мы можем написать что-то вроде:
// Попытка представить агента email_analyzer с использованием ограничения времени // на пребывание агента в конкретном состоянии. class email_analyzer : public agent_t { state_t st_wait_io{ this }; state_t st_io_timeout{ this }; state_t st_wait_checkers{ this }; state_t st_checkers_timeout{ this }; ... virtual void so_define_agent() override { st_wait_io .event( &email_analyzer::on_load_succeed ) .event( &email_analyzer::on_load_failed ) // Ограничиваем время ожидания. .time_limit( 1500ms, st_io_timeout ); st_wait_checkers .event( &email_analyzer::on_header_check_result ) .event( &email_analyzer::on_body_check_result ) .event( &email_analyzer::on_attach_check_result ) .time_limit( 750ms, st_checkers_timeout ); } };
Но просто ограничить время пребывания в некотором состоянии недостаточно. Нужно еще предпринять какие-то действия, когда это время истечет. Как это сделать?
Использовать такую вещь, как обработчик входа в состояние. Когда агент входит в конкретное состояние, SObjectizer вызывает функцию-обработчик входа в это состояние, если пользователь такую функцию назначил. Это означает, что на вход в st_io_timeout мы можем повесить обработчик, который отсылает check_result с отрицательным результатом и завершает работу агента:
st_io_timeout.on_enter( [this]{ send< check_result >( reply_to_, email_file_, check_status::check_failure ); so_deregister_agent_coop_normally(); } );
Точно такой же обработчик мы повесим и на вход в st_checkers_timeout. А т.к. действия внутри этих обработчиков будут одинаковыми, то мы можем вынести их в отдельный метод агента email_analyzer и указать этот метод в качестве обработчика входа и для состояния st_io_timeout, и для состояния st_checkers_timeout:
class email_analyzer : public agent_t { state_t st_wait_io{ this }; state_t st_io_timeout{ this }; state_t st_wait_checkers{ this }; state_t st_checkers_timeout{ this }; ... virtual void so_define_agent() override { ... st_io_timeout .on_enter( &email_analyzer::on_enter_timeout_state ); ... st_checkers_timeout .on_enter( &email_analyzer::on_enter_timeout_state ); }; ... void on_enter_timeout_state() { send< check_result >( reply_to_, email_file_, check_status::check_failure ); so_deregister_agent_coop_normally(); } };
Но и это еще не все. Раз уж мы затронули тему состояний агентов и их возможностей, то можно развить ее дальше и провести рефакторинг кода email_analyzer.
Нетрудно заметить, что в коде очень часто дублируется парочка действий: отсылка сообщения check_result и дерегистрация кооперации агента. Такое дублирование не есть хорошо, cледует от него избавиться.
По сути, работа агента email_analyzer сводится к тому, чтобы в итоге агент оказался в одном из двух состояний: либо все завершилось нормально и следует отослать положительный результат, после чего завершить свою работу, либо же все завершилось ошибкой, нужно отослать отрицательный результат и, опять таки, завершить работу агента. Так давайте это и выразим прямо в коде с помощью двух состояний агента: st_success и st_failure.
// Попытка представить агента email_analyzer со специальными финальными // состояниями st_success и st_failure. class email_analyzer : public agent_t { state_t st_wait_io{ this }; state_t st_wait_checkers{ this }; state_t st_failure{ this }; state_t st_success{ this }; ... virtual void so_define_agent() override { st_wait_io .event( &email_analyzer::on_load_succeed ) .event( &email_analyzer::on_load_failed ) // Ограничиваем время ожидания. .time_limit( 1500ms, st_failure ); st_wait_checkers .event( &email_analyzer::on_header_check_result ) .event( &email_analyzer::on_body_check_result ) .event( &email_analyzer::on_attach_check_result ) .time_limit( 750ms, st_failure ); st_failure .on_enter( [this]{ send< check_result >( reply_to_, email_file_, status_ ); so_deregister_agent_coop_normally(); } ); st_success .on_enter( [this]{ send< check_result >( reply_to_, email_file_, check_status::safe ); so_deregister_agent_coop_normally(); } ); }; ... // Новый атрибут нужен для сохранения актуального отрицательного результата. check_status status_{ check_status::check_failure }; };
Это позволит нам в коде агента просто менять состояние для завершения работы агента тем или иным образом:
void on_load_failed( const load_email_failed & ) { st_failure.activate(); } void on_checker_result( check_status status ) { // На первом же неудачном результате прерываем свою работу. if( check_status::safe != status ) { status_ = status; st_failure.activate(); } else { ++checks_passed_; if( 3 == checks_passed_ ) // Все результаты получены. Можно завершать проверку с // положительным результатом. st_success.activate(); } }
Но можно пойти и еще дальше. Для состояний st_failure и st_success есть одно общее действие, которое нужно выполнить при входе в любое их этих состояний – обращение к so_deregister_agent_coop_normally(). И это не случайно, ведь оба этих состояния отвечают за завершение работы агента. А раз так, то мы можем воспользоваться вложенными состояниями. Т.е. мы введем состояние st_finishing, для которого st_failure и st_success будут подсостояниями. При входе в st_finishing будет вызываться so_deregister_agent_coop_normally(). А при входе в st_failure и st_success – будет только отсылаться соответствующее сообщение.
Т.к. состояния st_failure и st_success вложены в st_finishing, то при входе в любое из них сначала будет вызваться обработчик входа в st_finishing, а уже затем – обработчик входа в st_failure или st_success. Получится, что мы при входе в st_finishing мы дерегистрируем агента, а следом, при входе в st_failure или st_success, отсылаем сообщение check_result.
Если кто-то из читателей чувствует себя не комфортно при упоминании вложенных состояний, обработчиков входа в состояния, ограничений на время пребывания в состоянии, то имеет смысл ознакомится с одной из основополагающих статей на тему иерархических конечных автоматов: David Harel, Statecharts: A visual formalism for complex systems. Science of Computer Programming. Состояния агентов в SObjectizer реализуют изрядную часть описанных там возможностей.
В итоге всех этих преобразований агент email_analyzer примет показанный ниже вид.
// Седьмая версия агента email_analyzer, с распараллеливанием работы по проверке // содержимого email-а и использованием вложенных состояний. class email_analyzer : public agent_t { state_t st_wait_io{ this }; state_t st_wait_checkers{ this }; state_t st_finishing{ this }; state_t st_failure{ initial_substate_of{ st_finishing } }; state_t st_success{ substate_of{ st_finishing } }; public : email_analyzer( context_t ctx, string email_file, mbox_t reply_to ) : agent_t(ctx), email_file_(move(email_file)), reply_to_(move(reply_to)) {} virtual void so_define_agent() override { st_wait_io .event( &email_analyzer::on_load_succeed ) .event( &email_analyzer::on_load_failed ) // Назначаем тайм-аут для ожидания ответа. .time_limit( 1500ms, st_failure ); st_wait_checkers .event( [this]( const email_headers_checker::result & msg ) { on_checker_result( msg.status_ ); } ) .event( [this]( const email_body_checker::result & msg ) { on_checker_result( msg.status_ ); } ) .event( [this]( const email_attach_checker::result & msg ) { on_checker_result( msg.status_ ); } ) // Еще один тайм-аут для ответов. .time_limit( 750ms, st_failure ); // Для состояний, которые отвечают за завершение работы, // нужно определить только обработчики входа. st_finishing.on_enter( [this]{ so_deregister_agent_coop_normally(); } ); st_failure.on_enter( [this]{ send< check_result >( reply_to_, email_file_, status_ ); } ); st_success.on_enter( [this]{ send< check_result >( reply_to_, email_file_, check_status::safe ); } ); } virtual void so_evt_start() override { // Начинаем работать в состоянии по умолчанию, поэтому // нужно принудительно перейти в нужное состояние. st_wait_io.activate(); // При старте сразу же отправляем запрос IO-агенту для загрузки // содержимого email файла. send< load_email_request >( so_environment().create_mbox( "io_agent" ), email_file_, so_direct_mbox() ); } private : const string email_file_; const mbox_t reply_to_; // Храним последний отрицательный результат для того, чтобы отослать // его при входе в состояние st_failure. check_status status_{ check_status::check_failure }; int checks_passed_{}; void on_load_succeed( const load_email_succeed & msg ) { // Меняем состояние т.к. переходим к следующей операции. st_wait_checkers.activate(); try { auto parsed_data = parse_email( msg.content_ ); introduce_child_coop( *this, // Агенты-checker-ы будут работать на своем собственном // thread-pool-диспетчере, который был создан заранее // под специальным именем. disp::thread_pool::create_disp_binder( "checkers", disp::thread_pool::bind_params_t{} ), [&]( coop_t & coop ) { coop.make_agent< email_headers_checker >( so_direct_mbox(), parsed_data->headers() ); coop.make_agent< email_body_checker >( so_direct_mbox(), parsed_data->body() ); coop.make_agent< email_attach_checker >( so_direct_mbox(), parsed_data->attachments() ); } ); } catch( const exception & ) { st_failure.activate(); } } void on_load_failed( const load_email_failed & ) { st_failure.activate(); } void on_checker_result( check_status status ) { // На первом же неудачном результате прерываем свою работу. if( check_status::safe != status ) { status_ = status; st_failure.activate(); } else { ++checks_passed_; if( 3 == checks_passed_ ) // Все результаты получены. Можно завершать проверку с // положительным результатом. st_success.activate(); } } };
Ну а теперь имеет смысл посмотреть на код получившегося агента email_analyzer и задать себе простой, но важный вопрос: а оно того стоило?
Очевидно, что с ответом на этот вопрос все не так однозначно. Но поговорить об этом мы попробуем уже в следующей статье. В которой затронем тему уроков, которые мы извлекли после более чем десяти лет использования SObjectizer в разработке программных систем.
Исходные коды к показанным в статье примерам можно найти в этом репозитории.
