Как стать автором
Обновить

«Современные» обедающие философы на C++ посредством акторов и CSP

Время на прочтение35 мин
Количество просмотров15K

Некоторое время назад ссылка на статью "Modern dining philosophers" распространилась по ресурсам вроде Reddit и HackerNews. Статья интересная, она показывает несколько решений этой известной задачи, реализованных на современном C++ с использованием task-based подхода. Если кто-то это статью еще не читал, то имеет смысл потратить время и прочесть ее.


Однако, не могу сказать, что представленные в статье решения мне показались простыми и понятными. Вероятно это как раз из-за использования тасков. Слишком уж их много создается и диспетчируется посредством разнообразных диспетчеров/сериализаторов. Так что не всегда понятно, где, когда и какие задачи выполняются.


При этом task-based подход не является единственным возможным для решения подобных задач. Почему бы не посмотреть, как задача "обедающих философов" решается посредством моделей Акторов и CSP?


Посему попробовал посмотреть и реализовал несколько решений этой задачи как с использованием Акторов, так и с использованием CSP. Код этих решений можно найти в репозитории на GitHub-е. А под катом пояснения и объяснения, так что кому интересно, милости прошу под кат.


Несколько общих слов


У меня не было цели в точности повторить решения, показанные в той самой статье "Modern dining philosophers", тем более, что мне в них принципиально не нравится одна важная штука: по сути, в тех решениях "философ" ничего сам не делает. Он только говорит "хочу есть", а дальше либо ему кто-то магическим образом предоставляет вилки, либо говорит "сейчас не получится".


Понятно, почему автор прибег к такому поведению: оно позволяет использовать одну и ту же реализацию "философа" совместно с разными реализациями "протоколов". Однако, мне лично кажется, что более интересно когда именно "философ" пытается взять сперва одну вилку, затем другую. И когда "философ" вынужден обрабатывать неудачные попытки захвата вилки.


Именно такие реализации задачи "обедающих философов" я и попробовал сделать. При этом в некоторых решениях использовались те же самые подходы, что и в упомянутой статье (например, реализуемые протоколами ForkLevelPhilosopherProtocol и WaiterFair).


Свои решения я строил на базе SObjectizer-а, что вряд ли удивит тех, кто читал мои статьи раньше. Если же кто-то про SObjectizer еще не слышал, то в двух словах: это один из немногих живых и развивающихся OpenSource "акторных фреймворков" для С++ (из прочих можно упомянуть так же CAF и QP/C++). Надеюсь, что приведенные примеры с моими комментариями будут достаточно понятными даже для незнакомых с SObjectizer-ом. Если нет, то я с удовольствием отвечу на вопросы в комментариях.


Решения на базе Акторов


Обсуждение реализованных решений начнем с тех, которые сделаны на базе Акторов. Сперва рассмотрим реализацию решения Эдсгера Дейкстры, затем перейдем к нескольким другим решениям и посмотрим, как отличается поведение каждого из решений.


Решение Дейкстры


Эдсгер Дейкстра, мало того, что сформулировал саму задачу "обедающих филофосов" (формулировку оной с использованием "вилок" и "спагетти" озвучил Тони Хоар), так еще и предложил очень простое и красивое решение. А именно: философы должны захватывать вилки только в порядке увеличения номеров вилок и если философ сумел взять первую вилку, то он уже не отпускает ее пока не получит и вторую вилку.


Например, если философу нужно пользоваться вилками с номерами 5 и 6, то философ должен сперва взять вилку номер 5. Только после этого он может взять вилку номер 6. Т.о., если вилки с меньшими номерами лежат слева от философов, то философ должен сперва взять левую вилку и лишь затем он может взять правую вилку.


Последний философ в списке, которому приходится иметь дело с вилками за номерами (N-1) и 0, поступает наоборот: он сперва берет правую вилку с номером 0, а затем левую вилку с номером (N-1).


Для реализации этого подхода потребуется два типа акторов: один для вилок, второй для философов. Если философ захотел поесть, то он отсылает соответствующему актору-вилке сообщение на захват вилки, а актор-вилка отвечает ответным сообщением.


Код реализации этого подхода можно увидеть здесь.


Сообщения


Прежде чем говорить об акторах, нужно посмотреть на сообщения, которыми акторы будут обмениваться:


struct take_t
{
   const so_5::mbox_t m_who;
   std::size_t m_philosopher_index;
};

struct taken_t : public so_5::signal_t {};

struct put_t : public so_5::signal_t {};

Когда актор-философ хочет взять вилку, он отсылает актору-вилке сообщение take_t, а актор-вилка отвечает сообщением taken_t. Когда актор-философ заканчивает есть и хочет положить вилки обратно на стол, он отсылает акторам-вилкам сообщения put_t.


В сообщении take_t поле m_who обозначает почтовый ящик (он же mbox) актора-философа. В этот почтовый ящик должно быть отправлено ответное сообщение taken_t. Второе поле из take_t в данном примере не используется, оно нам потребуется, когда мы дойдем до реализаций waiter_with_queue и waiter_with_timestamps.


Актор-вилка


Теперь мы можем посмотреть на то, что из себя представляет актор-вилка. Вот его код:


class fork_t final : public so_5::agent_t
{
public :
   fork_t( context_t ctx ) : so_5::agent_t{ std::move(ctx) } {}

   void so_define_agent() override
   {
      // Начальным должно быть состояние 'free'.
      this >>= st_free;

      // В состоянии 'free' обрабатывается только одно сообщение.
      st_free
         .event( [this]( mhood_t<take_t> cmd ) {
               this >>= st_taken;
               so_5::send< taken_t >( cmd->m_who );
            } );

      // В состоянии 'taken' обрабатываются два сообщения.
      st_taken
         .event( [this]( mhood_t<take_t> cmd ) {
               // Философу придется подождать в очереди.
               m_queue.push( cmd->m_who );
            } )
         .event( [this]( mhood_t<put_t> ) {
               if( m_queue.empty() )
                  // Вилка сейчас никому не нужна.
                  this >>= st_free;
               else
               {
                  // Вилка должна достаться первому из очереди.
                  const auto who = m_queue.front();
                  m_queue.pop();
                  so_5::send< taken_t >( who );
               }
            } );
   }

private :
   // Определение состояний для актора.
   const state_t st_free{ this, "free" };
   const state_t st_taken{ this, "taken" };

   // Очередь ждущих философов.
   std::queue< so_5::mbox_t > m_queue;
};

Каждый актор в SObjectizer-е должен быть производен от базового класса agent_t. Что мы и видим здесь для типа fork_t.


В классе fork_t переопределяется метод so_define_agent(). Это специальный метод, он автоматически вызывается SObjectizer-ом при регистрации нового агента. В методе so_define_agent() производится "настройка" агента для работы в SObjectizer-е: меняется стартовое состояние, производится подписка на нужные сообщения.


Каждый актор в SObjectizer-е представляет из себя конечный автомат с состояниями (даже если актор использует всего одно дефолтное состояние). Вот у актора fork_t есть два состояния: free и taken. Когда актор в состоянии free, вилка может быть "захвачена" философом. И после захвата "вилки" актор fork_t должен перейти в состояние taken. Внутри класса fork_t состояния представляются экземплярами st_free и st_taken специального типа state_t.


Состояния позволяют обрабатывать входящие сообщения по-разному. Например, в состоянии free агент реагирует только на take_t и реакция эта очень простая: меняется состояние актора и отсылается ответное taken_t:


st_free
   .event( [this]( mhood_t<take_t> cmd ) {
         this >>= st_taken;
         so_5::send< taken_t >( cmd->m_who );
      } );

Тогда как все остальные сообщения, включая put_t в состоянии free попросту игнорируются.


В состоянии же taken актор обрабатывает два сообщения, и даже сообщение take_t он обрабатывает иначе:


st_taken
   .event( [this]( mhood_t<take_t> cmd ) {
         m_queue.push( cmd->m_who );
      } )
   .event( [this]( mhood_t<put_t> ) {
         if( m_queue.empty() )
            this >>= st_free;
         else
         {
            const auto who = m_queue.front();
            m_queue.pop();
            so_5::send< taken_t >( who );
         }
      } );

Здесь наиболее интересен обработчик для put_t: если очередь ждущих философов пуста, то мы можем вернуться в free, а вот если не пуста, то первому из них нужно отослать taken_t.


Актор-философ


Код актора-философа гораздо более объемен, поэтому я не буду приводить его здесь полностью. Мы обсудим лишь наиболее значимые фрагменты.


У актора-философа немного больше состояний:


state_t st_thinking{ this, "thinking.normal" };
state_t st_wait_left{ this, "wait_left" };
state_t st_wait_right{ this, "wait_right" };
state_t st_eating{ this, "eating" };

state_t st_done{ this, "done" };

Актор начинает свою работу в состоянии thinking, потом переключается в wait_left, затем в wait_right, затем в eating. Из eating актор может вернуться в thinking или же может перейти в done, если философ съел все, что должен был.


Диаграмму состояний для актора-философа можно представить следующим образом:


image


Логика же поведения актора описана в реализации его метода so_define_agent():


void so_define_agent() override
{
   // В состоянии thinking реагируем только на сигнал stop_thinking.
   st_thinking
      .event( [=]( mhood_t<stop_thinking_t> ) {
            // Пытаемся взять левую вилку.
            this >>= st_wait_left;
            so_5::send< take_t >( m_left_fork, so_direct_mbox(), m_index );
         } );

   // Когда ждем левую вилку реагируем только на taken.
   st_wait_left
      .event( [=]( mhood_t<taken_t> ) {
            // У нас есть левая вилка. Пробуем взять правую.
            this >>= st_wait_right;
            so_5::send< take_t >( m_right_fork, so_direct_mbox(), m_index );
         } );

   // Пока ждем правую вилку, реагируем только на taken.
   st_wait_right
      .event( [=]( mhood_t<taken_t> ) {
            // У нас обе вилки, можно поесть.
            this >>= st_eating;
         } );

   // Пока едим реагируем только на stop_eating.
   st_eating
      // 'stop_eating' должен быть отослан как только входим в 'eating'.
      .on_enter( [=] {
            so_5::send_delayed< stop_eating_t >( *this, eat_pause() );
         } )
      .event( [=]( mhood_t<stop_eating_t> ) {
         // Обе вилки нужно вернуть на стол.
         so_5::send< put_t >( m_right_fork );
         so_5::send< put_t >( m_left_fork );

         // На шаг ближе к финалу.
         ++m_meals_eaten;
         if( m_meals_count == m_meals_eaten )
            this >>= st_done; // Съели все, что могли, пора завершаться.
         else
            think();
      } );

   st_done
      .on_enter( [=] {
         // Сообщаем о том, что мы закончили.
         completion_watcher_t::done( so_environment(), m_index );
      } );
}

Пожалуй, единственный момент, на котором следует остановится особо — это подход к имитации процессов "размышления" и "еды". В коде актора нет this_thread::sleep_for или какого-то другого способа блокирования текущей рабочей нити. Вместо этого используются отложенные сообщения. Например, когда актор входит в состояние eating он отсылает самому себе отложенное сообщение stop_eating_t. Это сообщение отдается таймеру SObjectizer-а и таймер доставляет сообщение актору когда приходит время.


Использование отложенных сообщений позволяет запускать всех акторов на контексте одной единственной рабочей нити. Грубо говоря, одна нить читает из какой-то очереди сообщения и дергает обработчик очередного сообщения у соответствующего актора-получателя. Подробнее о рабочих контекстах для акторов речь пойдет ниже.


Результаты


Результаты работы этой реализации могут выглядеть следующим образом (небольшой фрагмент):


    Socrates: tttttttttttLRRRRRRRRRRRRRREEEEEEEttttttttLRRRRRRRRRRRRRREEEEEEEEEEEEE
       Plato: ttttttttttEEEEEEEEEEEEEEEEttttttttttRRRRRREEEEEEEEEEEEEEttttttttttLLL
   Aristotle: ttttEEEEEtttttttttttLLLLLLRRRREEEEEEEEEEEEttttttttttttLLEEEEEEEEEEEEE
   Descartes: tttttLLLLRRRRRRRREEEEEEEEEEEEEtttLLLLLLLLLRRRRREEEEEEttttttttttLLLLLL
     Spinoza: ttttEEEEEEEEEEEEEttttttttttLLLRRRREEEEEEEEEEEEEttttttttttRRRREEEEEEtt
        Kant: ttttttttttLLLLLLLRREEEEEEEEEEEEEEEttttttttttLLLEEEEEEEEEEEEEEtttttttt
Schopenhauer: ttttttEEEEEEEEEEEEEttttttLLLLLLLLLEEEEEEEEEttttttttLLLLLLLLLLRRRRRRRR
   Nietzsche: tttttttttLLLLLLLLLLEEEEEEEEEEEEEttttttttLLLEEEEEEEEEttttttttRRRRRRRRE
Wittgenstein: ttttEEEEEEEEEEtttttLLLLLLLLLLLLLEEEEEEEEEttttttttttttRRRREEEEEEEEEEEt
   Heidegger: tttttttttttLLLEEEEEEEEEEEEEEtttttttLLLLLLREEEEEEEEEEEEEEEtttLLLLLLLLR
      Sartre: tttEEEEEEEEEttttLLLLLLLLLLLLRRRRREEEEEEEEEtttttttLLLLLLLLRRRRRRRRRRRR

Читать это следует следующим образом:


  • t обозначает, что философ "размышляет";
  • L обозначает, что философ ожидает захвата левой вилки (находится в состоянии wait_left);
  • R обозначает, что философ ожидает захвата правой вилки (находится в состоянии wait_right);
  • E обозначает, что философ "ест".

Мы можем видеть, что Сократ может взять вилку слева только после того, как Сартр отдаст ее. После чего Сократ будет ждать, пока Платон освободит правую вилку. Только после этого Сократ сможет поесть.


Простое решение без арбитра (официанта)


Если мы проанализируем результат работы решения Дейкстры, то увидим, что философы проводят много времени в ожидании захвата вилок. Что не есть хорошо, т.к. это время можно также потратить на раздумья. Не зря же бытует мнение, что если размышлять на голодный желудок, то можно получить гораздо более интересные и неожиданные результаты ;)


Давайте посмотрим на простейшее решение, в котором философ возвращает первую захваченную вилку, если он не может захватить вторую (в упомянутой выше статье "Modern dining philosophers" это решение реализует ForkLevelPhilosopherProtocol).


Исходный код этой реализации можно увидеть здесь, а код соответствующего актора-философа здесь.


Сообщения


В данном решении используется практически такой же набор сообщений:


struct take_t
{
   const so_5::mbox_t m_who;
   std::size_t m_philosopher_index;
};

struct busy_t : public so_5::signal_t {};

struct taken_t : public so_5::signal_t {};

struct put_t : public so_5::signal_t {};

Единственное отличие — это присутствие сигнала busy_t. Этот сигнал актор-вилка отсылает в ответ актору-философу если вилка уже захвачена другим философом.


Актор-вилка


Актор-вилка в этом решении оказывается даже проще, чем в решении Дейкстры:



class fork_t final : public so_5::agent_t
{
public :
   fork_t( context_t ctx ) : so_5::agent_t( ctx )
   {
      this >>= st_free;

      st_free.event( [this]( mhood_t<take_t> cmd )
            {
               this >>= st_taken;
               so_5::send< taken_t >( cmd->m_who );
            } );

      st_taken.event( []( mhood_t<take_t> cmd )
            {
               so_5::send< busy_t >( cmd->m_who );
            } )
         .just_switch_to< put_t >( st_free );
   }

private :
   const state_t st_free{ this };
   const state_t st_taken{ this };
};

Нам здесь даже не нужно хранить очередь ждущих философов.


Актор-философ


Актор-философ в этой реализации похож на оного из решения Дейкстры, но здесь актору-философу приходится обрабатывать еще и busy_t, поэтому диаграмма состояний выглядит следующим образом:


image


Аналогично, вся логика актора-философа определяется в so_define_agent():


void so_define_agent() override
{
   st_thinking
      .event< stop_thinking_t >( [=] {
         this >>= st_wait_left;
         so_5::send< take_t >( m_left_fork, so_direct_mbox(), m_index );
      } );

   st_wait_left
      .event< taken_t >( [=] {
         this >>= st_wait_right;
         so_5::send< take_t >( m_right_fork, so_direct_mbox(), m_index );
      } )
      .event< busy_t >( [=] {
         think( st_hungry_thinking );
      } );

   st_wait_right
      .event< taken_t >( [=] {
         this >>= st_eating;
      } )
      .event< busy_t >( [=] {
         so_5::send< put_t >( m_left_fork );
         think( st_hungry_thinking );
      } );

   st_eating
      .on_enter( [=] {
            so_5::send_delayed< stop_eating_t >( *this, eat_pause() );
         } )
      .event< stop_eating_t >( [=] {
         so_5::send< put_t >( m_right_fork );
         so_5::send< put_t >( m_left_fork );

         ++m_meals_eaten;
         if( m_meals_count == m_meals_eaten )
            this >>= st_done;
         else
            think( st_normal_thinking );
      } );

   st_done
      .on_enter( [=] {
         completion_watcher_t::done( so_environment(), m_index );
      } );
}

В общем-то, это практически такой же код, как и в решении Дейкстры, разве что добавилась пара обработчиков для busy_t.


Результаты


Результаты работы выглядят уже по-другому:


    Socrates: tttttttttL..R.....EEEEEEEEEEEEttttttttttR...L.L...EEEEEEEttEEEEEE
       Plato: ttttEEEEEEEEEEEttttttL.....L..EEEEEEEEEEEEEEEttttttttttL....L....
   Aristotle: ttttttttttttL..L.R..EEEEEEtttttttttttL..L....L....R.....EEEEEEEEE
   Descartes: ttttttttttEEEEEEEEttttttttttttEEEEEEEEttttEEEEEEEEEEEttttttL..L..
     Spinoza: ttttttttttL.....L...EEEEEEtttttttttL.L......L....L..L...R...R...E
        Kant: tttttttEEEEEEEttttttttL.L.....EEEEEEEEttttttttR...R..R..EEEEEtttt
Schopenhauer: tttR..R..L.....EEEEEEEttttttR.....L...EEEEEEEEEEEEEEEEttttttttttt
   Nietzsche: tttEEEEEEEEEEtttttttttEEEEEEEEEEEEEEEttttL....L...L..L....EEEEEEE
Wittgenstein: tttttL.L..L.....R.R.....L.....L....L...EEEEEEEEEEEEEEEtttttttttL.
   Heidegger: ttttR..R......EEEEEEEEEEEEEttttttttttR..L...L...L..L...EEEEtttttt
      Sartre: tttEEEEEEEtttttttL..L...L....R.EEEEEEEtttttEEEEtttttttR.....R..R.

Здесь мы видим новый символ, который означает, что актор-философ находится в "голодных раздумьях".


Даже на этом коротком фрагменте можно увидеть, что есть длительные моменты времени, на протяжении которых философ не может поесть. Это потому, что данное решение защищено от проблемы дедлоков, но не имеет защиты от голоданий.


Решение с официантом и очередью


Показанное выше простейшее решение без арбитра не защищает от голоданий. Упомянутая выше статья "Modern dining philosophers" содержит решение проблемы голоданий в виде протокола WaiterFair. Суть в том, что появляется арбитр (официант), к которому обращаются философы когда хотят поесть. А у официанта есть очередь заявок от философов. И вилки достаются философу только если обе вилки сейчас свободны, а в очереди еще нет ни одного из соседей того философа, который обратился к официанту.


Давайте посмотрим на то, как это же решение может выглядеть на акторах.


Исходный код этой реализации можно найти здесь.


Трюк


Проще всего было бы ввести новый набор сообщений, посредством которого философы могли бы общаться с официантом. Но я хотел сохранить не только уже существующий набор сообщений (т.е. take_t, taken_t, busy_t, put_t). Я так же хотел, чтобы использовался тот же самый актор-философ, что и в предыдущем решении. Поэтому мне нужно было решить хитрую задачку: как сделать так, чтобы актор-философ общался с единственным актором-официантом, но при этом думал, что он взаимодействует напрямую с акторами-вилками (коих уже нет на самом-то деле).


Эта задачка была решена с помощью нехитрого трюка: актор-официант создает набор mbox-ов, ссылки на которые отдаются акторам-философам как ссылки на mbox-ы акторов-вилок. При этом актор-официант подписывается на сообщения из всех этих mbox-ов (что в SObjectizer-е реализуется запросто, т.к. SObjectizer — это реализация не только/столько Модели Акторов, но еще и Pub/Sub поддерживается "из коробки").


В коде это выглядит приблизительно вот так:


class waiter_t final : public so_5::agent_t
{
public :
   waiter_t( context_t ctx, std::size_t forks_count )
      :  so_5::agent_t{ std::move(ctx) }
      ,  m_fork_states( forks_count, fork_state_t::free )
   {
      // Нужны mbox-ы для каждой "вилки"
      m_fork_mboxes.reserve( forks_count );
      for( std::size_t i{}; i != forks_count; ++i )
         m_fork_mboxes.push_back( so_environment().create_mbox() );
   }
   ...
   void so_define_agent() override
   {
      // Требуется создать подписки для каждой "вилки".
      for( std::size_t i{}; i != m_fork_mboxes.size(); ++i )
      {
         // Нам нужно знать индекс вилки. Поэтому используются лямбды.
         // Лямбда захватывает индекс и затем отдает этот индекс в
         // актуальный обработчик входящего сообщения.
         so_subscribe( fork_mbox( i ) )
            .event( [i, this]( mhood_t<take_t> cmd ) {
                  on_take_fork( std::move(cmd), i );
               } )
            .event( [i, this]( mhood_t<put_t> cmd ) {
                  on_put_fork( std::move(cmd), i );
               } );
      }
   }

private :
   ...
   // Почтовые ящики для несуществующих "вилок".
   std::vector< so_5::mbox_t > m_fork_mboxes;

Т.е. сперва создаем вектор mbox-ов для несуществующих "вилок", затем подписываемся на каждый из них. Да так подписываемся, чтобы знать к какой именно вилке относится запрос.


Реальным обработчиком входящего запроса take_t является метод on_take_fork():


void on_take_fork( mhood_t<take_t> cmd, std::size_t fork_index )
{
   // Используем тот факт, что у левой вилки индекс совпадает
   // с индексом самого философа.
   if( fork_index == cmd->m_philosopher_index )
      handle_take_left_fork( std::move(cmd), fork_index );
   else
      handle_take_right_fork( std::move(cmd), fork_index );
}

Кстати говоря, именно здесь нам и потребовалось второе поле из сообщения take_t.


Итак, в on_take_fork() мы имеем исходный запрос и индекс вилки, к которой запрос относится. Следовательно, мы можем определить, просит ли философ левую вилку или правую. И, соответственно, мы можем обрабатывать их по-разному (и нам приходится обрабатывать их по-разному).


Поскольку философ всегда сперва просит левую вилку, то все необходимые проверки нам нужно проделать именно в этот момент. И мы можем оказаться в одной из следующих ситуаций:


  1. Обе вилки свободны и могут быть отданы тому философу, который прислал запрос. В этом случае мы отсылаем taken_t философу, а правую вилку помечаем как зарезервированную, чтобы никто больше не мог ее взять.
  2. Вилки не могут быть отданы философу. Не важно почему. Может какая-то из них сейчас занята. Или в очереди есть кто-то из соседей философа. Как бы то ни было, мы помещаем философа, приславшего запрос, в очередь, после чего отсылаем ему busy_t.

Благодаря такой логике работы философ, получивший taken_t для левой вилки, может спокойно послать запрос take_t для правой вилки. Этот запрос будет сразу же удовлетворен, т.к. вилка уже зарезервирована для данного философа.


Результаты


Если запустить получившееся решение, то можно увидеть что-то вроде:


    Socrates: tttttttttttL....EEEEEEEEEEEEEEttttttttttL...L...EEEEEEEEEEEEEtttttL.
       Plato: tttttttttttL....L..L..L...L...EEEEEEEEEEEEEtttttL.....L....L.....EEE
   Aristotle: tttttttttL.....EEEEEEEEEttttttttttL.....L.....EEEEEEEEEEEtttL....L.L
   Descartes: ttEEEEEEEEEEtttttttL.L..EEEEEEEEEEEEtttL..L....L....L.....EEEEEEEEEE
     Spinoza: tttttttttL.....EEEEEEEEEttttttttttL.....L.....EEEEEEEEEEEtttL....L.L
        Kant: ttEEEEEEEEEEEEEtttttttL...L.....L.....EEEEEttttL....L...L..L...EEEEE
Schopenhauer: ttttL...L.....L.EEEEEEEEEEEEEEEEEtttttttttttL..L...L..EEEEEEEttttttt
   Nietzsche: tttttttttttL....L..L..L...L...L.....L....EEEEEEEEEEEEttL.....L...L..
Wittgenstein: tttttttttL....L...L....L....L...EEEEEEEttttL......L.....L.....EEEEEE
   Heidegger: ttttttL..L...L.....EEEEEEEEEEEEtttttL...L..L.....EEEEEEEEEEEttttttL.
      Sartre: ttEEEEEEEEEEEEEttttttttL.....L...EEEEEEEEEEEEttttttttttttL.....EEEEE

Можно обратить внимание на отсутствие символов R. Это потому, что не может возникнуть неудачи или ожидания на запросе правой вилки.


Еще одно решение с использованием арбитра (официанта)


В некоторых случаях предыдущее решение waiter_with_queue может показывать результаты, похожие вот на этот:


    Socrates: tttttEEEEEEEEEEEEEEtttL.....L.L....L....EEEEEEEEEttttttttttL....L.....EE
       Plato: tttttL..L..L....L.L....EEEEEEEEEEEEEEEttttttttttttL.....EEEEEEEEEttttttt
   Aristotle: tttttttttttL..L...L.....L.....L....L.....EEEEEEEEEEEEtttttttttttL....L..
   Descartes: ttttttttttEEEEEEEEEEttttttL.....L....L..L.....L.....L..L...L..EEEEEEEEtt
     Spinoza: tttttttttttL..L...L.....L.....L....L.....L..L..L....EEEEEEEEEEtttttttttt
        Kant: tttttttttL....L....L...L...L....L..L...EEEEEEEEEEEttttttttttL...L......E
Schopenhauer: ttttttL....L..L...L...L.L....L...EEEEEtttttL....L...L.....EEEEEEEEEttttt
   Nietzsche: tttttL..L..L....EEEEEEEEEEEEEttttttttttttEEEEEEEEEEEEEEEttttttttttttL...
Wittgenstein: tttEEEEEEEEEEEEtttL....L....L..EEEEEEEEEtttttL..L..L....EEEEEEEEEEEEEEEE
   Heidegger: tttttttttL...L..EEEEEEEEttttL..L.....L...EEEEEEEEEtttL.L..L...L....L...L
      Sartre: ttttttttttL..L....L...L.EEEEEEEEEEEtttttL...L..L....EEEEEEEEEEtttttttttt

Можно увидеть наличие достаточно длинных периодов времени, когда философы не могут поесть даже не смотря на наличие свободных вилок. Например, левая и правая вилки для Канта свободны на протяжении длительного времени, но Кант не может их взять, т.к. в очереди ожидания уже стоят его соседи. Которые ждут своих соседей. Которые ждут своих соседей и т.д.


Поэтому рассмотренная выше реализация waiter_with_queue защищает от голодания в том смысле, что рано или поздно философ поест. Это ему гарантировано. Но периоды голодания могут быть довольно долгими. И утилизация ресурсов может быть не оптимальной временами.


Дабы решить эту проблему я реализовал еще одно решение, waiter_with_timestamp (его код можно найти здесь). Вместо очереди там используется приоритизация запросов от философов с учетом времени их голодания. Чем дольше философ голодает, тем приоритетнее его запрос.


Рассматривать код этого решения мы не будем, т.к. по большому счету главное в нем — это тот же самый трюк с набором mbox-ов для несуществующих "вилок", который мы уже обсудили в разговоре про реализацию waiter_with_queue.


Несколько деталей реализации, на которые хотелось бы обратить внимание


Есть несколько деталей в реализациях на базе Акторов, на которые хотелось бы обратить внимание, т.к. эти детали демонстрируют интересные особенности SObjectizer-а.


Рабочий контекст для акторов


В рассмотренных реализациях все основные акторы (fork_t, philosopher_t, waiter_t) работали на контексте одной общей рабочей нити. Что вовсе не означает, что в SObjectizer-е все акторы работают только на одной единственной нити. В SObjectizer-е можно привязывать акторов к разным контекстам, что можно увидеть, например, в коде функции run_simulation() в решении no_waiter_simple.


Код run_simulation из no_waiter_simple
void run_simulation( so_5::environment_t & env, const names_holder_t & names )
{
   env.introduce_coop( [&]( so_5::coop_t & coop ) {
      coop.make_agent_with_binder< trace_maker_t >(
            so_5::disp::one_thread::create_private_disp( env )->binder(),
            names,
            random_pause_generator_t::trace_step() );

      coop.make_agent_with_binder< completion_watcher_t >(
            so_5::disp::one_thread::create_private_disp( env )->binder(),
            names );

      const auto count = names.size();

      std::vector< so_5::agent_t * > forks( count, nullptr );
      for( std::size_t i{}; i != count; ++i )
         forks[ i ] = coop.make_agent< fork_t >();

      for( std::size_t i{}; i != count; ++i )
         coop.make_agent< philosopher_t >(
               i,
               forks[ i ]->so_direct_mbox(),
               forks[ (i + 1) % count ]->so_direct_mbox(),
               default_meals_count );
   });
}

В этой функции создаются дополнительные акторы типов trace_maker_t и completion_watcher_t. Они будут работать на отдельных рабочих контекстах. Для этого создается два экземпляра диспетчера типа one_thread и акторы привязываются к этим экземплярам диспетчеров. Что означает, что данные акторы будут работать как активные объекты: каждый будет владеть собственной рабочей нитью.


SObjectizer предоставляет набор из нескольких разных диспетчеров, которые могут использоваться прямо "из коробки". При этом разработчик может создать в своем приложении столько экземпляров диспетчеров, сколько разработчику нужно.


Но самое важное то, что в самом акторе ничего не нужно менять, чтобы заставить его работать на другом диспетчере. Скажем, мы легко можем запустить акторов fork_t на одном пуле рабочих нитей, а акторов philosopher_t на другом пуле.


Код run_simulation из no_waiter_simple_tp
void run_simulation( so_5::environment_t & env, const names_holder_t & names )
{
   env.introduce_coop( [&]( so_5::coop_t & coop ) {
      coop.make_agent_with_binder< trace_maker_t >(
            so_5::disp::one_thread::create_private_disp( env )->binder(),
            names,
            random_pause_generator_t::trace_step() );

      coop.make_agent_with_binder< completion_watcher_t >(
            so_5::disp::one_thread::create_private_disp( env )->binder(),
            names );

      const auto count = names.size();

      // Параметры для настройки поведения thread_pool-диспетчера.
      so_5::disp::thread_pool::bind_params_t bind_params;
      bind_params.fifo( so_5::disp::thread_pool::fifo_t::individual );

      std::vector< so_5::agent_t * > forks( count, nullptr );
      // Создаем пул нитей для акторов-вилок.
      auto fork_disp = so_5::disp::thread_pool::create_private_disp(
               env,
               3u // Размер пула.
            );
      for( std::size_t i{}; i != count; ++i )
         // Каждая вилка будет привязана к пулу.
         forks[ i ] = coop.make_agent_with_binder< fork_t >(
               fork_disp->binder( bind_params ) );

      // Создаем пул нитей для акторов-философов.
      auto philosopher_disp = so_5::disp::thread_pool::create_private_disp(
               env,
               6u // Размер пула.
            );
      for( std::size_t i{}; i != count; ++i )
         coop.make_agent_with_binder< philosopher_t >(
               philosopher_disp->binder( bind_params ),
               i,
               forks[ i ]->so_direct_mbox(),
               forks[ (i + 1) % count ]->so_direct_mbox(),
               default_meals_count );
   });
}

И при этом нам не потребовалось поменять ни строчки в классах fork_t и philosopher_t.


Трассировка смены состояний акторов


Если посмотреть в реализацию философов в упомянутой выше статье Modern dining philosophers можно легко увидеть код, относящийся к трассировке действий философа, например:


void doEat() {
    eventLog_.startActivity(ActivityType::eat);
    wait(randBetween(10, 50));
    eventLog_.endActivity(ActivityType::eat);

В тоже время в показанных выше реализациях на базе SObjectizer подобного кода нет. Но трассировка, тем не менее, выполняется. За счет чего?


Дело в том, что в SObjectizer-е есть специальная штука: слушатель состояний агента. Такой слушатель реализуется как наследник класса agent_state_listener_t. Когда слушатель связывается с агентом, то SObjectizer автоматически уведомляет слушателя о каждом изменении состояния агента.


Установку слушателя можно увидеть в конструкторе агентов greedy_philosopher_t и philosopher_t:


philosopher_t(...)
   ...
   {
      so_add_destroyable_listener(
            state_watcher_t::make( so_environment(), index ) );
   }

Здесь state_watcher_t — это и есть нужная мне реализация слушателя.


Определение state_watcher_t

class state_watcher_t final : public so_5::agent_state_listener_t
{
   const so_5::mbox_t m_mbox;
   const std::size_t m_index;

   state_watcher_t( so_5::mbox_t mbox, std::size_t index );

public :
   static auto make( so_5::environment_t & env, std::size_t index )
   {
      return so_5::agent_state_listener_unique_ptr_t{
            new state_watcher_t{ trace_maker_t::make_mbox(env), index }
      };
   }

   void changed( so_5::agent_t &, const so_5::state_t & state ) override;
};

Когда экземпляр state_watcher_t связан с агентом SObjectizer вызывает метод changed() при смене состояния агента. И уже внутри state_watcher_t::changed инициируются действия для трассировки действий актора-философа.


Фрагмент реализации state_watcher_t::changed
void state_watcher_t::changed( so_5::agent_t &, const so_5::state_t & state ) 
{
   const auto detect_label = []( const std::string & name ) {...};

   const char state_label = detect_label( state.query_name() ); 
   if( '?' == state_label )
      return;

   so_5::send< trace::state_changed_t >( m_mbox, m_index, state_label );
}

Решения на базе CSP


Теперь мы поговорим о реализациях, которые не используют акторов вообще. Посмотрим на те же самые решения (no_waiter_dijkstra, no_waiter_simple, waiter_with_timestamps) при реализации которых применяются std::thread и SObjectizer-овские mchain-ы (которые, по сути, есть CSP-шные каналы). Причем, подчеркну особо, в CSP-шных решениях используется тот же самый набор сообщений (все те же take_t, taken_t, busy_t, put_t).


В CSP-подходе вместо "процессов" используются нити ОС. Поэтому каждая вилка, каждый философ и каждый официант реализуется отдельным объектом std::thread.


Решение Дейкстры


Исходный код этого решения можно увидеть здесь.


Нить для вилки


Нить для вилки в решении Дейкстры работает очень просто: цикл чтения сообщений из входного канала + обработка сообщений типа take_t и put_t. Что реализуется функцией fork_process следующего вида:


void fork_process(
   so_5::mchain_t fork_ch )
{
   // Состояние вилки: занята или нет.
   bool taken = false;

   // Очередь ждущих философов.
   std::queue< so_5::mbox_t > wait_queue;

   // Читаем и обрабатываем сообщения пока канал не закроют.
   so_5::receive( so_5::from( fork_ch ),
         [&]( so_5::mhood_t<take_t> cmd ) {
            if( taken )
               // Вилка занята, философ должен ждать в очереди.
               wait_queue.push( cmd->m_who );
            else
            {
               // Философ может взять вилку.
               taken = true;
               so_5::send< taken_t >( cmd->m_who );
            }
         },
         [&]( so_5::mhood_t<put_t> ) {
            if( wait_queue.empty() )
               taken = false; // Вилка больше никому не нужна.
            else
            {
               // Вилку нужно отдать первому философу из очереди.
               const auto who = wait_queue.front();
               wait_queue.pop();
               so_5::send< taken_t >( who );
            }
         } );
}

У функции fork_process всего один аргумент: входной канал, который создается где-то в другом месте.


Самое интересное в fork_process — это "цикл" выборки сообщений из канала до тех пор, пока канал не будет закрыт. Этот цикл реализуется всего одним вызовом функции receive():


so_5::receive( so_5::from( fork_ch ),
      [&]( so_5::mhood_t<take_t> cmd ) {...},
      [&]( so_5::mhood_t<put_t> ) {...} );

В SObjectizer-е есть несколько версий функции receive() и здесь мы видим одну из них. Эта версия читает сообщения из канала пока канал не будет закрыт. Для каждого прочитанного сообщения ищется обработчик. Если обработчик найден, то он вызывается и сообщение обрабатывается. Если не найден, то сообщение просто выбрасывается.


Обработчики сообщений задаются в виде лямбда-функций. Эти лямбды выглядят как близнецы братья соответствующих обработчиков в акторе fork_t из решения на базе Акторов. Что, в принципе, не удивительно.


Нить для философа


Логика работы философа реализована в функции philosopher_process. У этой функции достаточно объемный код, поэтому мы будем разбираться с ним по частям.


Полный код philosopher_process
oid philosopher_process(
   trace_maker_t & tracer,
   so_5::mchain_t control_ch,
   std::size_t philosopher_index,
   so_5::mbox_t left_fork,
   so_5::mbox_t right_fork,
   int meals_count )
{
   int meals_eaten{ 0 };

   random_pause_generator_t pause_generator;

   // Этот канал будет использован для получения ответов от вилок.
   auto self_ch = so_5::create_mchain( control_ch->environment() );

   while( meals_eaten < meals_count )
   {
      tracer.thinking_started( philosopher_index, thinking_type_t::normal );

      // Имитируем размышления приостанавливая нить.
      std::this_thread::sleep_for(
            pause_generator.think_pause( thinking_type_t::normal ) );

      // Пытаемся взять левую вилку.
      tracer.take_left_attempt( philosopher_index );
      so_5::send< take_t >( left_fork, self_ch->as_mbox(), philosopher_index );

      // Запрос отправлен, ждем ответа.
      so_5::receive( so_5::from( self_ch ).handle_n( 1u ),
         [&]( so_5::mhood_t<taken_t> ) {
            // Взяли левую вилку, пытаемся взять правую.
            tracer.take_right_attempt( philosopher_index );
            so_5::send< take_t >(
                  right_fork, self_ch->as_mbox(), philosopher_index );

            // Запрос отправлен, ждем ответа.
            so_5::receive( so_5::from( self_ch ).handle_n( 1u ),
               [&]( so_5::mhood_t<taken_t> ) {
                  // У нас обе вилки. Можно поесть.
                  tracer.eating_started( philosopher_index );

                  // Имитируем поглощение пищи приостанавливая нить.
                  std::this_thread::sleep_for( pause_generator.eat_pause() );

                  // На шаг ближе к финалу.
                  ++meals_eaten;

                  // После еды возвращаем правую вилку.
                  so_5::send< put_t >( right_fork );
               } );

            // А следом возвращаем и левую.
            so_5::send< put_t >( left_fork );
         } );

   }

   // Сообщаем о том, что мы закончили.
   tracer.philosopher_done( philosopher_index );
   so_5::send< philosopher_done_t >( control_ch, philosopher_index );
}

Давайте начнем с прототипа функции:


void philosopher_process(
   trace_maker_t & tracer,
   so_5::mchain_t control_ch,
   std::size_t philosopher_index,
   so_5::mbox_t left_fork,
   so_5::mbox_t right_fork,
   int meals_count )

Смысл и назначение некоторых из этих параметров придется пояснить.


Поскольку мы не используем SObjectizer-овских агентов, то у нас нет возможности снимать след работы философа через слушателя состояний агента, как это делалось в варианте на Actor-ах. Поэтому в коде философа приходится делать вот такие вставки:


tracer.thinking_started( philosopher_index, thinking_type_t::normal );

И аргумент tracer как раз является ссылкой на объект, который занимается трассировкой работы философов.


Аргумент control_ch задает канал, в который должно быть записано сообщение philosopher_done_t после того, как философ съест все, что ему положено. Этот канал затем будет использоваться для определения момента завершения работы всех философов.


Аргументы left_fork и right_fork задают каналы для взаимодействия с вилками. Именно в эти каналы будут отсылаться сообщения take_t и put_t. Но если это каналы, то почему используется тип mbox_t вместо mchain_t?


Это хороший вопрос! Но ответ на него мы увидим ниже, при обсуждении другого решения. Пока же можно сказать, что mchain — это что-то вроде разновидности mbox-а, поэтому ссылки на mchain-ы можно передавать через объекты mbox_t.


Далее определяется ряд переменных, которые формируют состояние философа:


int meals_eaten{ 0 };

random_pause_generator_t pause_generator;

auto self_ch = so_5::create_mchain( control_ch->environment() );

Наверное наиболее важная переменная — это self_ch. Это персональный канал философа, через который философ будет получать ответные сообщения от вилок.


Ну а теперь мы можем перейти к основной логике работы философа. Т.е. к циклу повторения таких операций как размышления, захват вилок и поглощение пищи.


Можно отметить, что в отличии от решения на базе Акторов, для имитации длительных операций здесь используется this_thread::sleep_for вместо отложенных сообщений.


Попытка взять вилку выглядит практически так же, как и в случае с Акторами:


so_5::send< take_t >( left_fork, self_ch->as_mbox(), philosopher_index );

Здесь используется все тот же тип take_t. Но в нем есть поле типа mbox_t, тогда как self_ch имеет тип mchain_t. Поэтому приходится преобразовывать ссылку на канал в ссылку на почтовый ящик через вызов as_mbox().


Далее можно увидеть вызов receive():


so_5::receive( so_5::from( self_ch ).handle_n( 1u ),
      [&]( so_5::mhood_t<taken_t> ) {...} );

Этот вызов возвращает управление только когда один экземпляр taken_t будет извлечен и обработан. Ну или если канал будет закрыт. В общем, мы здесь ждем поступление нужного нам ответа от вилки.


В общем-то это практически все, что можно было бы рассказать про philosopher_process. Хотя стоит заострить внимание на вложенном вызове receive() для одного и того же канала:


so_5::receive( so_5::from( self_ch ).handle_n( 1u ),
      [&]( so_5::mhood_t<taken_t> ) {
         ...
         so_5::receive( so_5::from( self_ch ).handle_n( 1u ),
               [&]( so_5::mhood_t<taken_t> ) {...} );
         ...
      } );

Эта вложенность позволяет записать логику работы философа в простом и более-менее компактном виде.


Функция запуска симуляции


При обсуждении решений на базе Акторов мы не останавливались на разборе функций run_simulation(), поскольку там ничего особо интересного или важного не было. Но вот в случае с CSP-подходом на run_simulation() имеет смысл остановиться. В том числе для того, чтобы в очередной раз убедиться в том, насколько непросто писать корректный многопоточный код (кстати говоря вне зависимости от языка программирования).


Полный код функции run_simulation
void run_simulation(
   so_5::environment_t & env,
   const names_holder_t & names ) noexcept
{
   const auto table_size = names.size();
   const auto join_all = []( std::vector<std::thread> & threads ) {
      for( auto & t : threads )
         t.join();
   };

   trace_maker_t tracer{
         env,
         names,
         random_pause_generator_t::trace_step() };

   // Создаем вилки.
   std::vector< so_5::mchain_t > fork_chains;
   std::vector< std::thread > fork_threads( table_size );

   for( std::size_t i{}; i != table_size; ++i )
   {
      // Персональный канал для очередной вилки.
      fork_chains.emplace_back( so_5::create_mchain(env) );
      // Рабочая нить для очередной вилки.
      fork_threads[ i ] = std::thread{ fork_process, fork_chains.back() };
   }

   // Канал для получения уведомлений от философов.
   auto control_ch = so_5::create_mchain( env );

   // Создаем философов.
   const auto philosopher_maker =
         [&](auto index, auto left_fork_idx, auto right_fork_idx) {
            return std::thread{
                  philosopher_process,
                  std::ref(tracer),
                  control_ch,
                  index,
                  fork_chains[ left_fork_idx ]->as_mbox(),
                  fork_chains[ right_fork_idx ]->as_mbox(),
                  default_meals_count };
         };
   std::vector< std::thread > philosopher_threads( table_size );
   for( std::size_t i{}; i != table_size - 1u; ++i )
   {
      // Запускаем очередного философа на отдельной нити.
      philosopher_threads[ i ] = philosopher_maker( i, i, i+1u );
   }
   // Последний философ должен захватывать вилки в обратном порядке.
   philosopher_threads[ table_size - 1u ] = philosopher_maker(
         table_size - 1u,
         table_size - 1u,
         0u );

   // Ждем пока все философы закончат.
   so_5::receive( so_5::from( control_ch ).handle_n( table_size ),
         [&names]( so_5::mhood_t<philosopher_done_t> cmd ) {
            fmt::print( "{}: done\n", names[ cmd->m_philosopher_index ] );
         } );

   // Дожидаемся завершения рабочих нитей философов.
   join_all( philosopher_threads );

   // Принудительно закрываем каналы для вилок.
   for( auto & ch : fork_chains )
      so_5::close_drop_content( ch );

   // После чего дожидаемся завершения рабочих нитей вилок.
   join_all( fork_threads );

   // Показываем результат.
   tracer.done();

   // И останавливаем SObjectizer.
   env.stop();
}

В принципе, в основном код run_simulation() должен быть более-менее понятен. Поэтому я разберу только некоторые моменты.


Нам требуются рабочие нити для вилок. Этот фрагмент как раз отвечает за их создание:


std::vector< so_5::mchain_t > fork_chains;
std::vector< std::thread > fork_threads( table_size );

for( std::size_t i{}; i != table_size; ++i )
{
   fork_chains.emplace_back( so_5::create_mchain(env) );
   fork_threads[ i ] = std::thread{ fork_process, fork_chains.back() };
}

При этом нам нужно сохранять и каналы, созданные для вилок, и сами рабочие нити. Каналы потребуются ниже для передачи ссылок на них философам. А рабочие нити потребуются для того, чтобы затем вызвать join для них.


После чего мы создаем рабочие нити для философов и так же собираем рабочие нити в контейнер, т.к. нам нужно будет затем вызывать join:


std::vector< std::thread > philosopher_threads( table_size );
for( std::size_t i{}; i != table_size - 1u; ++i )
{
   philosopher_threads[ i ] = philosopher_maker( i, i, i+1u );
}
philosopher_threads[ table_size - 1u ] = philosopher_maker(
      table_size - 1u,
      table_size - 1u,
      0u );

Далее мы должны дать философам некоторое время для выполнения симуляции. Дожидаемся момента завершения симуляции с помощью этого фрагмента:


so_5::receive( so_5::from( control_ch ).handle_n( table_size ),
      [&names]( so_5::mhood_t<philosopher_done_t> cmd ) {
         fmt::print( "{}: done\n", names[ cmd->m_philosopher_index ] );
      } );

Этот вариант receive() возвращает управление только после получения table_size сообщений типа philosopher_done_t.


Ну а после получения всех philosopher_done_t остается выполнить очистку ресурсов.


Делаем join для всех рабочих нитей философов:


join_all( philosopher_threads );

После чего нужно сделать join для всех нитей вилок. Но просто вызывать join нельзя, т.к. тогда мы тупо зависнем. Ведь рабочие нити вилок спят внутри вызовов receive() и никто их не разбудит. Поэтому нам нужно сперва закрыть все каналы для вилок и лишь затем вызывать join:


for( auto & ch : fork_chains )
   so_5::close_drop_content( ch );

join_all( fork_threads );

Вот теперь главные операции по очистке ресурсов можно считать законченными.


Несколько слов о noexcept


Надеюсь, что код run_simulation сейчас полностью понятен и я могу попробовать объяснить, почему эта функция помечена как noexcept. Дело в том, что в ней exception-safety не обеспечивается от слова совсем. Поэтому самая лучшая реакция на возникновение исключения в такой тривиальной реализации — это принудительное завершение всего приложения.


Но почему run_simulation не обеспечивает безопасность по отношению к исключениям?


Давайте представим, что мы создаем рабочие нити для вилок и при создании очередной нити у нас выскакивает исключение. Дабы обеспечить хоть какую-нибудь exception-safety нам нужно принудительно завершить те нити, которые уже были запущены. И кто-то может подумать, что для этого достаточно переписать код запуска нитей для вилок следующим образом:


try
{
   for( std::size_t i{}; i != table_size; ++i )
   {
      fork_chains.emplace_back( so_5::create_mchain(env) );
      fork_threads[ i ] = std::thread{ fork_process, fork_chains.back() };
   }
}
catch( ... )
{
   for( std::size_t i{}; i != fork_chains.size(); ++i )
   {
      so_5::close_drop_content( fork_chains[ i ] );
      if( fork_threads[ i ].joinable() )
         fork_threads[ i ].join();
   }
   throw;
}

К сожалению, это очевидное и неправильное решение. Т.к. оно окажется бесполезным если исключение возникнет уже после того, как мы создадим все нити для вилок. Поэтому лучше использовать что-то вроде вот такого:


struct fork_threads_stuff_t {
   std::vector< so_5::mchain_t > m_fork_chains;
   std::vector< std::thread > m_fork_threads;

   fork_threads_stuff_t( std::size_t table_size )
      :  m_fork_threads( table_size )
   {}
   ~fork_threads_stuff_t()
   {
      for( std::size_t i{}; i != m_fork_chains.size(); ++i )
      {
         so_5::close_drop_content( m_fork_chains[ i ] );
         if( m_fork_threads[ i ].joinable() )
            m_fork_threads[ i ].join();
      }
   }

   void run()
   {
      for( std::size_t i{}; i != m_fork_threads.size(); ++i )
      {
         m_fork_chains.emplace_back( so_5::create_mchain(env) );
         m_fork_threads[ i ] = std::thread{ fork_process, m_fork_chains.back() };
      }
   }
} fork_threads_stuff{ table_size }; // Преаллоцируем нужные ресурсы.
fork_threads_stuff.run(); // Создаем каналы и запускаем нити.
   // Вся нужная очистка произойдет в деструкторе fork_threads_stuff.

Ну или можно воспользоваться трюками, которые позволяют выполнять нужный нам код при выходе из скоупа (например, Boost-овским ScopeExit-ом, GSL-овским finally() и им подобным).


Аналогичная проблема существует и с запуском нитей для философов. И решать ее нужно будет подобным образом.


Однако, если поместить весь необходимый код по обеспечению exception-safety в run_simulation(), то код run_simulation() окажется и объемнее, и сложнее в восприятии. Что не есть хорошо для функции, написанной исключительно в демонстрационных целях и не претендующей на продакшен-качество. Поэтому я решил забить на обеспечение exception-safety внутри run_simulation() и пометил функцию как noexcept, что приведет к вызову std::terminate в случае возникновения исключения. ИМХО, для такого рода демонстрационных примеров это вполне себе разумный вариант.


Тем не менее, проблема в том, что нужно иметь соответствующий опыт чтобы понять, какой код по очистке ресурсов будет работать, а какой нет. Неопытный программист может посчитать, что достаточно только вызвать join, хотя на самом деле нужно еще предварительно закрыть каналы перед вызовом join. И такую проблему в коде будет очень непросто выловить.


Простое решение без использования арбитра (официанта)


Теперь мы можем рассмотреть, как в CSP-подходе будет выглядеть простое решение с возвратом вилок при неудачном захвате, но без арбитра.


Исходный код этого решения можно найти здесь.


Нить для вилки


Нить для вилки реализуется функцией fork_process, которая выглядит следующим образом:


void fork_process(
   so_5::mchain_t fork_ch )
{
   // Состояние вилки: захвачена или свободна.
   bool taken = false;

   // Обрабатываем сообщения пока канал не закроют.
   so_5::receive( so_5::from( fork_ch ),
         [&]( so_5::mhood_t<take_t> cmd ) {
            if( taken )
               so_5::send< busy_t >( cmd->m_who );
            else
            {
               taken = true;
               so_5::send< taken_t >( cmd->m_who );
            }
         },
         [&]( so_5::mhood_t<put_t> ) {
            if( taken )
               taken = false;
         } );
}

Можно увидеть, что эта fork_process проще, чем аналогичная в решении Дейкстры (ту же самую картину мы могли наблюдать, когда рассматривали решения на базе Акторов).


Нить для философа


Нить для философа реализуется функцией philosopher_process, которая оказывается несколько сложнее, чем ее аналог в решении Дейкстры.


Полный код philosopher_process
void philosopher_process(
   trace_maker_t & tracer,
   so_5::mchain_t control_ch,
   std::size_t philosopher_index,
   so_5::mbox_t left_fork,
   so_5::mbox_t right_fork,
   int meals_count )
{
   int meals_eaten{ 0 };

   // Этот флаг потребуется для трассировки действий философа.
   thinking_type_t thinking_type{ thinking_type_t::normal };

   random_pause_generator_t pause_generator;

   // Канал для получения ответов от вилок.
   auto self_ch = so_5::create_mchain( control_ch->environment() );

   while( meals_eaten < meals_count )
   {
      tracer.thinking_started( philosopher_index, thinking_type );

      // Имитируем размышления приостанавливая нить.
      std::this_thread::sleep_for( pause_generator.think_pause( thinking_type ) );

      // На случай, если захватить вилки не получится.
      thinking_type = thinking_type_t::hungry;

      // Пытаемся взять левую вилку.
      tracer.take_left_attempt( philosopher_index );
      so_5::send< take_t >( left_fork, self_ch->as_mbox(), philosopher_index );

      // Запрос отправлен, ждем ответа.
      so_5::receive( so_5::from( self_ch ).handle_n( 1u ),
         []( so_5::mhood_t<busy_t> ) { /* ничего не нужно делать */ },
         [&]( so_5::mhood_t<taken_t> ) {
            // Левая вилка взята, попробуем взять правую.
            tracer.take_right_attempt( philosopher_index );
            so_5::send< take_t >(
                  right_fork, self_ch->as_mbox(), philosopher_index );

            // Запрос отправлен, ждем ответа.
            so_5::receive( so_5::from( self_ch ).handle_n( 1u ),
               []( so_5::mhood_t<busy_t> ) { /* ничего не нужно делать */ },
               [&]( so_5::mhood_t<taken_t> ) {
                  // У нас обе вилки, можно поесть.
                  tracer.eating_started( philosopher_index );

                  // Имитируем поглощение пищи приостанавливая нить.
                  std::this_thread::sleep_for( pause_generator.eat_pause() );

                  // На шаг ближе к финалу.
                  ++meals_eaten;

                  // После еды нужно вернуть правую вилку.
                  so_5::send< put_t >( right_fork );

                  // Следующий период размышлений должен быть помечен как "normal".
                  thinking_type = thinking_type_t::normal;
               } );

            // Левую вилку нужно вернуть в любом случае.
            so_5::send< put_t >( left_fork );
         } );

   }

   // Уведомляем о завершении работы.
   tracer.philosopher_done( philosopher_index );
   so_5::send< philosopher_done_t >( control_ch, philosopher_index );
}

В общем-то код philosopher_process очень похож на код philosopher_process из решения Дейкстры. Но есть два важных отличия.


Во-первых, это переменная thinking_type. Она нужна для того, чтобы формировать правильный след работы философа, а так же для того, чтобы вычислять паузы при имитации "раздумий" философа.


Во-вторых, это обработчики для сообщений busy_t. Мы их можем увидеть при вызове receive():


so_5::receive( so_5::from( self_ch ).handle_n( 1u ),
   []( so_5::mhood_t<busy_t> ) { /* ничего не нужно делать */ },
   [&]( so_5::mhood_t<taken_t> ) {
      ...
      so_5::receive( so_5::from( self_ch ).handle_n( 1u ),
         []( so_5::mhood_t<busy_t> ) { /* ничего не нужно делать */ },
         [&]( so_5::mhood_t<taken_t> ) {...} );

Да, обработчики для busy_t пусты, но это потому, что все необходимые действия либо уже были сделаны перед вызовом receive(), либо будут сделаны после выхода из receive(). Поэтому при получении busy_t ничего не нужно делать. Но сами обработчики должны быть определены, т.к. их присутствие запрещает receive() выбрасывать экземпляры busy_t без обработки. Благодаря присутствию таких обработчиков receive() возвращает управление когда в канал приходит сообщение busy_t.


Решение с официантом и временными отметками


На базе CSP-подхода было сделано еще одно решение, которое я бы хотел здесь кратко осветить. Разбирая решения на базе Акторов речь шла о решениях с арбитром (официантом): мы рассматривали решение waiter_with_queue, в котором официант использует очередь заявок, а так же упоминалось решение waiter_with_timestamps. Оба эти решения использовали один и тот же трюк: официант создавал набор mbox-ов для несуществующих вилок, эти mbox-ы раздавались философам, но сообщения из mbox-ов обрабатывались официантом.


Похожий трюк нужен и в CSP-подходе для того, чтобы я смог переиспользовать уже существующую реализацию philosopher_process из решения no_waiter_simple. Но может ли официант создать набор mchain-ов которые будут использоваться философами и из которых официант будет читать сообщения, адресованные вилкам?


К сожалению, нет.


Создать набор mchain-ов не проблема. Проблема в том, чтобы читать сообщения сразу из нескольких mchain-ов.


В SObjectizer-е есть функция select(), которая позволяет это делать, например, она позволяет написать так:


so_5::select( so_5::from_all(),
   case_(ch1, one_handler_1, one_handler_2, one_handler_3, ...),
   case_(ch2, two_handler_1, two_handler_2, two_handler_3, ...),
   ...);

Но для select() нужно, чтобы список каналов и обработчиков сообщений из них был доступен в компайл-тайм. Тогда как в моих решениях задачи "обедающих философов" этот список становится известен только во время исполнения. Поэтому в CSP-подходе нельзя в чистом виде переиспользовать трюк из подхода на базе Акторов.


Но мы можем его переосмыслить.


Итак, суть в том, что в исходных сообщениях take_t и put_t нет поля для хранения индекса вилки. И поэтому нам нужно как-то эти индексы получить. Раз уж мы не можем запихнуть индексы внутрь take_t и put_t, то давайте сделаем расширенные версии этих сообщений:


struct extended_take_t final : public so_5::message_t
{
   const so_5::mbox_t m_who;
   const std::size_t m_philosopher_index;
   const std::size_t m_fork_index;

   extended_take_t(
      so_5::mbox_t who,
      std::size_t philosopher_index,
      std::size_t fork_index )
      :  m_who{ std::move(who) }
      ,  m_philosopher_index{ philosopher_index }
      ,  m_fork_index{ fork_index }
   {}
};

struct extended_put_t final : public so_5::message_t
{
   const std::size_t m_fork_index;

   extended_put_t(
      std::size_t fork_index )
      :  m_fork_index{ fork_index }
   {}
};

Вообще говоря, нет надобности наследовать типы сообщений от so_5::message_t, хотя я обычно как раз использую такое наследование (у него есть некоторые небольшие бенефиты). В данном же случае наследование используется просто для того, чтобы продемонстрировать такой способ определения SObjectizer-овских сообщений.

Теперь нужно сделать так, чтобы официант читал именно расширенные версии сообщений вместо оригинальных. Значит нам нужно научиться перехватывать сообщения take_t и put_t, преобразовывать их в extended_take_t и extended_put_t, отсылать новые сообщения официанту.


Для этого нам потребуется собственный mbox. Всего лишь :)


Код собственного mbox-а
class wrapping_mbox_t final : public so_5::extra::mboxes::proxy::simple_t
{
   using base_type_t = so_5::extra::mboxes::proxy::simple_t;

   // Куда сообщения должны доставляться.
   const so_5::mbox_t m_target;
   // Индекс вилки, который должен использоваться в новых сообщениях.
   const std::size_t m_fork_index;

   // Типы сообщений для перехвата.
   static std::type_index original_take_type;
   static std::type_index original_put_type;

public :
   wrapping_mbox_t(
      const so_5::mbox_t & target,
      std::size_t fork_index )
      :  base_type_t{ target }
      ,  m_target{ target }
      ,  m_fork_index{ fork_index }
   {}

   // Это основной метод so_5::abstract_message_box_t для доставки сообщений.
   // Переопределяем его для перехвата и преобразования сообщений.
   void do_deliver_message(
      const std::type_index & msg_type,
      const so_5::message_ref_t & message,
      unsigned int overlimit_reaction_deep ) const override
   {
      if( original_take_type == msg_type )
      {
         // Получаем доступ к исходному сообщению.
         const auto & original_msg = so_5::message_payload_type<::take_t>::
               payload_reference( *message );
         // Шлем новое сообщение вместо старого.
         so_5::send< extended_take_t >(
               m_target,
               original_msg.m_who,
               original_msg.m_philosopher_index,
               m_fork_index );
      }
      else if( original_put_type == msg_type )
      {
         // Шлем новое сообщение вместо старого.
         so_5::send< extended_put_t >( m_target, m_fork_index );
      }
      else
         base_type_t::do_deliver_message(
               msg_type,
               message,
               overlimit_reaction_deep );
   }

   // Фабричный метод чтобы было проще создавать wrapping_mbox_t.
   static auto make(
      const so_5::mbox_t & target,
      std::size_t fork_index )
   {
      return so_5::mbox_t{ new wrapping_mbox_t{ target, fork_index } };
   }
};

std::type_index wrapping_mbox_t::original_take_type{ typeid(::take_t) };
std::type_index wrapping_mbox_t::original_put_type{ typeid(::put_t) };

Здесь использовался самый простой способ создания собственного mbox-а: в сопутствующем проекте so_5_extra есть заготовка, которую можно переиспользовать и сохранить себе кучу времени. Без использования этой заготовки мне пришлось бы наследоваться напрямую от so_5::abstract_message_box_t и реализовывать ряд чистых виртуальных методов.


Как бы то ни было, теперь есть класс wrapping_mbox_t. Так что мы теперь можем создать набор экземпляров этого класса, ссылки на которые и будут раздаваться философам. Философы будут отсылать сообщения в wrapping_mbox, а эти сообщения будут преобразовываться и переадресовываться в единственный mchain официанта. Поэтому функция waiter_process, которая является главной функцией нити официанта, будет иметь вот такой простой вид:


void waiter_process(
   so_5::mchain_t waiter_ch,
   details::waiter_logic_t & logic )
{
   // Получаем и обрабатываем сообщения пока канал не закроют.
   so_5::receive( so_5::from( waiter_ch ),
         [&]( so_5::mhood_t<details::extended_take_t> cmd ) {
            logic.on_take_fork( std::move(cmd) );
         },
         [&]( so_5::mhood_t<details::extended_put_t> cmd ) {
            logic.on_put_fork( std::move(cmd) );
         } );
}

Конечно же, прикладная логика официанта реализована в другом месте и ее код не так прост и короток, но мы не будем туда погружаться. Интересующиеся могут посмотреть код решения waiter_with_timestamps здесь.


Вот сейчас мы можем ответить на вопрос: "Почему каналы для вилок передаются в philosopher_process как mbox-ы?" Это потому, что для решения waiter_with_timestamps был реализован собственный mbox, а не mchain.


Конечно же, можно было бы создать и собственный mchain. Но это потребовало бы несколько больше работы, т.к. в so_5_extra пока нет такой же заготовки для собственных mchain-ов (может быть со временем появится). Так что для экономии времени я просто остановился на mbox-ах вместо mchain-ов.


Заключение


Вот, пожалуй, и все, что хотелось рассказать про реализованные на базе Акторов и CSP решения. Свою задачу я видел не в том, чтобы сделать максимально эффективные решения. А в том, чтобы показать, как именно они могут выглядеть. Надеюсь, что кому-то это было интересно.


Позволю себе отвлечь внимание тех, кто интересуется SObjectizer-ом. Все идет к тому, что в ближайшее время начнется работа над следующей "большой" версией SObjectizer — веткой 5.6, нарушающей совместимость с веткой 5.5. Желающие сказать свое веское слово по этому поводу, могут сделать это здесь (или здесь). Более-менее актуальный список того, что поменяется в SO-5.6 можно найти здесь (туда же можно добавить и свои пожелания).


На этом у меня все, большое спасибо всем читателям за потраченное на данную статью время!


PS. Слово "современные" в заголовке взято в кавычки потому, что в самих решениях нет ничего современного. Разве что за исключением использования кода на C++14.

Теги:
Хабы:
+24
Комментарии54

Публикации

Изменить настройки темы

Истории

Работа

QT разработчик
6 вакансий
Программист C++
123 вакансии

Ближайшие события