Pull to refresh

Coro и ещё одна реализация rouse-callback

Reading time 8 min
Views 4.1K
Есть в CPAN такое замечательное семейство модулей — Coro. Эти модули позволяют программировать на перле с использованием корутин.

Небольшое введение

Представьте себе, что в любой момент в любом месте программы (например, внутри тела функции или на очередной итерации цикла) вы можете полностью сохранить текущее состояние и временно «переключиться» в другую точку программы. Выполнив какую-то полезную работу в этой «другой» точке, вы возвращаетесь назад, восстанавливаете сохранённое состояние, и дальше вся работа происходит так, будто этого «переключения» не было вовсе. Ну, конечно же, не считая тех изменений общих данных, которые произошли в новой точке. Имея несколько «тяжёлых» функций, каждая из которых не зависит от результатов работы остальных, подобными «переключениями» можно имитировать их параллельное выполнение. То есть со стороны это будет выглядеть так, как будто функции выполняются параллельно, но, на самом деле, в каждый момент времени выполняется «кусочек» только одной из них, и размер этого «кусочка» определяете вы. Иными словами, получается, что каждая функция выполняется в своём потоке, все потоки используют только одно процессорное ядро (не зависимо от их количества в системе), а для того, чтобы каждый поток получал своё процессорное время, все они должны делиться между собой этим временем. Ввиду отсутствия настоящей параллельности, все изменения общих данных, произошедшие в каком либо потоке, становятся сразу же доступными во всех остальных потоках, а так как моменты переключения между потоками задаёте вы, необходимость синхронизации резко снижается (в основном, она нужна для работы с внешними ресурсами).
Всё это и многое другое можно реализовать в перле при помощи семейства модулей под названием Coro. Главный модуль этого семейства позволяет выполнять функции, либо блоки кода в отдельных потоках (ниже я буду называть эти потоки coro-потоками), а вспомогательные модули добавляют инструменты синхронизации, очереди сообщений, интеграцию с событийными циклами и т.п.

Создание coro-потоков

Для того, чтобы создать coro-поток, необходимо воспользоваться одной из конструкций:

Coro::async { ... } @args;

обратите внимание на отсутствие запятой между блоком и его аргументами, либо

my $coro = new Coro(\&code_ref, @args);
$coro->ready();

В первом случае потоком становится переданный функции Coro::async блок, при этом аргументы, указанные сразу же после блока, становятся доступными внутри блока как аргументы функции (через @_). Во втором случае вы создаёте поток используя ссылку на функцию и аргументы для этой функции. Вторая конструкция возвращает ссылку на созданный поток, для которого затем вызывается метод ready(). Именно в этом и заключается основное отличие второй конструкции от первой — созданный поток будет неактивным до тех пор, пока он не будет помещён в ready-очередь (об этом подробнее ниже).
В обоих случаях поток «живёт» до тех пор, пока выполняется соответствующая функция или блок кода. Кстати, сама программа тоже выполняется в отдельном coro-потоке — главном.

Переключение между coro-потоками

В отличие от системных потоков, переключение между которыми осуществляется где-то в недрах операционной системы, между coro-потоками необходимо переключаться в ручную. Наиболее очевидные моменты переключения (вы можете придумать ещё более или менее очевидные):
  • Каждая n-ная итерация «долгоиграющего» цикла
  • Каждая блокирующая операция (работа с сетью, диском и т.п.)

Во втором случае процессор всё равно не используется до тех пор, пока данные не придут по сети, или не будут считаны с диска (равно как и переданы по сети, либо записаны на диск).
Как же осуществлять передачу управления с помощью Coro? Для того, чтобы сохранить текущее состояние и прервать выполнение текущего coro-потока, необходимо использовать статический метод schedule(), дополнительно этот метод извлекает следующий coro-поток из ready-очереди и запускает его на выполнение. Соответственно, для того, чтобы coro-поток, вызывающий schedule(), смог вновь получить процессорное время в будущем, он сам должен предварительно поместить себя в конец ready-очереди при помощи метода ready() (либо это должен сделать за него любой другой поток). Прерванный поток остаётся заблокированным (не получает процессорное время) до тех пор, пока он не будет помещён в конец ready-очереди; если это не произойдёт к тому моменту, когда другие активные потоки завершат свою работу, Coro обнаружит это и аварийно завершит программу. Так как вызовы ready() и schedule() используются совместно довольно часто, модуль Coro предоставляет для удобства вызов cede(), являющийся аналогом следующей пары строк:

$Coro::current->ready();
Coro::schedule;

Рассмотрим пример
#!/usr/bin/perl
$| = 1;

use strict;
use warnings;

use Coro;

# Создание coro-потока с помощью Coro::async
Coro::async {
    my $thread_id = shift;

    # Устанавливаем описание для coro-потока
    $Coro::current->desc("Thread #$thread_id");
    for (my $i = 0; $i < 1_000_000; $i++) {
        if ($i % 1000 == 0) {
            print "$Coro::current->{desc} - Processed: $i items\n";
            # Помещаем текущий coro-поток в конец ready-очереди
            $Coro::current->ready();
            # Передаём управление следующему потоку из ready-очереди
            Coro::schedule();
        }
    }
} 0;

# Эта функция будет выполняться в отдельном coro-потоке
sub my_thread {
    my $thread_id = shift;

    $Coro::current->desc("Thread #$thread_id");
    for (my $i = 0; $i < 1_000_000; $i++) {
        if ($i % 1000 == 0) {
            print "$Coro::current->{desc} - Processed: $i items\n";
            # Временно передаём управление следующему coro-потоку
            Coro::cede();
        }
    }
}

my @threads = ();
for (my $thread_id = 1; $thread_id < 5; $thread_id++) {
    # Создаём неактивный coro-поток с помощью Coro::new()
    my $thread = new Coro(\&my_thread, $thread_id);
    # Помещаем созданный coro-поток в конец ready-очереди
    $thread->ready();
    push @threads, $thread;
}

while (my $thread = shift @threads) {
    # Блокируем главный coro-поток до тех пор, пока очередной coro-поток не завершится
    $thread->join();
}

Результат:
Thread #0 - Processed: 0 items
Thread #1 - Processed: 0 items
Thread #2 - Processed: 0 items
Thread #3 - Processed: 0 items
Thread #4 - Processed: 0 items
Thread #0 - Processed: 1000 items
Thread #1 - Processed: 1000 items
Thread #2 - Processed: 1000 items
Thread #3 - Processed: 1000 items
Thread #4 - Processed: 1000 items
...
Thread #0 - Processed: 999000 items
Thread #1 - Processed: 999000 items
Thread #2 - Processed: 999000 items
Thread #3 - Processed: 999000 items
Thread #4 - Processed: 999000 items


В примере coro-потоки создаются разными способами и по-разному передают друг другу процессорное время. Все coro-потоки выполняют одинаковую работу — через каждые 1000 итераций, отчитываются о ходе работы и прерывают своё выполнение, давая возможность поработать остальным coro-потокам, предварительно поместив себя в конец ready-очереди (явно, либо с помощью cede()). Программа продолжает работать до тех пор, пока не завершится главный coro-поток, а главный coro-поток занят ожиданием окончания 4 из 5 созданных coro-потоков (вызов метода join() блокирует тот coro-поток, из которого осуществляется вызов до тех пор, пока coro-поток, для которого вызвали этот метод не завершится).

Интеграция с событийными циклами

Приведённый пример демонстрирует то, как coro-потоки делятся процессорным временем, делая перерыв в продолжительной по времени работе. Как было отмечено выше, неплохой причиной поделиться процессорным временем является также выполнение блокирующих операций (как правило, операций ввода/вывода).
Когда перед нами встаёт проблема эффективной работы программы со множеством блокирующих операций, мы обычно решаем эту проблему с помощью событийных циклов (event loop). Например, переводим сокеты в неблокирующий режим и «навешиваем» на них «вотчеры», следящие за готовностью сокета к записи или чтению и создаём таймер для прерывания операций по таймауту. По мере наступления интересующих нас событий, из недр событийного цикла вызываются коллбэки, связанные с соответствующими «вотчерами». По мере усложнения проекта разобраться в том какой коллбэк, когда и почему вызывается становится всё сложнее и сложнее. С использованием Coro ситуация заметно улучшается и код программы становится более линейным и понятным (чисто моё мнение).
Прежде всего, необходимо отметить, что в семействе модулей Coro существуют три модуля для интеграции coro-потоков в событийные циклы — это Coro::AnyEvent, Coro::Event и Coro::EV (куски кода ниже будут для Coro::EV). Для того, чтобы интегрировать событийный цикл в вашу программу, вам достаточно в любом coro-потоке (например, в главном) запустить сам цикл:

Coro::async { EV::run() };

Для удобства обработки событий модуль Coro предоставляет две полезные функции — rouse_cb() и rouse_wait():
  • rouse_cb() генерирует и возвращает коллбэк, который при вызове сохраняет переданные ему аргументы и оповещает внутренности Coro о факте вызова
  • rouse_wait() блокирует текущий coro-поток до тех пор, пока не будет вызван последний, созданный функцией rouse_cb() коллбэк (можно также в качестве аргумента указать вызов какого именно коллбэка необходимо ждать); функция возвращает то, что было передано коллбэку в качестве аргументов

Таким образом, приведённые ниже куски кода эквивалентны:

# 1. Без использования rouse_cb() и rouse_wait()
my $timer = EV::timer(5, 5, sub {
    my ($watcher, $revents) = @_;
    print "Timer $wathcer: timeout\n";
});

#2. С использованием rouse_cb() и rouse_wait()
my $timer = EV::timer(5, 5, rouse_cb());
my ($watcher, $revents) = rouse_wait();
print "Timer $wathcer: timeout\n";


Ещё одна реализация rouse-коллбэков

Приведённый выше кусочек кода не передаёт всей мощи rouse_cb() и rouse_wait(), однако её понимание приходит по мере работы над реальными проектами. Тем не менее, для себя я обнаружил главный минус встроенных rouse-коллбэков — если вы сохраните коллбэк, возвращённый функцией rouse_cb() и попытаетесь использовать его повторно (что логично для циклических операций, ведь зачем на каждой итерации создавать новый объект, выполняющий каждый раз одну и ту же работу?), у вас ничего не выйдет. Будучи вызванным хоть раз, коллбэк сохраняет своё состояние и все последующие вызовы rouse_wait() для этого коллбэка сразу же возвращают сохранённые ранее аргументы.
Поэтому я решил написать свою реализацию rouse-коллбэка. В этой реализации коллбэк является объектом, а вместо функции rouse_wait() используется метод wait() коллбэка:

my $cb = new My::RouseCallback;
my $timer = EV::timer(5, 5, $cb);
my ($watcher, $revents) = $cb->wait();
print "Timer $wathcer: timeout\n";

Реализация My::RouseCallback
package My::RouseCallback;

use strict;
use warnings;

use Coro;


# "Хранилище" данных для всех созданных объектов My::RouseCallback
my %STORAGE = ();

# Создание коллбэка: my $cb = new My::RouseCallback;
sub new {
    my ($class) = @_;

    my $context = {args => [], done => 0, coro => undef};
    my $self = bless sub {
        # Сохраняем переданные коллбэку аргументы
        $context->{args} = \@_;
        # Устанавливаем признак того, что коллбэк был вызван
        $context->{done} = 1;
        if ($context->{coro}) {
            # Активизируем ожидающий coro-поток
            $context->{coro}->ready();
        }
    }, $class;
    $STORAGE{"$self"} = $context;

    return $self;
};

# Ожидаем вызов коллбэка: $cb->wait();
sub wait {
    my $self = shift;

    my $context = $STORAGE{"$self"};
    # Чтобы коллбэк знал, какой coro-поток ожидает его
    $context->{coro} = $Coro::current;
    # Блокируем текущий coro-поток до тех пор, пока не будет вызван коллбэк
    while ($context->{done} == 0) {
        Coro::schedule();
    }

    # Возвращаем переданные коллбэку аргументы и очищаем состояние
    my @args = @{ $context->{args} };
    $context->{args} = [];
    $context->{done} = 0;

    return @args;
}

sub DESTROY {
    my $self = shift;

    $self->();
    delete $STORAGE{"$self"};
};


1;

__END__



Если вы видите возможность использования Coro в своей задаче, обязательно попробуйте, возможно, вам понравится. Изучайте документацию, делитесь полученными на практике знаниями.

PS. Если вы используете модули из семейств EV и Coro вместе, будьте внимательны. Как первый, так и второй экспортируют по умолчанию функцию async(). Поэтому при создании coro-потоков всегда лучше явно указывать Coro::async.
Tags:
Hubs:
+6
Comments 4
Comments Comments 4

Articles