В моем текущем проекте много задач, которые выполняются в фоне. Из внешнего сервиса прилетают данные и проходят несколько стадий обработки. Обработка реализована через механизм очередей. Это удобно, можно варьировать количество воркеров на каждый тип процессов. Да и в случае, если что-то упадет, очередь будет копиться, и данные не потеряются — обработаются, как только проблема будет устранена.
Чтобы из одного процесса создать задачу для следующей стадии обработки, мы просто вызывали в конце обработки
dispatch()
, примерно так:class MyFirstJob extends Job
{
use DispatchesJobs;
protected $data;
public function __construct($data)
{
$this->data = $data;
}
public function handle()
{
$this->doSomething($this->data);
$this->dispatch(new MySecondJob($this->data)); // Second task
}
}
А следующая стадия обработки инициировалась совершенно аналогично:
class MySecondJob extends Job
{
use DispatchesJobs;
protected $data;
public function __construct($data)
{
$this->data = $data;
}
public function handle()
{
$this->doSomething($this->data);
if ($this->someCondition($this->data)) {
$this->dispatch(new MyThirdJob($this->data)); // Third task
}
}
}
Поначалу было хорошо, но добавлялись новые стадии обработки и цепочка росла. В очередной раз, когда надо было добавить еще одну стадию обработки (новая очередь), я поймал себя на мысли, что уже не помню точно, что и в какой последовательности обрабатывается. И по коду понять это уже не так-то и просто. Там появились элементы бизнес логики: в таком-то случае запускается такая-то обработка, в другом случае создается сразу набор задач. В общем, все то, что мы так “любим” видеть в больших системах.
Ох-ох, подумал я, пора что-то предпринять. И решил, что будет очень удобно вынести управление порядком обработки (порядок вызовов
dispatch()
) в отдельный код. Тогда все будет логично и наглядно — вот у нас бизнес процесс (управляющий код, менеджер очередей), вот у нас отдельные его кусочки (очереди).Я так и сделал и до сих пор доволен. Сейчас расскажу, что именно сделал. Буду рад, если и вам пригодится этот подход.
Управление очередями
У нас несколько независимых процессов обработки данных. Чтобы описать каждый алгоритм отдельно, делаем абстрактный класс для менеджера очередей.
<?php
namespace App\Jobs\Pipeline;
use App\Jobs\Job;
use Illuminate\Foundation\Bus\DispatchesJobs;
abstract class PipelineAbstract
{
use DispatchesJobs;
/**
* @param array $params
* @return PipelineAbstract
*/
public function start(array $params)
{
$this->next(null, $params);
return $this;
}
/**
* @param Job $currentJob
* @param array $params Set of parameters for starting new jobs
*/
abstract public function next(Job $currentJob = null, array $params);
/**
* @param Job $job
*/
protected function startJob(Job $job)
{
$this->dispatch($job);
}
}
В методе
next()
у нас как раз и будет реализован бизнес процесс. startJob()
— просто обертка над dispatch()
на всякий случай. А start()
будем использовать в том месте, где надо инициировать весь процесс обработки данных (там, где прилетают данные из внешнего сервиса).Пример реализации бизнес логики:
<?php
namespace App\Jobs\Pipeline;
use App\Jobs\Job;
use App\Jobs\MyFirstJob;
use App\Jobs\MySecondJob;
use App\Jobs\MyThirdJob;
class ProcessDataPipeline extends PipelineAbstract
{
/**
* @inheritdoc
*/
public function next(Job $currentJob = null, array $params)
{
// Start first job
if ($currentJob === null)
{
$this->startJob(new MyFirstJob($params, $this));
}
if ($currentJob instanceof MyFirstJob)
{
$this->startJob(new MySecondJob($params, $this));
}
if ($currentJob instanceof MySecondJob)
{
if ($this->someCondition($params))
{
$this->startJob(new MyThirdJob($params, $this));
}
}
}
}
Вот и все. Остается только заменить запуск
MyFirstJob
.Было
$this->dispatch(new MyFirstJob($data));
Стало
(new ProcessDataPipeline())->start($data);
А вместо добавления заданий в остальные очереди вызовем метод
next()
.Было
$this->dispatch(new MySecondJob($data));
Стало
$this->next($data);
Чуть не забыл. Нам еще придется доработать для этого базовый класс очереди. В коде выше видно, что мы при инстанцировании объекта очереди теперь еще передаем туда помимо данных объект пайплайна.
<?php
namespace App\Jobs;
use App\Jobs\Pipeline\PipelineAbstract;
abstract class Job
{
/**
* @param array $params
*/
public function next(array $params)
{
if ($this->pipeline)
{
$this->pipeline->next($this, $params);
}
}
}
И в конструкторах конкретных джобов принимаем экземпляр пайплайна, чтобы шаги бизнес логики (вызов метода
next()
) обрабатывались нужной реализацией пайплайна.class MyFirstJob extends Job
{
/**
* @param mixed data
* @param PipelineAbstract|null $pipeline
*/
public function __construct($data, PipelineAbstract $pipeline = null)
{
$this->data = $data;
$this->pipeline = $pipeline;
}
}
Вот теперь всё. Получилось похоже на цепочку ответственности. Я постарался объяснить идею простым языком. Если вам вдруг тоже захотелось так сделать, то тут я опубликовал рабочий пример реализации, возможно так кому то будет удобнее, чем на словах:
Что хорошего
- Описание процесса обработки данных теперь не размазано по коду, а сосредоточено в одном методе.
- Появилась возможность аккуратно добавить новое поведение в механизм управления очередью. Например, логирование, хранение в базе состояний обработки каждого шага.
- Стало легче добавлять новые стадии обработки и менять порядок выполнения задач.
Кстати, в свежей версии Laravel появился похожий инструмент
withChain()
, он гарантирует выполнение задач в строгой последовательности. В простых случаях этого будет достаточно. Но в случаях, когда есть условия запуска того или иного процесса, когда данные для следующего процесса рождаются в предыдущем, все же нужен более универсальный механизм. Например, такой, о котором я и рассказал в этой статье.