
В моем текущем проекте много задач, которые выполняются в фоне. Из внешнего сервиса прилетают данные и проходят несколько стадий обработки. Обработка реализована через механизм очередей. Это удобно, можно варьировать количество воркеров на каждый тип процессов. Да и в случае, если что-то упадет, очередь будет копиться, и данные не потеряются — обработаются, как только проблема будет устранена.
Чтобы из одного процесса создать задачу для следующей стадии обработки, мы просто вызывали в конце обработки
dispatch(), примерно так:class MyFirstJob extends Job { use DispatchesJobs; protected $data; public function __construct($data) { $this->data = $data; } public function handle() { $this->doSomething($this->data); $this->dispatch(new MySecondJob($this->data)); // Second task } }
А следующая стадия обработки инициировалась совершенно аналогично:
class MySecondJob extends Job { use DispatchesJobs; protected $data; public function __construct($data) { $this->data = $data; } public function handle() { $this->doSomething($this->data); if ($this->someCondition($this->data)) { $this->dispatch(new MyThirdJob($this->data)); // Third task } } }
Поначалу было хорошо, но добавлялись новые стадии обработки и цепочка росла. В очередной раз, когда надо было добавить еще одну стадию обработки (новая очередь), я поймал себя на мысли, что уже не помню точно, что и в какой последовательности обрабатывается. И по коду понять это уже не так-то и просто. Там появились элементы бизнес логики: в таком-то случае запускается такая-то обработка, в другом случае создается сразу набор задач. В общем, все то, что мы так “любим” видеть в больших системах.
Ох-ох, подумал я, пора что-то предпринять. И решил, что будет очень удобно вынести управление порядком обработки (порядок вызовов
dispatch()) в отдельный код. Тогда все будет логично и наглядно — вот у нас бизнес процесс (управляющий код, менеджер очередей), вот у нас отдельные его кусочки (очереди).Я так и сделал и до сих пор доволен. Сейчас расскажу, что именно сделал. Буду рад, если и вам пригодится этот подход.
Управление очередями
У нас несколько независимых процессов обработки данных. Чтобы описать каждый алгоритм отдельно, делаем абстрактный класс для менеджера очередей.
<?php namespace App\Jobs\Pipeline; use App\Jobs\Job; use Illuminate\Foundation\Bus\DispatchesJobs; abstract class PipelineAbstract { use DispatchesJobs; /** * @param array $params * @return PipelineAbstract */ public function start(array $params) { $this->next(null, $params); return $this; } /** * @param Job $currentJob * @param array $params Set of parameters for starting new jobs */ abstract public function next(Job $currentJob = null, array $params); /** * @param Job $job */ protected function startJob(Job $job) { $this->dispatch($job); } }
В методе
next() у нас как раз и будет реализован бизнес процесс. startJob() — просто обертка над dispatch() на всякий случай. А start() будем использовать в том месте, где надо инициировать весь процесс обработки данных (там, где прилетают данные из внешнего сервиса).Пример реализации бизнес логики:
<?php namespace App\Jobs\Pipeline; use App\Jobs\Job; use App\Jobs\MyFirstJob; use App\Jobs\MySecondJob; use App\Jobs\MyThirdJob; class ProcessDataPipeline extends PipelineAbstract { /** * @inheritdoc */ public function next(Job $currentJob = null, array $params) { // Start first job if ($currentJob === null) { $this->startJob(new MyFirstJob($params, $this)); } if ($currentJob instanceof MyFirstJob) { $this->startJob(new MySecondJob($params, $this)); } if ($currentJob instanceof MySecondJob) { if ($this->someCondition($params)) { $this->startJob(new MyThirdJob($params, $this)); } } } }
Вот и все. Остается только заменить запуск
MyFirstJob.Было
$this->dispatch(new MyFirstJob($data));
Стало
(new ProcessDataPipeline())->start($data);
А вместо добавления заданий в остальные очереди вызовем метод
next().Было
$this->dispatch(new MySecondJob($data));
Стало
$this->next($data);
Чуть не забыл. Нам еще придется доработать для этого базовый класс очереди. В коде выше видно, что мы при инстанцировании объекта очереди теперь еще передаем туда помимо данных объект пайплайна.
<?php namespace App\Jobs; use App\Jobs\Pipeline\PipelineAbstract; abstract class Job { /** * @param array $params */ public function next(array $params) { if ($this->pipeline) { $this->pipeline->next($this, $params); } } }
И в конструкторах конкретных джобов принимаем экземпляр пайплайна, чтобы шаги бизнес логики (вызов метода
next()) обрабатывались нужной реализацией пайплайна.class MyFirstJob extends Job { /** * @param mixed data * @param PipelineAbstract|null $pipeline */ public function __construct($data, PipelineAbstract $pipeline = null) { $this->data = $data; $this->pipeline = $pipeline; } }
Вот теперь всё. Получилось похоже на цепочку ответственности. Я постарался объяснить идею простым языком. Если вам вдруг тоже захотелось так сделать, то тут я опубликовал рабочий пример реализации, возможно так кому то будет удобнее, чем на словах:
Что хорошего
- Описание процесса обработки данных теперь не размазано по коду, а сосредоточено в одном методе.
- Появилась возможность аккуратно добавить новое поведение в механизм управления очередью. Например, логирование, хранение в базе состояний обработки каждого шага.
- Стало легче добавлять новые стадии обработки и менять порядок выполнения задач.
Кстати, в свежей версии Laravel появился похожий инструмент
withChain(), он гарантирует выполнение задач в строгой последовательности. В простых случаях этого будет достаточно. Но в случаях, когда есть условия запуска того или иного процесса, когда данные для следующего процесса рождаются в предыдущем, все же нужен более универсальный механизм. Например, такой, о котором я и рассказал в этой статье.