Асинхронный многопоточный пул воркеров на Perl

image

В работе веб-сервиса, да и вообще многих других систем, часто встречается необходимость выполнения различных фоновых задач. Для этого пишут скрипты — воркеры — которые берут список имеющихся задач и начинают их выполнять — с какой-то скоростью и в какой-то последовательности.

Понятное дело, хорошо, когда все задачи выполняются быстро и без проволочек.

Для ускорения выполнения задач желательно решить две проблемы:

  • Научить воркер не ждать выполнения каждого отдельного этапа задачи (асинхронность)
  • Научить воркер выполнять одновременно несколько задач (многопоточность) (disclaimer: на самом деле термин «многопоточность» тут используется в значении «многопроцессность»)

В этой статье мы рассмотрим вариант реализации воркера, который будет одновременно асинхронным и многопоточным.

Модуль AnyEvent


Для программирования в асинхронном режиме в Перле есть отличный модуль AnyEvent.

На всякий случай следует сказать, что на самом деле AnyEvent является оберткой над другими низкоуровневыми асинхронными модулями. Как DBI является оберткой и универсальным интерфейсом к разным базам данных, так и AnyEvent является оберткой и универсальным интерфейсом к различным реализациям асинхронных движков.

Для AnyEvent имеется огромное количество всевозможных расширений, в том числе есть и расширение для написания многопоточных приложений — модуль AnyEvent::Fork::Pool.

Модуль AnyEvent::Fork::Pool предоставляет простой способ создания пула воркеров, которые будут обрабатывать задачи в асинхронном многопоточном режиме.

Скрипт


Рассмотрим скрипт anyevent_pool.pl:

#!/usr/bin/perl

use strict;
use warnings;

use AnyEvent::Fork::Pool;

# Модуль воркера
my $mod = 'Worker';
# Функция воркера
my $sub = 'work';

# Определить количество ядер в системе
my $cpus = AnyEvent::Fork::Pool::ncpu 1;

# Создать пул воркеров
my $pool = AnyEvent::Fork
  ->new
  ->require ($mod)
  ->AnyEvent::Fork::Pool::run(
      "${mod}::$sub",         # Модуль::Функция - рабочая функция воркера
      init => "${mod}::init", # Модуль::init - функция инициализации воркера
      max  => $cpus,          # Количество воркеров в пуле
      idle => 0,              # Количество воркеров при простое
      load => 1,              # Размер очереди воркера
  );

# Поставить пулу задачи
for my $str (qw{q2 rtr4 ui3 asdg5}) {
  $pool->($str, sub {
      print "result: @_\n";
  });
};

AnyEvent->condvar->recv;

Несмотря на небольшой объем, этот скрипт представляет собой полноценное асинхронное многопоточное приложение.

Разберем его по частям.

Переменные


# Модуль воркера
my $mod = 'Worker';
# Функция воркера
my $sub = 'work';

Эти переменные задают связку между пулом и тем кодом, который будет выполять конкретные фоновые задачи. Пул — он один на всех, а задачи могут быть разные. Эти переменные указывают пулу, какой именно код (какую функцию из какого модуля) вы хотите запустить для выполнения конкретной задачи.

Например, у вас может быть модуль Text для обработки текста, а в модуле функции length и trim. И еще у вас может быть модуль Image, в котором могут быть функции resize и crop. Пулу совершенно без разницы, что делают ваши функции и как они устроены. Вам нужно просто сказать пулу, в каком модуле они находятся и как они называются, и пул их выполнит.

Важно! Модуль воркера не нужно подключать в скрипте через «use Worker». Пул сам автоматически подгрузит модуль воркера, вам нужно только правильно указать название модуля в переменной.

Количество ядер


# Определить количество ядер в системе
my $cpus = AnyEvent::Fork::Pool::ncpu 1;

Для многопоточного выполнения задач желательно знать, сколько в системе имеется ядер. Желательно, чтобы количество потоков, которые вы будете запускать, равнялось количеству ядер. Если потоков будет меньше — некоторые ядра будут простаивать зря, если потоков будет больше — некоторые потоки будут вставать в очередь и вместо ускорения получатся потери на диспетчеризацию.

Если по каким-то причинам количество ядер не удалось определить, то будет использоваться значение, указанное вручную. В данном случае это 1.

Пул


# Создать пул воркеров
my $pool = AnyEvent::Fork
  ->new
  ->require ($mod)
  ->AnyEvent::Fork::Pool::run(
      "${mod}::$sub",         # Модуль::Функция - рабочая функция воркера
      init => "${mod}::init", # Модуль::init - функция инициализации воркера
      max  => $cpus,          # Количество воркеров в пуле
      idle => 0,              # Количество воркеров при простое
      load => 1,              # Размер очереди воркера
  );

Пояснения к параметрам:

  • Рабочая функция воркера всегда должна указываться первым параметром. Это та самая функция того самого модуля, которую мы указали в двух первых «настроечных» переменных $mod и $sub. Это единственный обязательный параметр.
  • init — Если в вашем воркере есть необходимость инициализации, то в этом параметре можно указать название инициализирующей функции. В даном случае название функции указано как «init», поскольку это обычное название для такой функции, но, в принципе, можно указать любое другое название.
  • max — Этот параметр задает количество потоков, которые будет запускать пул. Именно тут следует указать ранее определенное количество ядер в системе (но если хотите — можете указать любое число, если знаете, что делаете).
  • idle — тут указывается количество воркеров, которые будут ждать «на низком старте». Чем больше это число (но не больше параметра max) — тем быстрее пул среагирует на новую поступившую задачу, но тем больше будет бесполезно ждущих (и жрущих ресурсы) процессов.
  • load — Сколько задач будет отдано каждому воркеру не дожидаясь исполнения предыдущих. Значение сильно зависит от ситуации — в каких-то случаях лучше меньше, в каких-то лучше больше. При прочих равных большее значение должно повышать эффективность работы пула (оптом — дешевле).

Также имеются и другие параметры, которые я здесь не рассматриваю. Они сильно специфичны и требуются редко. С полным списком параметров можно ознакомиться в документации модуля.

Постановка задач пулу


# Поставить пулу задачи
for my $str (qw{q2 rtr4 ui3 asdg5}) {
  $pool->($str, sub {
      print "result: @_\n";
  });
};

Пулу можно передать произвольное количество параметров, но последним параметром должен быть коллбэк. Коллбэк — это анонимная функция, которая будет вызвана после того, как воркер выполнит задачу. В эту функцию будут переданы результаты работы воркера.

Другими словами — эта функция является получателем результатов работы функции $sub. Все, что выдаст функция $sub, будет передано в качестве аргументов в функцию-коллбэк. Условно эту связь можно записать примерно так — «callback($sub)».

В нашем случае функция-коллбэк просто печатает все, что она получает.

Переменная же $str — это, собственно, и есть та самая задача, которую должен выполнить воркер. В нашем случае это просто одна строка (точнее — 4 строки, запускаемых в цикле). Строки тут не имеют никакого глубокого смысла, я просто позвал кота походить по клавиатуре.

В зависимости от ситуации вместо строки может быть всё, что угодно — имя файла, идентификатор записи в базе, математическое выражение, ссылка на сложную структуру с данными… короче говоря — всё, что угодно. Пулу без разницы, что это будет, он не обрабатывает это значение. Пул просто передает это значение воркеру, а вот тот уже должен знать, что с этим делать.

Запуск движка


AnyEvent->condvar->recv;

Эта строка говорит модулю AnyEvent, что нужно запустить в работу событийный движок и далее работать бесконечно.

В этом месте скрипт зациклится. Приведенный пример не имеет способа остановки и выхода из бесконечного цикла обработки задач. Вопрос условного выхода из цикла AnyEvent является более общим, а я здесь хочу рассмотреть только частный случай использования пула. Про условный выход из цикла можно почитать тут.

Сам воркер


Теперь возникает вопрос — а где же, собственно, сам воркер? Где код, исполняющий непосредственно работу?

Этот код вынесен в отдельный модуль, который мы указали в переменной $mod.

Вот код модуля Worker:

package Worker;

use strict;
use warnings;

my $file;

sub init {
  open $file, '>>', 'file.txt';

  my $q = select($file);
  $|=1;
  select($q);

  return;
}

sub work {
  my ($str) = @_;

  for (1..length($str)) {
      print $file "$$ $str\n";
      sleep 1;
  };

  return $str, length($str);
}

1;

Как видите, в модуле две функции — init и work.

Функция init инициализирует воркер. В нашем случае функция открывает лог-файл, в который далее будут выводиться результаты работы рабочей функции work. Как уже говорилось выше — функция init является необязательной, в нашем случае я сделал ее просто для наглядности.

Функция work — это главная функция. Это та самая рабочая функция, которая была задана в переменной $sub. Именно в этой функции выполняется вся работа, связанная с выполнением конкретной задачи.

В нашем случае функция выполняет простейшую работу — вычисляет длину строки. Для более наглядной демонстрации работы воркера я добавил в функцию цикл с секундной задержкой, который выводит строку в лог столько раз, сколько в строке букв.

Обратите внимание — функция возвращает два значения — саму строку и ее длину. Именно эти два значения будут переданы в коллбэк, заданный на этапе постановки задач пулу (а в коллбэке, как говорилось выше, эти значения будут просто напечатаны).

Вот, собственно, и весь код.

Запускаем пул


Теперь запустим наш пул и посмотрим, что получится:

image

Тут мы видим результаты работы пула. Можно заметить, что порядок вывода результатов отличается от порядка строк, заданного в цикле в скрипте. Причина понятна — у строк разная длина, поэтому воркеры обрабатывают строки с разной скоростью. Чем проще задача — тем она быстрее выполняется.

Теперь посмотрим не просто на результаты, но и на процесс работы воркеров. Для этого во втором окне запустим tail для лог-файла:

image

Обратите внимание — результаты работы выводятся вперемешку, так-как задачи выполняются одновременно. Слева видны идентификаторы процессов — видим, что задействованы 4 процесса. У меня в системе 4 ядра, поэтому одновременно выполняются все 4 задачи.

И, наконец, посмотрим на таблицу процессов:

image

Так выглядит дерево процессов нашего пула.

Первым в списке идет скрипт, далее менеджер пулов (да-да, пулов может быть несколько штук), потом менеджер пула, и, наконец, воркеры.

Если не полениться и сравнить идентификаторы процессов, то можно увидеть, что идентификаторы воркеров совпадают с идентификаторами в лог-файле.

Литература


  • +17
  • 11,2k
  • 9
Поделиться публикацией

Комментарии 9

    –3
    р
      –2
      Почему нет тега «ненормальное программирование»? (light irony)
        –2
        Ну как же? В заголовке же русским по белому написано — Perl :)
        +2
        Очень кстати пришлась статья. Спасибо автору.
          +1
          Отлично:)
          0
          Важно! Модуль воркера не нужно подключать в скрипте через «use Worker»


          Но в скрипте гордо красуется use Worker;

          Забыли убрать?
            +1
            Да, забыл убрать. Исправил, спасибо:)
            0
            Побольше бы написали почему именно AnyEvent и сравнение с аналогами.
            Все остальное из документации и так понятно.
              0
              AnyEvent — потому, что он используется у меня в текущем проекте. Возник вопрос, как сделать многопоточный воркер, я разобрался с этим и написал статью. Т.е. статья, в общем, не про AnyEvent, просто так уж совпало.

            Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

            Самое читаемое