Новое в SObjectizer-5.5.23: исполнение желаний или ящик Пандоры?



    Данная статья является продолжением опубликованной месяц назад статьи-размышлении "Легко ли добавлять новые фичи в старый фреймворк? Муки выбора на примере развития SObjectizer-а". В той статье описывалась задача, которую мы хотели решить в очередной версии SObjectizer-а, рассматривались два подхода к ее решению и перечислялись достоинства и недостатки каждого из подходов.

    Прошло время, один из подходов был воплощен в жизнь и новые версии SObjectizer-а, а также сопутствующего ему проекта so_5_extra, уже, что называется «задышали полной грудью». Можно в буквальном смысле брать и пробовать.

    Сегодня же мы поговорим о том, что было сделано, зачем это было сделано, к чему это привело. Если кому-то интересно следить за тем, как развивается один из немногих живых, кросс-платформенных и открытых акторных фреймворков для C++, милости прошу под кат.

    С чего все началось?


    Начиналось все с попытки решить проблему гарантированной отмены таймеров. Суть проблемы в том, что когда отсылается отложенное или периодическое сообщение, то программист может отменить доставку сообщения. Например:

    auto timer_id = so_5::send_periodic<my_message>(my_agent, 10s, 10s, ...);
    ... // Что-то делаем.
    // Понимаем, что периодическое сообщение my_message больше нам не нужно.
    timer_id.release(); // Теперь таймер не будет отсылать my_message.

    После вызова timer_id.release() таймер больше не будет отсылать новые экземпляры сообщения my_message. Но те экземпляры, которые уже были отосланы и попали в очереди получателей, никуда не денутся. Со временем они будут извлечены из этих самых очередей и будут переданы агентам-получателям для обработки.

    Проблема эта является следствием базовых принципов работы SObjectizer-5 и не имеет простого решения из-за того, что SObjectizer не может изымать сообщения из очередей. Не может потому, что в SObjectizer очереди принадлежат диспетчерам, диспетчеры бывают разные, очереди у них также организованы по разному. В том числе бывают диспетчеры, которые не входят в состав SObjectizer-а и SObjectizer в принципе не может знать, как эти диспетчеры работают.

    В общем, есть вот такая особенность у родных таймеров SObjectizer-а. Не то, чтобы она слишком уж портила жизнь разработчикам. Но некоторую дополнительную внимательность нужно проявлять. Особенно новичкам, которые только знакомятся с фреймворком.

    И вот, наконец, руки дошли до того, чтобы предложить решение для этой проблемы.

    Какой путь решения был выбран?


    В предыдущей статье рассматривалось два возможных варианта. Первый вариант не требовал модификаций механизма доставки сообщений в SObjectizer-е, но зато требовал от программиста явным образом изменять тип отсылаемого/получаемого сообщения.

    Второй вариант требовал модификации механизма доставки сообщений SObjectizer-а. Именно этот путь и был выбран, поскольку он позволял прятать от получателя сообщения тот факт, что сообщение было отослано каким-то специфическим образом.

    Что изменилось в SObjectizer?


    Новое понятие: конверт с сообщением внутри


    Первая составляющая реализованного решения — это добавление в SObjectizer такого понятия, как конверт (envelope). Конверт — это специальное сообщение, внутри которого лежит актуальное сообщение (payload). SObjectizer доставляет конверт с сообщением до получателя почти что обычным способом. Принципиальная разница в обработке конверта обнаруживается лишь на самом последнем этапе доставки:

    • при доставке обычного сообщения у агента-получателя просто ищется обработчик для данного типа сообщения и, если такой обработчик найден, то найденный обработчик вызывается и ему отдается доставленное сообщение в качестве параметра;
    • а при доставке конверта с сообщением после того, как обработчик будет найден, сперва делается попытка достать сообщение из конверта. И только если конверт отдал хранящееся в нем сообщение, только тогда вызывается обработчик.

    Здесь есть два ключевых момента, которые оказывают серьезнейшее влияние на то, для чего и как могут использоваться конверты с сообщениями.

    Первый ключевой момент в том, что у конверта сообщение запрашивается только тогда, когда у получателя найден обработчик для сообщения. Т.е. только тогда, когда сообщение действительно доставлено до получателя и получатель вот прямо здесь и сейчас будет это сообщение обрабатывать.

    Второй ключевой момент здесь в том, что конверт может не отдать находящееся в нем сообщение. Т.е., например, конверт может проверить текущее время и решить, что все сроки доставки были пропущены и, поэтому, сообщение перестало быть актуальным и обрабатывать его нельзя. Посему конверт не отдаст сообщение наружу. Соответственно, SObjectizer просто проигнорирует этот конверт и никаких дополнительных действий предпринимать не будет.

    Что из себя представляет конверт?


    Конверт — это реализация интерфейса envelope_t, который определен следующим образом:

    class SO_5_TYPE envelope_t : public message_t
       {
       public:
          ... // Конструкторы-деструкторы.
    
          // Хук для случая, когда сообщение доставлено до получателя и
          // получатель готов обработать его.
          virtual void handler_found_hook(
             handler_invoker_t & invoker ) noexcept = 0;
    
          // Хук для случая, когда сообщение должно быть трансформированно
          // из одно представления в другое.
          virtual void transformation_hook(
             handler_invoker_t & invoker ) noexcept = 0;
    
       private :
          kind_t so5_message_kind() const noexcept override
             { return kind_t::enveloped_msg; }
       };

    Т.е. конверт — это, по сути такое же сообщение, как и все остальные. Но со специальным признаком, который и возвращается методом so5_message_kind().

    Программист может разрабатывать свои конверты наследуясь от envelope_t (или, что более удобно, от so_5::extra::enveloped_msg::just_envelope_t) и переопределяя методы-хуки handler_found_hook() и transformation_hook().

    Внутри методов-хуков разработчик конверта решает, хочет ли он отдать находящееся внутри конверта сообщение для обработки/трансформации или не хочет. Если хочет, то разработчик должен вызвать метод invoke() и объекта invoker. Если не хочет, то не вызывает, в этом случае конверт и его содержимое будет проигнорированно.

    Как с помощью конвертов решается проблема с отменой таймеров?


    Решение, которое сейчас реализовано в so_5_extra в виде пространства имен so_5::extra::revocable_timer, очень простое: при особой отсылке отложенного или периодического сообщения создается специальный конверт, внутри которого находится не только само сообщение, но и атомарный флаг revoked. Если этот флаг сброшен, то сообщение считается актуальным. Если выставлен, то сообщение считается отозванным.

    Когда у конверта вызывается метод-хук, то конверт проверяет значение флага revoked. Если флаг выставлен, то конверт не отдает сообщение наружу. Тем самым, обработка сообщения не выполняется даже если таймер уже успел поместить сообщение в очередь получателя.

    Расширение интерфейса abstract_message_box_t


    Добавление интерфейса envelope_t — это только одна часть реализации конвертов в SObjectizer. Вторая часть — это учет факта существования конвертов в механизме доставки сообщений внутри SObjectizer-а.

    Тут, к сожалению, не обошлось без внесения видимых для пользователя изменений. В частности, в класс abstract_message_box_t, который определяет интерфейс всех почтовых ящиков в SObjectizer-е, потребовалось добавить еще один виртуальный метод:

    virtual void do_deliver_enveloped_msg(
       const std::type_index & msg_type,
       const message_ref_t & message,
       unsigned int overlimit_reaction_deep );

    Этот метод отвечает за доставку до получателя конверта message с сообщением типа msg_type внутри. Такая доставка может отличаться в деталях реализации в зависимости от того, что это за mbox.

    При добавлении do_deliver_enveloped_msg() в abstract_message_box_t у нас был выбор: сделать его чистым виртуальным методом или же предложить какую-то реализацию по умолчанию.

    Если бы мы сделали do_deliver_enveloped_msg() чистым виртуальным методом, то мы бы поломали совместимость между версиями SObjectizer в ветке 5.5. Ведь тогда тем пользователям, которые написали собственные реализации mbox-ов, пришлось бы при переходе на SObjectizer-5.5.23 модифицировать собственные mbox-ы, иначе бы не удалось пройти компиляцию с новой версией SObjectizer-а.

    Нам этого не хотелось, поэтому мы не стали делать do_deliver_enveloped_msg() чистым виртуальным методом в v.5.5.23. Он имеет реализацию по умолчанию, которая просто бросает исключение. Т.о., кастомные пользовательские mbox-ы смогут нормально продолжать работу с обычными сообщениями, но будут автоматически отказываться принимать конверты. Мы сочли такое поведение более приемлемым. Тем более, что на начальном этапе вряд ли конверты с сообщениями будут применяться широко, да и маловероятно что в «дикой природе» часто встречаются кастомные реализации SObjectizer-овских mbox-ов ;)

    Кроме того, существует далеко не нулевая вероятность, что в последующих мажорных версиях SObjectizer-а, где мы не будем оглядываться на совместимость с веткой 5.5, интерфейс abstract_message_box_t претерпит серьезные изменения. Но это мы уже забегаем далеко вперед…

    Как отсылать конверты с сообщениями


    Сам SObjectizer-5.5.23 не предоставляет простых средств отсылки конвертов. Предполагается, что под конкретную задачу разрабатывается конкретный тип конверта и соответствующие инструменты для удобной отсылки конвертов конкретного типа. Пример этого можно увидеть в so_5::extra::revocable_timer, где нужно не только отослать конверт, но и отдать пользователю специальный timer_id.

    Для более простых ситуаций можно воспользоваться средствами из so_5::extra::enveloped_msg. Например, вот так выглядит отсылка сообщения с заданным ограничением на время его доставки:

    
    // make создает экземпляр сообщения для доставки.
    so_5::extra::enveloped_msg::make<my_message>(... /* Параметры для конструктора */)
       // envelope помещает созданное только что сообщение в конверт нужного типа.
       // Значение 5s передается в конструктор конверта вместе с экземпляром сообщения.
       .envelope<so_5::extra::enveloped_msg::time_limited_delivery_t>(5s)
       // А вот и отсылка конверта с сообщением адресату.
       .send_to(destination);

    Чтобы было совсем весело: конверты в конвертах


    Конверты предназначены для переноса внутри себя каких-то сообщений. Но каких?

    Любых.

    И это подводит нас к интересному вопросу: а можно ли вложить конверт внутрь другого конверта?

    Да, можно. Сколько угодно. Глубина вложенности ограничена только здравым смыслом разработчика и глубиной стека для рекурсивного вызова handler_found_hook/transformation_hook.

    При этом SObjectizer идет навстречу разработчикам собственных конвертов: конверт не должен думать о том, что у него внутри — конкретное сообщение или другой конверт. Когда у конверта вызывают метод-хук и конверт решает, что он может отдать свое содержимое, то конверт просто вызывает invoke() у handler_invoker_t и передает в invoke() ссылку на свое содержимое. А уже invoke() внутри сам разберется, с чем он имеет дело. И если это еще один конверт, то invoke() сам вызовет у этого конверта нужный метод-хук.

    С помощью уже показанного выше инструментария из so_5::extra::enveloped_msg пользователь может сделать несколько вложенных конвертов вот таким образом:

    so_5::extra::enveloped_msg::make<my_message>(...)
       // Конверт, который будет внутри и который содержит сообщение my_message.
       .envelope<inner_envelope_type>(...)
       // Конверт, который будет содержать конверт типа inner_envelope_type.
       .envelope<outer_envelope_type>(...)
       .send_to(destination);

    Несколько примеров использования конвертов


    Теперь, после того, как мы прошлись по внутренностям SObjectizer-5.5.23 пора бы уже перейти к более полезной для пользователей, прикладной части. Ниже рассматривается несколько примеров, которые либо базируются на том, что уже реализовано в so_5_extra, либо используют инструменты из so_5_extra.

    Отзывные таймеры


    Поскольку вся эта кухня с конвертами затевалась ради решения проблемы гарантированного отзыва таймерых сообщений, то давайте посмотрим, что в итоге получилось. Будем использовать пример из so_5_extra-1.2.0, который задействует инструменты из нового пространства имен so_5::extra::revocable_timer:

    Код примера с отзывными таймерами
    #include <so_5_extra/revocable_timer/pub.hpp>
    
    #include <so_5/all.hpp>
    
    namespace timer_ns = so_5::extra::revocable_timer;
    
    class example_t final : public so_5::agent_t
    {
       // Набор сигналов, которые мы будем использовать для отсылки
       // отложенных и периодического сообщения.
       struct first_delayed final : public so_5::signal_t {};
       struct second_delayed final : public so_5::signal_t {};
       struct last_delayed final : public so_5::signal_t {};
    
       struct periodic final : public so_5::signal_t {};
    
       // Идентификаторы для таймерных сообщений.
       timer_ns::timer_id_t m_first;
       timer_ns::timer_id_t m_second;
       timer_ns::timer_id_t m_last;
       timer_ns::timer_id_t m_periodic;
    
    public :
       example_t( context_t ctx ) : so_5::agent_t{ std::move(ctx) }
       {
          so_subscribe_self()
             .event( &example_t::on_first_delayed )
             .event( &example_t::on_second_delayed )
             .event( &example_t::on_last_delayed )
             .event( &example_t::on_periodic );
       }
    
       void so_evt_start() override
       {
          using namespace std::chrono_literals;
    
          // Отсылаем три сигнала как отложенные сообщения...
          m_first = timer_ns::send_delayed< first_delayed >( *this, 100ms );
          m_second = timer_ns::send_delayed< second_delayed >( *this, 200ms );
          m_last = timer_ns::send_delayed< last_delayed >( *this, 300ms );
          // ...и один как периодическое сообщение.
          m_periodic = timer_ns::send_periodic< periodic >( *this, 75ms, 75ms );
    
          // Блокируем агента на 220ms. За это время в очередь агента
          // должны попасть сигналы first_delaye, second_delayed и
          // несколько экземпляров сигнала periodic.
          std::cout << "hang the agent..." << std::flush;
          std::this_thread::sleep_for( 220ms );
          std::cout << "done" << std::endl;
       }
    
    private :
       void on_first_delayed( mhood_t<first_delayed> )
       {
          std::cout << "first_delayed received" << std::endl;
    
          // Отменяем доставку second_delayed и periodic.
          // Агент не должен получить эти сигналы не смотря на то, что
          // они уже стоят в очереди сообщений агента.
          m_second.revoke();
          m_periodic.revoke();
       }
    
       void on_second_delayed( mhood_t<second_delayed> )
       {
          std::cout << "second_delayed received" << std::endl;
       }
    
       void on_last_delayed( mhood_t<last_delayed> )
       {
          std::cout << "last_delayed received" << std::endl;
          so_deregister_agent_coop_normally();
       }
    
       void on_periodic( mhood_t<periodic> )
       {
          std::cout << "periodic received" << std::endl;
       }
    };
    
    int main()
    {
       so_5::launch( [](so_5::environment_t & env) {
          env.register_agent_as_coop( "example", env.make_agent<example_t>() );
       } );
    
       return 0;
    }


    Что мы здесь имеем?

    У нас есть агент, который сперва инициирует несколько таймерных сообщений, а потом блокирует свою рабочую нить на некоторое время. За это время таймер успевает поставить в очередь агента несколько заявок в результате сработавших таймеров: несколько экземпляров periodic, по одному экземпляру first_delayed и second_delayed.

    Соответственно, когда агент разблокирует свою нить, он должен получить первый periodic и first_delayed. При обработке first_delayed агент отменяет доставку periodic-а и second_delayed. Поэтому эти сигналы до агента доходить не должны вне зависимости от того, есть ли они уже в очереди агента или нет (а они есть).

    Смотрим на результат работы примера:

    hang the agent...done
    periodic received
    first_delayed received
    last_delayed received

    Да, так и есть. Получили первый periodic и first_delayed. Затем нет ни periodic-а, ни second_delayed.

    А вот если в примере заменить «таймеры» из so_5::extra::revocable_timer на штатные таймеры из SObjectizer, то результат будет другой: до агента все-таки дойдут те экземпляры сигналов periodic и second_delayed, которые уже попали к агенту в очередь.

    Сообщения с ограничениями на время доставки


    Еще одна полезная, временами, штука, которая станет доступной в so_5_extra-1.2.0 — это доставка сообщения с ограничением по времени. Например, агент request_handler отсылает сообщение verify_signature агенту crypto_master. При этом request_handler хочет, чтобы verify_signature был доставлен в течении 5 секунд. Если это не произошло, то смысла в обработке verity_signature уже не будет, агент request_handler уже прекратит свою работу.

    А агент crypto_master — это такой товарищ, который любит оказываться «бутылочным горлышком»: временами начинает притормаживать. В такие момент у него в очереди скапливаются сообщения, вроде вышеуказанного verify_signature, которые могут ждать до тех пор, пока crypto_master-у не полегчает.

    Предположим, что request_handler отослал сообщение verify_signature агенту crypto_master, но тут crypto_master-у поплохело о он «залип» на 10 секунд. Агент request_handler уже «отвалился», т.е. уже отослал всем отказ в обслуживании и завершил свою работу. Но ведь сообщение verify_signature в очереди crypto_master-а осталось! Значит, когда crypto_master «отлипнет», то он возьмет данное сообщение и будет это сообщение обрабатывать. Хотя это уже не нужно.

    С помощью нового конверта so_5::extra::enveloped_msg::time_limited_delivery_t мы можем решить данную проблему: агент request_handler отошлет verify_signature вложенное в конверт time_limited_delivery_t с ограничением на время доставки:

    so_5::extra::enveloped_msg::make<verify_signature>(...)
       .envelope<so_5::extra::enveloped_msg::time_limited_delivery_t>(5s)
       .send_to(crypto_master_mbox);

    Теперь если crypto_master «залипнет» и не успеет добраться до verify_signature за 5 секунд, то конверт просто не отдаст это сообщение на обработку. И crypto_master не будет делать работу, которая уже никому не нужна.

    Отчеты о доставке сообщений до получателя


    Ну и напоследок пример любопытной штуки, которая не реализована штатно ни в SObjectizer, ни в so_5_extra, но которую можно сделать самостоятельно.

    Иногда хочется получать от SObjectizer-а что-то вроде «отчета о доставке» сообщения до получателя. Ведь одно дело, когда сообщение до получателя дошло, но получатель по каким-то своим причинам на него не среагировал. Другое дело, когда сообщение вообще до получателя не дошло. Например, было заблокировано механизмом защиты агентов от перегрузки. В первом случае сообщение, на которое мы не дождались ответа, можно не перепосылать. А вот во втором случае может иметь смысл перепослать сообщение спустя некоторое время.

    Сейчас мы рассмотрим, как посредством конвертов можно реализовать простейший механизм «отчетов о доставке».

    Итак, сначала сделаем необходимые подготовительные действия:

    #include <so_5_extra/enveloped_msg/just_envelope.hpp>
    #include <so_5_extra/enveloped_msg/send_functions.hpp>
    
    #include <so_5/all.hpp>
    
    using namespace std::chrono_literals;
    
    namespace envelope_ns = so_5::extra::enveloped_msg;
    
    using request_id_t = int;
    

    Теперь мы можем определить сообщения, которые будут использоваться в примере. Первое сообщение — это запрос для выполнения каких-то нужных нам действий. А второе сообщение — это подтверждение того, что первое сообщение дошло до получателя:

    struct request_t final
    {
       request_id_t m_id;
       std::string m_data;
    };
    
    struct delivery_receipt_t final
    {
       // Это значение request_t::m_id из соответствующего request_t.
       request_id_t m_id;
    };

    Далее мы можем определить агента processor_t, который будет обрабатывать сообщения типа request_t. Но обрабатывать будет с имитацией «залипания». Т.е. он обрабатывает request_t, после чего меняет свое состояние с st_normal на st_busy. В состоянии st_busy он ничего не делает и игнорирует все сообщения, которые к нему прилетают.

    Это означает, что если агенту processor_t отослать подряд три сообщения request_t, то первое он обработает, а два других будут выброшены, т.к. при обработке первого сообщения агент уйдет в st_busy и проигнорирует то, что к нему будет приходить пока он находится в st_busy.

    В st_busy агент processor_t проведет 2 секунды, после чего вновь вернется в st_normal и будет готов обрабатывать новые сообщения.

    Вот как агент processor_t выглядит:

    class processor_t final : public so_5::agent_t
    {
       // Нормальное состояние агента. В этом состоянии выполняется
       // обработка входящих сообщений.
       state_t st_normal{this, "normal"};
       // Состояние "я занят". Новые сообщения игнорируются.
       state_t st_busy{this, "busy"};
    
    public:
       processor_t(context_t ctx) : so_5::agent_t{std::move(ctx)}
       {
          this >>= st_normal;
    
          st_normal.event(&processor_t::on_request);
    
          // Для этого состояния нет подписок, но есть лимит времени.
          // Через 2 секунды после входа, автоматический возврат в st_normal.
          st_busy.time_limit(2s, st_normal);
       }
    
    private:
       void on_request(mhood_t<request_t> cmd)
       {
          std::cout << "processor: on_request(" << cmd->m_id << ", "
                << cmd->m_data << ")" << std::endl;
    
          this >>= st_busy;
       }
    };

    Теперь мы можем определить агента requests_generator_t, у которого есть пачка запросов, которые нужно доставить до processor_t. Агент request_generator_t раз в 3 секунды отправляет всю пачку, а затем ждет подтверждения о доставке в виде delivery_receipt_t.

    Когда delivery_recept_t приходит, агент requests_generator_t выбрасывает доставленный запрос из пачки. Если пачка совсем опустела, то работа примера завершается. Если же еще что-то осталось, то оставшаяся пачка будет отослана заново когда наступит следующее время перепосылки.

    Итак, вот код агента request_generator_t. Он довольно объемный, но примитивный. Обратить внимание можно разве что на внутренности метода send_requests(), в котором отсылаются сообщения request_t, вложенные в специальный конверт.

    Код агента requests_generator_t
    class requests_generator_t final : public so_5::agent_t
    {
       // Почтовый ящик обработчика запросов.
       const so_5::mbox_t m_processor;
    
       // Пачка запросов, для которых еще нет подтверждения о доставке.
       std::map<request_id_t, std::string> m_requests;
    
       struct resend_requests final : public so_5::signal_t {};
    
    public:
       requests_generator_t(context_t ctx, so_5::mbox_t processor)
          :  so_5::agent_t{std::move(ctx)}
          ,  m_processor{std::move(processor)}
       {
          so_subscribe_self()
             .event(&requests_generator_t::on_delivery_receipt)
             .event(&requests_generator_t::on_resend);
       }
    
       void so_evt_start() override
       {
          // Формируем первоначальную пачку запросов.
          m_requests.emplace(0, "First");
          m_requests.emplace(1, "Second");
          m_requests.emplace(2, "Third");
          m_requests.emplace(3, "Four");
    
          // Начинаем рассылку.
          send_requests();
       }
    
    private:
       void on_delivery_receipt(mhood_t<delivery_receipt_t> cmd)
       {
          std::cout << "request delivered: " << cmd->m_id << std::endl;
          m_requests.erase(cmd->m_id);
    
          if(m_requests.empty())
             // Запросов больше не досталось. Работу прекращаем.
             so_deregister_agent_coop_normally();
       }
    
       void on_resend(mhood_t<resend_requests>)
       {
          std::cout << "time to resend requests, pending requests: "
                << m_requests.size() << std::endl;
    
          send_requests();
       }
    
       void send_requests()
       {
          for(const auto & item : m_requests)
          {
             std::cout << "sending request: (" << item.first << ", "
                   << item.second << ")" << std::endl;
    
             envelope_ns::make<request_t>(item.first, item.second)
                   .envelope<custom_envelope_t>(so_direct_mbox(), item.first)
                   .send_to(m_processor);
          }
    
          // Отложенное сообщение чтобы повторить отсылку через 3 секунды.
          so_5::send_delayed<resend_requests>(*this, 3s);
       }
    };

    Вот теперь у нас есть сообщения и есть агенты, которые с помощью этих сообщений должны общаться. Осталась самая малость — как-то заставить прилетать сообщения delivery_receipt_t при доставке request_t до processor_t.

    Делается это с помощью вот такого конверта:

    class custom_envelope_t final : public envelope_ns::just_envelope_t
    {
       // Куда присылать отчет о доставке.
       const so_5::mbox_t m_to;
    
       // ID доставленного запроса.
       const request_id_t m_id;
    
    public:
       custom_envelope_t(so_5::message_ref_t payload, so_5::mbox_t to, request_id_t id)
          :  envelope_ns::just_envelope_t{std::move(payload)}
          ,  m_to{std::move(to)}
          ,  m_id{id}
       {}
    
       void handler_found_hook(handler_invoker_t & invoker) noexcept override
       {
          // Раз этот хук вызван, значит сообщение до получателя дошло.
          // Можно отсылать отчет о доставке.
          so_5::send<delivery_receipt_t>(m_to, m_id);
          // Всю остальную работу делает базовый класс.
          envelope_ns::just_envelope_t::handler_found_hook(invoker);
       }
    };

    В общем-то, здесь нет ничего сложного. Мы наследуемся от so_5::extra::enveloped_msg::just_envelope_t. Это вспомогательный тип конверта, который хранит вложенное в него сообщение и предоставляет базовую реализацию хуков
    handler_found_hook() и transformation_hook(). Поэтому нам остается только сохранить внутри custom_envelope_t нужные нам атрибуты и отослать delivery_receipt_t внутри хука handler_found_hook().

    Вот, собственно, и все. Если запустить данный пример, то получим следующее:

    sending request: (0, First)
    sending request: (1, Second)
    sending request: (2, Third)
    sending request: (3, Four)
    processor: on_request(0, First)
    request delivered: 0
    time to resend requests, pending requests: 3
    sending request: (1, Second)
    sending request: (2, Third)
    sending request: (3, Four)
    processor: on_request(1, Second)
    request delivered: 1
    time to resend requests, pending requests: 2
    sending request: (2, Third)
    sending request: (3, Four)
    processor: on_request(2, Third)
    request delivered: 2
    time to resend requests, pending requests: 1
    sending request: (3, Four)
    processor: on_request(3, Four)
    request delivered: 3

    В качестве дополнения нужно сказать, что на практике такой простой custom_envelope_t для формирования отчетов о доставке вряд ли подойдет. Но если кому-то интересна эта тема, то ее можно обсудить в комментариях, а не увеличивать объем статьи.

    Что еще можно было бы делать с помощью конвертов?


    Отличный вопрос! На который у нас самих пока нет исчерпывающего ответа. Вероятно, возможности ограничиваются разве что фантазией пользователей. Ну а если для воплощения фантазий в SObjectizer-е чего-то не хватает, то об этом можно сказать нам. Мы всегда прислушиваемся. И, что немаловажно, временами даже делаем :)

    Интеграция агентов с mchain-ами


    Если же говорить чуть более серьезно, то есть еще одна фича, которую хотелось бы временами иметь и которая даже планировалась для so_5_extra-1.2.0. Но которая, скорее всего, в релиз 1.2.0 уже не попадет.

    Речь идет о том, чтобы упростить интеграцию mchain-ов и агентов.

    Дело в том, что первоначально mchain-ы были добавлены в SObjectizer для того, чтобы упростить общение агентов с другими частями приложения, которые написаны без агентов. Например, есть главный поток приложения, на котором с помощью GUI идет взаимодействие с пользователем. И есть несколько агентов-worker-ов, которые выполняют фоновую «тяжелую» работу. Отослать сообщение агенту из главного потока не проблема: достаточно вызвать обычный send. А вот как передать информацию назад?

    Для этого и были добавлены mchain-ы.

    Но со временем выяснилось, что mchain-ы могут играть гораздо большую роль. Можно, в принципе, делать многопоточные приложения на SObjectizer-е вообще без агентов, только на mchain-ах (подробнее здесь). А еще можно использовать mchain-ы как средство балансировки нагрузки на агентов. Как механизм решения проблемы producer-consumer.

    Проблема producer-consumer заключается в том, что если producer генерирует сообщения быстрее, чем их может обрабатывать consumer, то нас ждут неприятности. Очереди сообщений будут расти, со временем может деградировать производительность или вообще произойдет вылет приложения из-за исчерпания памяти.

    Обычное решение, которое мы предлагали использовать в этом случае — это использовать пару агентов collector-performer. Так же можно использовать и message limits (либо как основной механизм защиты, либо как дополнение к collector-performer). Но написание collector-performer требует дополнительной работы от программиста.

    А вот mchain-ы могли бы использоваться для этих целей с минимальными усилиями со стороны разработчика. Так, producer бы помещал очередное сообщение в mchain, а consumer бы забирал сообщения из этого mchain.

    Но проблема в том, что когда consumer — это агент, то агенту не очень удобно работать с mchain-ом посредством имеющихся функций receive() и select(). И вот это неудобство можно было бы попробовать устранить с помощью какого-то инструмента для интеграции агентов и mchain-ов.

    При разработке такого инструмента нужно будет решить несколько задачек. Например, когда сообщение приходит в mchain, то в какой момент оно должно быть из mchain-а извлечено? Если consumer свободен и ничего не обрабатывает, то можно забрать сообщение из mchain-а сразу и отдать его агенту-consumer-у. Если consumer-у уже было отослано сообщение из mchain-а, он это сообщение еще не успел обработать, но в mchain уже приходит новое сообщение… Как быть в этом случае?

    Есть предположение, что конверты могут помочь в этом случае. Так, когда мы берем первое сообщение из mchain-а и отсылаем его consumer-у, то мы оборачиваем это сообщение в специальный конверт. Когда конверт видит, что сообщение доставлено и обработано, он запрашивает следующее сообщение из mchain-а (если таковое есть).

    Конечно, здесь все не так просто. Но пока что выглядит вполне решаемо. И, надеюсь, подобный механизм появится в одной из следующих версий so_5_extra.

    Уж не ящик ли Пандоры мы собираемся открыть?


    Нужно отметить, что у нас самих добавленные возможности вызывают двойственные чувства.

    С одной стороны, конверты уже позволили/позволяют сделать вещи, о которых ранее говорилось (а о чем-то просто мечталось). Например, это гарантированная отмена таймеров и ограничение на время доставки, отчеты о доставки, возможность отзыва ранее отосланного сообщения.

    С другой стороны, непонятно, к чему это приведет впоследствии. Ведь из любой возможности можно сделать проблему, если начать эту возможность эксплуатировать где нужно и где не нужно. Так может мы приоткрываем ящик Пандоры и сами еще не представляем, что нас ждет?

    Остается только набраться терпения и посмотреть, куда это все нас приведет.

    О ближайших планах развития SObjectizer-а вместо заключения


    Вместо заключения хочется рассказать о том, каким мы видим самое ближайшее (и не только) будущее SObjectizer-а. Если кого-то что-то в наших планах не устраивает, то можно высказаться и повлиять на то, как SObjectizer-5 будет развиваться.

    Первые бета-версии SObjectizer-5.5.23 и so_5_extra-1.2.0 уже зафиксированы и доступны для загрузки и экспериментов. К релизу нужно будет проделать еще много работы в области документации и примеров использования. Поэтому официальный релиз планируется в первой декаде ноября. Если получится раньше, сделаем раньше.

    Релиз SObjectizer-5.5.23, судя по всему, будет означать, что эволюция ветки 5.5 подходит к своему финалу. Самый первый релиз в этой ветке состоялся четыре года назад, в октябре 2014-го. С тех пор SObjectizer-5 эволюционировал в рамках ветки 5.5 без каких-либо серьезных ломающих изменений между версиями. Это было непросто. Особенно с учетом того, что все это время нам приходилось оглядываться на компиляторы, в которых была далеко не идеальная поддержка C++11.

    Сейчас мы уже не видим смысла оглядываться на совместимость внутри ветки 5.5 и, особенно, на старые C++ компиляторы. То, что можно было оправдать в 2014-ом, когда C++14 еще только готовились официально принять, а C++17 еще не было на горизонте, сейчас уже выглядит совсем по-другому.

    Плюс к тому, в самом SObjectizer-5.5 уже накопилось изрядное количество граблей и подпорок, которые появились из-за этой самой совместимости и которые затрудняют дальнейшее развитие SObjectizer-а.

    Поэтому мы в ближайшие месяцы собираемся действовать по следующему сценарию:

    1. Разработка следующей версии so_5_extra, в которую хочется добавить инструментарий для упрощения написания тестов для агентов. Будет ли это so_5_extra-1.3.0 (т.е. с ломающими изменениями относительно 1.2.0) или это будет so_5_extra-1.2.1 (т.е. без ломающих изменений) пока не понятно. Посмотрим, как пойдет. Понятно только, что следующая версия so_5_extra будет базироваться на SObjectizer-5.5.

    1a. Если для следующей версии so_5_extra потребуется сделать что-то дополнительное в SObjectizer-5.5, то будет выпущена следующая версия 5.5.24. Если же для so_5_extra не нужно будет вносить доработки в ядро SObjectizer-а, то версия 5.5.23 окажется последней значимой версией в рамках ветки 5.5. Мелкие bug-fix релизы будут выходить. Но само развитие ветки 5.5 прекращается на версии 5.5.23 или 5.5.24.

    2. Затем будет выпущена версия SObjectizer-5.6.0, которая откроет новую ветку. В ветке 5.6 мы вычистим код SObjectizer-а от всех накопившихся костылей и подпорок, а также от старого хлама, который давным давно помечен, как deprecated. Вероятно, какие-то вещи подвергнуться рефакторингу (например, может быть изменен abstract_message_box_t), но вряд ли кардинальному. Основные же принципы работы и характерные черты SObjectizer-5.5 в SObjectizer-5.6 останутся в том же виде.

    SObjectizer-5.6 будет требовать уже C++14 (хотя бы на уровне GCC-5.5). Компиляторы Visual C++ ниже VC++ 15 (который из Visual Studio 2017) поддерживаться не будут.

    Ветка 5.6 рассматривается нами как стабильная ветка SObjectizer-а, которая будет актуальна до тех пор, пока не появится первая версия SObjectizer-5.7.

    Релиз версии 5.6.0 хотелось бы сделать в начале 2019-го года, ориентировочно в феврале.

    3. После стабилизации ветки 5.6 мы бы хотели начать работать над веткой 5.7, в которой можно было бы пересмотреть какие-то базовые принципы работы SObjectizer-а. Например, совсем отказаться от публичных диспетчеров, оставив только приватные. Переделать механизм коопераций и их взаимоотношений родитель-потомок, тем самым избавившись от узкого места при регистрации/дерегистрации коопераций. Убрать деление на message/signal. Оставить для отсылки сообщений только send/send_delayed/send_periodic, а методы deliver_message и schedule_timer упрятать «под капот». Модифицировать механизм диспетчеризации сообщений так, чтобы либо совсем убрать dynamic_cast-ы из этого процесса, либо свести их к самому минимуму.

    В общем, тут есть где развернуться. При этом SObjectizer-5.7 уже будет требовать C++17, без оглядки на C++14.

    Если смотреть на вещи без розовых очков, то хорошо, если релиз 5.7.0 состоится в конце осени 2019. Т.е. основной рабочей версией SObjectizer-а на 2019-й будет ветка 5.6.

    4. Параллельно всему этому будет развиваться so_5_extra. Вероятно, вместе с SObjectizer-5.6 будет выпущена версия so_5_extra-2, которая на протяжении 2019-го года будет вбирать в себя новый функционал, но на базе SObjectizer-5.6.

    Таким образом мы сами видим для SObjectizer-5 поступательную эволюцию с постепенным пересмотром каких-то из базовых принципов работы SObjectizer-5. При этом мы постараемся делать это как можно более плавно, чтобы можно было с минимальными болевыми ощущениями переходить с одной версии на другую.

    Однако, если кто-то хотел бы от SObjectizer-а более кардинальных и значимых изменений, то у нас есть соображения на этот счет. Если совсем коротко: можно переделать SObjectizer как угодно, вплоть до того, чтобы реализовать SObjectizer-6 для другого языка программирования. Но делать это полностью за свой счет, как это происходит с эволюцией SObjectizer-5, мы не будем.

    На этом, пожалуй, все. В комментариях к предыдущей статье получилось хорошее и конструктивное обсуждение. Нам было бы полезно, если бы подобное обсуждение случилось и в этот раз. Как всегда мы готовы ответить на любые вопросы, а на толковые, так и с удовольствием.

    А самым терпеливым читателям, добравшимся до этих строк большое спасибо за потраченное на прочтение статьи время.
    Поделиться публикацией
    AdBlock похитил этот баннер, но баннеры не зубы — отрастут

    Подробнее
    Реклама

    Комментарии 2

      +1
      Насколько я помню, одной из проблем была интеграция отзываемых сообщений с limit-ами в chain-ах, чтобы отозванные сообщения не увеличивали size chain-а. Удалось это решить?
        +1
        Было принято такое решение — что раз уж сообщение из очереди не изымается физически и место оно там все равно занимает, то пусть учитываются и в лимитах, и в размерах chain-ов. Так что сейчас, даже если сообщение отозвано, но все еще стоит в очереди, то в лимитах оно учитывается.

        Здесь ничего лучше не придумалось. И, вроде как, в обсуждении такой подход категорического неприятия не вызвал.

      Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

      Самое читаемое