Научиться перехватывать необработанные сообщения или пример того, как SObjectizer обрастает новыми фичами…

    Нам очень приятно, когда в SObjectizer добавляются новые возможности, возникшие в результате подсказок и/или пожеланий пользователей SObjectizer-а. Хотя далеко не всегда это оказывается просто. Ведь, с одной стороны, у нас, как у команды разработчиков и старых пользователей SObjectizer-а, уже есть собственные стереотипы о том, как SObjectizer принято использовать. И не всегда получается сразу оценить «свежий взгляд со стороны», понять что реально хочет видеть пользователь во фреймворке и почему он не удовлетворен имеющимися средствами. С другой стороны, SObjectzer не такой уж и маленький фреймворк, добавление новых возможностей требует определенной осмотрительности. Нужно, чтобы новая функциональность не конфликтовала с уже имеющимися фичами. И, тем более, чтобы после добавления чего-то нового не сломалось то, что уже есть и давно работает. Плюс к тому, у нас есть пунктик по поводу сохранения совместимости между версиями SObjectizer-а, поэтому мы сильно против кардинальных изменений…

    В общем, добавление нового в SObjectizer — это всегда приятно с точки зрения увеличения возможностей фреймворка и повышения удобства его использования. Но далеко не всегда это так же приятно и просто с точки зрения реализации.

    Под катом небольшой рассказ о том, как в SObjectizer добавлялась одна новая фича. Может быть кому-то из читателей будет интересно посмотреть, как старый фреймворк адаптируется под запросы новых пользователей.

    Преамбула


    Итак, все началось с того, что один из пользователей SObjectizer-а, PavelVainerman, обратил наше внимание на то, что в SObjectizer нет готовых удобных средств для выполнения эпизодических однократных взаимодействий между агентами.

    Оказалось, что имеется в виду вот что. Допустим, агент A хочет отослать запрос агенту B и хочет получить ответное сообщение от агента B. Но при этом агент A не хочет ждать ответа дольше, чем 5 секунд. Тривиальное решение «в лоб», которое сразу же приходит в голову, может выглядеть так:

    // Тип сообщения-запроса.
    struct request {
      const so_5::mbox_t reply_to_; // Куда отсылать ответ.
      ... // Другие параметры, конструкторы и т.д.
    };
    // Тип сообщения-ответа.
    struct reply {
      ...
    };
    class A : public so_5::agent_t {
      // Сигнал, который будет использоваться для обозначения истечения тайм-аута.
      struct reply_timed_out final : public so_5::signal_t {};
      ...
      // Обработчики сообщений.
      void on_reply(mhood_t<reply> cmd) {...}
      void on_reply_timeout(mhood_t<reply_timed_out>) {...}
      ...
      // Место, где нам требуется сделать запрос агенту B.
      void ask_something(const so_5::mbox_t & B_mbox) {
        // Подписываемся на ответное сообщение и на сообщение о тайм-ауте.
        so_subscribe_self().event(&A::on_reply);
        so_subscribe_self().event(&A::on_reply_timeout);
        // Теперь отсылаем запрос. В запросе передаем свой mbox, на который
        // будет отсылаться ответ.
        so_5::send<request>(B_mbox, so_direct_mbox(), ...);
        // И отсылаем себе отложенное сообщение для того, чтобы отсчитать тайм-аут.
        so_5::send_delayed<reply_timed_out>(*this, 5s);
    };
    

    К сожалению, эта простая версия лишь является наглядной демонстрацией правдивости афоризма о том, что «любая сложная задача имеет простое, легкое для понимания неправильное решение». Здесь есть сразу несколько проблем.

    Первая проблема связана с отложенным сообщением A::reply_timed_out. Если ответ от агента B вовремя не пришел, то с reply_timed_out у нас все нормально. Мы его получаем, обрабатываем и забываем о нем. А вот что будет, если ответ от агента B пришел вовремя? Что произойдет с reply_timed_out?

    Оно все равно придет к агенту A. Ведь никто reply_timed_out не отменял. Значит, как только нить таймера SObjectizer-а отсчитает 5 положенных секунд, сообщение reply_timed_out будет доставлено агенту A. И мы его получим и обработаем не смотря на то, что нам оно уже не нужно. Что неправильно. Правильно было бы сделать так, чтобы сообщение reply_timed_out к нам не попадало после того, как мы получили reply от агента B.

    Самый надежный способ сделать это — это отменить подписку на reply_timed_out. Почему именно так — это тема отдельного большого разговора. Если кому-то интересно, можно отдельно поговорить на эту тему. Пока же ограничимся тем, что отписка от отложенного сообщения является «железобетонным» вариантом решения проблем с отложенным сообщением.

    Вторая проблема связана с тем, что вряд ли агенту A нужно будет подобным образом общаться только с агентом B. Скорее всего агент A обменивается сообщениями request/reply сразу с несколькими агентами. Соответственно, когда request улетает одновременно агентам B и C, то агенту A нужно как-то понимать, от кого прилетел ответ. Или чей ответ не был получен в течении 5 секунд.

    Вторая проблема более-менее удобно решается за счет отказа от использования собственного mbox-а агента A в качестве обратного адреса. Проще создавать новый mbox для каждого нового взаимодействия. И именно этот новый mbox будет использоваться и для получения ответа, и для отложенного сообщения именно для этого запроса.

    Однако, как только мы вводим новый mbox, мы должны позаботиться о том, чтобы mbox был удален после того, как он перестает быть нужным. Для этого мы должны удалять подписки на этот mbox. Если подписки не будут удалены, то mbox останется жить, а это приведет к постоянному росту потребляемой памяти — мы же будем создавать новые mbox-ы на каждый новый запрос, а удаляться эти mbox-ы не будут.

    В общем, если учесть эти две проблемы, то простое решение будет преобразовано в не очень простое:

    class A : public so_5::agent_t {
      // Сигнал, который будет использоваться для обозначения истечения тайм-аута.
      struct reply_timed_out final : public so_5::signal_t {};
      ...
      // Обработчики сообщений теперь будут получать дополнительный параметр,
      // который будет идентифицировать конкретный запрос, к которому относится сообщение.
      void on_reply(const request_info & info, mhood_t<reply> cmd) {...}
      void on_reply_timeout(const request_info & info, mhood_t<reply_timed_out>) {...}
      ...
      // Место, где нам требуется сделать запрос стороннему агенту.
      // Здесь мы так же получаем дополнительный параметр, описывающий запрос.
      void ask_something(const request_info & info, const so_5::mbox_t & dest) {
        // Нам нужен уникальный mbox для взаимодействия в рамках этого запроса.
        const auto uniq_mbox = so_environment().create_mbox();
        // Этот вспомогательный объект будет помогать удалять подписки.
        auto subscriptions_dropper = [this, uniq_mbox] {
          so_drop_subscription<reply>(uniq_mbox);
          so_drop_subscription<reply_timed_out>(uniq_mbox);
        };
        // Подписываемся на ответное сообщение и на сообщение о тайм-ауте.
        so_subscribe(uniq_mbox)
          .event([this, info, subscriptions_dropper](mhood_t<reply> cmd) {
            // Уничтожаем подписки.
            subscription_dropper();
            // Выполняем основную обработку.
            on_reply(info, cmd);
          })
          .event([this, info, subscriptions_dropper](mhood_t<reply_timed_out> cmd) {
            subscription_dropper();
            on_reply_timeout(info, cmd);
          });
        // Теперь отсылаем запрос. В запросе передаем свой уникальный mbox, который
        // мы создали специально для этого запроса.
        so_5::send<request>(B_mbox, uniq_mbox, ...);
        // И отсылаем себе отложенное сообщение для того, чтобы отсчитать тайм-аут.
        so_5::send_delayed<reply_timed_out>(so_environment(), uniq_mbox, 5s);
      }
    };
    

    Получается уже не так просто и компактно, как хотелось бы. Но и это еще далеко не все. Так, в этом решении нет никакой exception safety. Нет явной отмены отложенного сообщения тогда, когда оно уже не нужно. Но, что еще важнее, если агент A захочет иметь не одно дефолтное состояние, как в примере выше, а несколько состояний, в каждом из которых ему нужно реагировать на сообщения по-разному, то все станет еще страшнее. Ну и еще страшнее все станет, если обмен между A и B будет требовать не одно ответное сообщение, а несколько. Скажем, если вместо reply будет successful_reply и failed_reply, то объем работы для разработчика агента A заметно увеличится.

    Почему мы сами не сталкивались с такой проблемой?


    Маленькое отступление в сторону. Когда нам стало понятно, о чем нам говорит PavelVainerman, мы сами удивились. Ведь проблема действительно очевидна. Но почему же мы сами с ней не сталкивались? По крайней мере не сталкивались настолько часто, чтобы обратить на нее внимание и включить решение для этой проблемы в SObjectizer.

    Вероятно, здесь имели место два фактора.

    Во-первых, мы довольно быстро пришли к идеям SEDA-подхода. Там количество агентов невелико, между ними установлены стабильные связи, поэтому там таких проблем нет в принципе.

    Во-вторых, наверное, однократное разовое взаимодействие у нас чаще всего применяется между коротко живущими агентами. И для агента, который живет всего лишь для обработки одной-единственной операции, эти проблемы не актуальны.

    Как бы то ни было, невозможно не отметить тот факт, что как только твоим инструментом начинают пользоваться новые люди, так сразу же выясняется, что они хотят использовать инструмент совсем не так, как привык это делать ты сам.

    Что мы в итоге сделали?


    В итоге мы расширили свою надстройку над SObjectizer под названием so_5_extra, добавив в нее поддержку т.н. асинхронных операций. Посредством асинхронных операций приведенный выше пример может быть переписан следующим образом:

    class A : public so_5::agent_t {
      // Сигнал, который будет использоваться для обозначения истечения тайм-аута.
      struct reply_timed_out final : public so_5::signal_t {};
      ...
      // Обработчики сообщений получают дополнительный параметр,
      // который будет идентифицировать конкретный запрос, к которому относится сообщение.
      void on_reply(const request_info & info, mhood_t<reply> cmd) {...}
      void on_reply_timeout(const request_info & info, mhood_t<reply_timed_out>) {...}
      ...
      // Место, где нам требуется сделать запрос стороннему агенту.
      // Здесь мы так же используем дополнительный параметр, описывающий запрос.
      void ask_something(const request_info & info, const so_5::mbox_t & dest) {
        // Нам нужен уникальный mbox для взаимодействия в рамках этого запроса.
        const auto uniq_mbox = so_environment().create_mbox();
    
        // Описываем и активируем асинхронную операцию.
        so_5::extra::async_op::time_limited::make<reply_timed_out>(*this)
          .completed_on(uniq_mbox, so_default_state(), [this, info](mhood_t<reply> cmd) {
              on_reply(info, cmd);
            })
          .timeout_handler(so_default_state(), [this, info](mhood_t<reply_timed_out> cmd) {
              on_reply_timeout(info, cmd);
            })
          .activate(5s);
    
        // Теперь отсылаем запрос. В запросе передаем свой уникальный mbox, который
        // мы создали специально для этого запроса.
        so_5::send<request>(B_mbox, uniq_mbox, ...);
      }
    };
    

    Подробнее о новых асинхронных операциях в so_5_extra можно прочитать здесь.

    Но речь сегодня пойдет не о том, как сделаны сами асинхронные сообщения. А о том, что потребовалось сделать в SObjectizer для того, чтобы асинхронные сообщения заработали в so_5_extra.

    В чем была проблема с реализацией time_limited асинхронных операций?


    В so_5_extra вошло две реализации асинхронных операций: time_unlimited, когда на время выполнения операции не накладывается вообще никаких ограничений, и time_limited, когда операцию нужно завершить за отведенное время. Выше как раз речь шла именно о time_limited-операциях, т.к. именно с их реализацией и была одна из основных загвоздок.

    Суть в том, что когда мы начинаем time_limited-операцию, то мы должны обязательно получить и обработать отложенное сообщение, которое и ограничивает время асинхронной операции. И вот с этим «обязательно» как раз-то было не все просто.

    Дело в том, что одной из ключевых фич SObjectizer-а являются состояния агентов. Состояния позволяют агентам обрабатывать разные наборы сообщений в каждом из состояний. Или же обрабатывать одни и те же сообщения в разных состояниях по-разному. Но есть и оборотная сторона: если какое-то сообщение нужно обработать во всех состояниях, то нужно явно подписывать обработчик этого сообщения для каждого из состояний. Т.е. писать что-то вроде:

    class default_msg_handler_demo : public so_5::agent_t {
      // Список состояний агента.
      state_t st_first{this}, st_second{this}, st_third{this};
      ...
      // Обработчик, который мы хотим повесить на каждое состояние.
      void some_msg_default_handler(mhood_t<some_msg> cmd) {...}
      ...
      virtual void so_define_agent() override {
        ...
        // Подписываем свой обработчик "по умолчанию".
        so_subscribe(some_mbox)
          .in(st_first).in(st_second).in(st_third)
          .event(&default_msg_handler_demo::some_msg_default_handler);
        ...
      }
    };

    Естественно, что это не самое хорошее и удобное решение.

    За счет использования возможностей иерархических конечных автоматов можно сделать проще, удобнее и надежнее:

    class default_msg_handler_demo : public so_5::agent_t {
      // Список состояний агента.
      // Специальное родительское состояние.
      state_t st_parent{this},
        // И список актуальных состояний, которые будут дочерними.
        st_first{initial_substate_of{st_parent}},
        st_second{substate_of{st_parent}},
        st_third{substate_of{st_parent}};
      ...
      // Обработчик, который мы хотим повесить на каждое состояние.
      void some_msg_default_handler(mhood_t<some_msg> cmd) {...}
      ...
      virtual void so_define_agent() override {
        ...
        // Подписываем свой обработчик "по умолчанию" только в родительском состоянии.
        so_subscribe(some_mbox)
          .in(st_parent)
          .event(&default_msg_handler_demo::some_msg_default_handler);
        ...
      }
    };

    Теперь обработчик «по умолчанию» будет вызваться вне зависимости от того, в каком из состояний находится агент.

    Но, к сожалению, такой подход требует, чтобы агент изначально был спроектирован с использование иерархических конечных автоматов. Вряд ли асинхронными операциями из so_5_extra было бы удобно пользоваться, если бы они накладывали столь жесткое требование на пользователей: мол, хотите пользоваться асинхронными операциями — извольте создать в своем агенте родительское состояние.

    Да и не всегда это в принципе возможно сделать. Допустим, кто-то написал вам библиотеку агентов, в которой есть базовый тип basic_device_manager. Вы делаете свой собственный класс-наследний my_device_manager и вам в my_device_manager нужно использовать асинхронные операции. Если в basic_device_manager разработчик не сделал что-то вроде st_parent, то вы туда свой st_parent уже не добавите.

    В общем, требовалось сделать что-то, что позволило бы ловить сообщения, которые адресовались агенту, но которые агентом не были обработаны. Такие сообщения еще иногда называют deadletters.

    Что и как мы в итоге сделали?


    Deadletter handlers


    Мы сделали так, что теперь разработчик может повесить собственный обработчик на сообщение, которое не было обработано «нормальным» обработчиком. Например:

    class deadletter_handler_handler_demo : public so_5::agent_t {
      state_t st_first{this}, st_second{this}, st_third{this};
      ...
      void deadletter_handler(mhood_t<some_msg> cmd) {...}
      ...
      void normal_handler(mhood_t<some_msg> cmd) {...}
      ...
      virtual void so_define_agent() override {
        ...
        // Подписываем "нормальный" обработчик только для st_first.
        so_subscribe(some_mbox)
          .in(st_first).event(&deadletter_handler_demo::normal_handler);
    
        // Подписываем обработчик "потерянных" сообщений.
        so_subscribe_deadletter_handler(some_mbox, &deadletter_handler_demo::deadletter_handler);
        ...
      }
    };

    Теперь, если агент получает сообщение some_msg из почтового ящика some_mbox находясь в состояние st_first, то для обработки сообщения будет вызван normal_handler. А вот если агент будет находится в любом другом состоянии, то для обработки этого сообщение вызовется deadletter_handler.

    Эту фичу как раз и используют time_limited-операции. При активации операции вешается deadletter_handler на сообщение об истечении тайм-аута. И в каком бы состоянии агент не находился в момент прихода этого сообщения, сообщение будет получено и обработано. Что и позволяет завершить асинхронную операцию. Даже в случае, когда разработчик ошибся и не определил все нужные ему обработчики тайм-аутов.

    Привлекательная идея, которая не была реализована


    Первая мысль, которая возникла как только сформулировалась проблема deadletter handler-ов, была в том, чтобы снабдить каждого агента неким родительским состоянием. А все остальные состояния чтобы автоматически становились дочерними к нему. Т.е. была идея насильно впихнуть в каждого агента некое суперсостояние. Которое просто есть и с этим ничего не сделать :)

    Эта идея была очень привлекательной с точки зрения текущего механизма хранения и поиска подписок (этот механизм не так уж и прост).

    Так же эта идея очень красива с идеологической точки зрения. Иерархические конечные автоматы как они есть.

    Но пришлось от нее отказаться (может быть на время?).

    Главная причина отказа в том, что объект state_t является довольно-таки тяжеловесным. В зависимости от компилятора, стандартной библиотеки и параметров компиляции, state_t в 64-х битовом режиме может занимать от 150 до 250 байт. Если насильно добавлять суперсостояние каждому агенту, то «вес» каждого агента увеличивается на полторы-две сотни байт. Вот просто так, на ровном месте. Даже если этому агенту суперсостояние вообще не нужно.

    Была, да и есть, на самом деле, еще и другая причина. Суперсостояние для каждого агента — это слишком большое нововведение для SObjectizer-а, чтобы сделать его с бухты-барахты,
    не взвесив тщательно все последствия. Есть у меня лично большие подозрения на тему того,
    что стоит добавить суперсостояния в SObjectizer и ими начнут злоупотреблять.

    В общем, идея с суперсостоянием в работу над версией 5.5.21 не пошла. Но зарубка на память осталась. Возможно, она еще найдет свое воплощение. Если у кого-то есть соображения на этот счет, то интересно было бы услышать и пообсуждать.

    Актуальное решение


    От идеи с суперсостоянием отказались, но текущий механизм хранения подписок все-таки менять не хотелось. Поэтому было найдено решение, в котором все-таки дополнительный объект state_t потребовался. Но он существует один для всех и все агенты ссылаются на него.

    Благодаря этому удалось использовать те же самые инструменты для регистрации deadletter handler-ов и для их поиска. Фактически, so_subscribe_deadletter_handler есть ни что иное, как подписка обработчика сообщения для специального, невидимого для пользователя состояния. Ну а поиск deadletter handler-а для сообщения — это всего лишь обычный поиск обработчика, но не для текущего состояния агента, а для этого специального, невидимого состояния. Там, правда, есть некие дополнительные действия для случая, когда включен режим трассировки механизма доставки сообщений, но это совсем уже скучные детали.

    Было ли все так очевидно и просто?


    Когда я вычитывал эту статью перед публикацией, то ловил себя на мысли о том, что рассказывается какая-то тривиальщина. Ну вроде бы все просто и понятно. Только вот путь к этому «просто и понятно» оказался совсем не быстрым, не прямым и не очевидным. Если кому-то интересно, то следы эволюции идеи асинхронных операций можно найти в этой мини-серии блог-постов: №1, №2 и №3. Хотя, как оказалось, даже в заключительном посте этой серии было описано отнюдь не результирующее решение. Пришлось еще наткнуться на серьезный собственный просчет и поломать голову над тем, как воспрепятствовать утечкам памяти при наличии циклических ссылок между объектами. Но это уже совсем другая история…

    Несколько слов в завершение


    Сперва благодарности...


    Хочется поблагодарить всех, кто помогает нам развивать SObjectizer: тем кто пользуется SO-5 и высказывает свои соображения и предложения (отдельное спасибо здесь PavelVainerman), тем, кто еще не пользуется SO-5, но помогает советами и не только (большущая благодарность, в частности, masterspline), да и просто тем, кому не лень ставить +1 в новостях о SObjectizer на различных ресурсах и звездочки на github-е :) Большое вам всем спасибо!

    … и коротко о планах на ближайшее будущее


    Мы собираемся начать работы над следующей версией SObjectizer-а под номером 5.5.22 уже в ближайшее время. Главная новая фича, которую мы хотим увидеть в 5.5.22 — это поддержка параллельных состояний для агентов. Агенты уже могут использовать продвинутые возможности иерархических состояний. Как то: вложенность состояний, shallow и deep-history для состояний, обработчики входа-выхода, временные лимиты на пребывании в состоянии. Но вот чего пока еще в SObjectizer не было, так это параллельных состояний.

    В свое время мы не стали их делать по каким-то причинам. Но практика показала, что кому-то из пользователей параллельные состояния нужные и облегчают им жизнь. Так что будем их делать. Все желающие приглашаются к обсуждению: любые конструктивные соображения, а особенно, примеры из практики и личного опыта будут нам очень полезны.

    Ну и вообще было бы интересно узнать какие у вас впечатления о SObjectizer-е, что нравится, что не нравится, что хотелось бы иметь в SO-5, что мешает… Ну и, естественно, мы готовы ответить на любые вопросы о SObjectizer-е.
    Share post

    Similar posts

    Comments 2

      0

      Почему всё так сложно. В Эрланге достаточно просто добавить after timeout в receive.

        0
        Почему всё так сложно. В Эрланге...
        Прямое сравнение с Erlang-ом неправомочно по двум причинам.

        Во-первых, Erlang реализует для своих процессов вытесняющую многозадачность. Поэтому в Erlang-е процесс может «заснуть» на receive и это не повлияет на другие процессы. В SObjectizer-е для агентов, в общем случае, реализуется не вытесняющая, а кооперативная многозадачность. Поэтому агент должен отдать управление назад SObjectizer-у после того, как завершит обработку очередного сообщения. Т.е. должен вернуться из своего текущего обработчика. Из-за этого уже получается, что если в SO-5 агент хочет отослать сообщение другому агенту и подождать ответ, то агенту нужно два метода-обработчика (в одном он отсылает свой запрос, в другом получает ответ) + кухня по подписке-отписке. Ну и при наличии тайм-аутов это еще более усложняется.

        С Erlang-ом в этом плане можно сравнивать такие реализации Модели Акторов, в которых акторы представлены отдельными потоками выполнения. Например, каждому актору выдается своя нить ОС или своя зеленая нить/файбер. Скажем, в SO-5 вы можете привязать агентов к диспетчерам так, что у каждого агента будет своя рабочая нить. Тогда вместо манипуляций с подпиской/отпиской можно будет использовать CSP-каналы (mchains). И ожидание ответов может быть записано как:
        receive(from(ch).total_time(5s),
          [this](mhood_t<successful_reply> cmd) {...},
          [this](mhood_t<failed_reply> cmd) {...});


        Во-вторых, SObjectizer использует для взаимодействия агентов механизм publish-subscribe. Он в принципе отличается от того, что принято в Модели Акторов вообще (там жестко один почтовый ящик у актора) и в Erlang-е в частности (там одна единственная очередь сообщений для процесса да плюс еще и selective receive). Поэтому показанный пример имело бы смысл сравнивать с реализацией взаимодействия через какой-то MQ-шный брокер и несколько топиков в этом брокере.

        Ну и заодно хочу спросить. А вот, допустим, в Erlang-е вы отослали сообщения трем процессам и вам нужно дождаться ответов от них. Только один ответ нужно ждать не более N секунд, второй — не более M, третий не более L. Как между собой соотносятся N, M и L неизвестно. Так же вы должны обрабатывать ответные сообщения в том порядке, в котором они приходят (а не сперва от первого, затем от второго, затем от третьего). Как это будет выглядеть в Erlang-е? Полагаю, в виде цикла, на каждой итерации которого вы будете вынуждены вычислять наименьший из оставшихся тайм-аутов и это значение будете подставлять в after timeout.

      Only users with full accounts can post comments. Log in, please.