Краткая аннотация
В прошлый раз я описал, как можно поставить задачу (Job) в очередь Laravel из хранимой процедуры или триггера PostgreSQL.
В этой статье я расскажу, как можно преобразовать события, возникающие в PostgreSQL, в события Laravel.
Рабочий пример выложен на GitHub.
Содержание
Вместо введения
«Щупаем» технологию
Создаём приложение Laravel
Создаём слушателя событий Laravel
Создаём событие Laravel
Пишем логику команды listen:notify
Добавляем обработку сигналов
Настраиваем Supervisor для наблюдения за скриптом
Сериализуем полезную нагрузку
Сводим воедино
Что можно улучшить?
Заключение
Вместо введения
Зачастую требуется знать, что происходит с данными в базе и оперативно реагировать на это.
Самый простой способ получить информацию об изменениях — регулярный опрос базы данных, с целью проверить, существуют ли изменения, на которые следует реагировать. И зачастую разработчики именно так и поступают. Это не самый эффективный метод, поскольку он может привести к излишней нагрузке на сервер и требует изменения существующих или создания новых таблиц.
Но PostgreSQL может оповещать клиентов через механизм под названием «LISTEN/NOTIFY». Документация находится здесь и здесь.
У этого механизма есть недостатки:
Требует долгосрочного открытого соединения с базой данных, что может быть непрактично.
Система уведомлений не гарантирует доставку или соблюдения порядка сообщений, не следует использовать эту технологию в качестве полнофункциональной очереди сообщений. «LISTEN/NOTIFY» следует использовать только для лёгкого взаимодействия между процессами.
Размер сообщения ограничен размером строки (8192 байта в PostgreSQL 13).
Для более глубокого понимания этого механизма можно обратиться к этому материалу.
Я же просто покажу, как создать команду Artisan в чем-то похожую на команду queue:work, которая будет делать своё дело и лучше в связке с Supervisor.
«Щупаем» технологию
Для быстрого ознакомления я создам docker-контейнер:
docker run -d -e POSTGRES_USER=test -e POSTGRES_PASSWORD=test \
-p 5433:5432 --name pgsql postgres
Обратите внимание, что я использую порт 5433, потому, что у меня на родном порту крутится «стационарная» СУБД.
Теперь запускаем три терминала, подав во всех команду:
docker exec -it pgsql psql -U test
В первом и втором терминале подаём команду:
LISTEN my_event;
Эта команда «подписывает» клиента на уведомления от сервера PostgreSQL через канал с именем my_event. Если какой-либо другой клиент выполняет команду NOTIFY my_event, то все клиенты, которые выполнили команду LISTEN my_event, получат уведомление.
В третьем терминале подаём команду:
NOTIFY my_event, 'Hello, PostgreSQL!';
Затем в первом и втором терминале ещё раз подаём команду:
LISTEN my_event;
Здесь у меня есть некоторое недопонимание. Я считал, что первый и второй терминалы должны получать оповещение автоматически, но они почему-то требуют повторного вызова «LISTEN my_event».
Можете поиграться, и убедиться, что без подписки получить оповещение невозможно, также, как и прочитать его два раза.
Два первых терминала можно закрыть. Третий можно оставить для опытов.
Создаём приложение Laravel
composer create-project laravel/laravel listen_notify
cd listen_notify
sudo chown -R $USER:www-data storage
sudo chown -R $USER:www-data bootstrap/cache
chmod -R 775 storage
chmod -R 775 bootstrap/cache
Настройки соединения с базой данных в .env
DB_CONNECTION=pgsql
DB_HOST=127.0.0.1
DB_PORT=5433
DB_DATABASE=test
DB_USERNAME=test
DB_PASSWORD=test
Удалите все существующие миграции. Они нам не понадобятся.
Теперь создадим заготовки для команды artisan, event и listener, слушающий этот event
php atrisan make:command ListenNotifyCommand
php artisan make:event PostgresNotificationReceived
php artisan make:listener LogPostgresNotification --event=PostgresNotificationReceived
Настроим EventServiceProvider, добавим в массив $listen следующее значение:
protected $listen = [
Registered::class => [
SendEmailVerificationNotification::class,
],
// Добавленное значение
PostgresNotificationReceived::class => [
LogPostgresNotification::class,
]
];
Каркас приложения готов
Создаём слушателя событий Laravel
Не буду придумывать сложной логики. Просто запишу полученные данные в лог-файл
Содержимое файла app/Listeners/LogPostgresNotification.php
<?php
namespace App\Listeners;
use App\Events\PostgresNotificationReceived;
use Illuminate\Support\Facades\Log;
class LogPostgresNotification
{
public function handle(PostgresNotificationReceived $event): void
{
Log::info('Received Postgres notification: ', $event->notification);
}
}
Создаём событие Laravel
Тоже не буду мудрствовать, просто передам payload через конструктор в event
Содержимое файла app/Events/PostgresNotificationReceived.php
<?php
namespace App\Events;
use Illuminate\Broadcasting\InteractsWithSockets;
use Illuminate\Foundation\Events\Dispatchable;
use Illuminate\Queue\SerializesModels;
class PostgresNotificationReceived
{
use Dispatchable, InteractsWithSockets, SerializesModels;
public function __construct(public array $notification)
{
}
}
Пишем логику команды listen:notify
Пойдём от простого к сложному. Для начала просто получим сообщение от PostgreSQL.
Для этого дорабатываем метод handle, не забывая доработать поля $signature и $description
Содержимое файла app/Console/Commands/ListenNotifyCommand.php
<?php
namespace App\Console\Commands;
use Illuminate\Console\Command;
use Illuminate\Support\Facades\DB;
use PDO;
class ListenNotifyCommand extends Command
{
protected $signature = 'listen:notify';
protected $description = 'Listen to PostgreSQL notify events';
public function handle(): void
{
$pdo = DB::connection()->getPdo();
// Listen to the 'my_channel' notifications
$pdo->exec("LISTEN my_event");
$this->info('Starting');
// Forever loop
while (true) {
$notification = $pdo->pgsqlGetNotify(PDO::FETCH_ASSOC, 10000);
if ($notification) {
$this->info('Received notification: ' . json_encode($notification, JSON_THROW_ON_ERROR));
}
}
}
}
Проверяем, работает ли наша команда. Запускаем в терминале команду: php artisan listen:notify и в третьем терминале, (мы его не закрывали) вновь подаём NOTIFY my_event, 'Hello, PostgreSQL!';
Начало положено. Приложение на Laravel получило событие из PostgreSQL.
При изменении кода команды, не забывайте перезапускать процесс.
Добавляем обработку сигналов
Немного о сигналах, гуру могут пропустить
Сигналы являются частью стандартов POSIX и служат для асинхронного уведомления процесса о каком-либо событии в операционных системах Unix и похожих на них, например, Linux. Приложения в этих системах могут обрабатывать поступающие сигналы, например, на остановку процесса (SIGTERM, SIGINT), перезагрузку (SIGHUP) и т.д.
Windows не поддерживает POSIX сигналы. Он использует собственные механизмы для управления процессами и потоками, включая функции Windows API для отправки и обработки сигналов управления, таких как Ctrl+C.
Однако в Windows есть некоторые среды, такие как Windows Subsystem for Linux (WSL), которые предоставляют совместимость со стандартами POSIX и поддерживают POSIX-сигналы.
В контексте PHP и командной строки нам интересны следующие сигналы:
SIGINT (сигнал прерывания). Этот сигнал обычно посылается при нажатии Ctrl+C в терминале. Он сообщает процессу о необходимости остановиться.
SIGTERM (закончить выполнение). Это стандартный сигнал для остановки процесса в Unix. Программы могут перехватить этот сигнал и выполнить любую необходимую работу перед завершением. Если программа не перехватывает SIGTERM, она завершится немедленно.
SIGKILL (убить процесс немедленно). Этот сигнал нельзя перехватить или игнорировать. Когда процесс получает SIGKILL, он немедленно останавливается.
UPD: как верно отметили в комментариях, Laravel, начиная с версии 8 ввёл собственные методы для обработки сигналов. В версию 8 был добавлен прототип, который по своей парадигме больше похож на классическую обработку сигналов в PHP, но с версии 9 разработчики фреймворка пошли дальше, и добавили собственную удобную функциональность. В результате я переписал обработку сигналов «Ларавейно», но решил оставить и текущий код. В результате появилось два подраздела: обработка сигналов для версии Laravel ниже 9 и выше 8. Первый подход сохранён в ветке laravel-less-9, второй в ветке master.
Обработка сигналов для Laravel ниже 9-й версии
Выполните git checkout laravel-less-9, если используете пример с GitHub.
Добавляем два поля ($hasPcntl, $running) типа bool в класс ListenNotifyCommand и инициализируем их. Пишем метод — обработчик сигналов
protected bool $hasPcntl = false;
protected bool $running = true;
private function handleSignal(int $signal): void
{
switch ($signal) {
case SIGINT:
case SIGTERM:
$this->info( PHP_EOL . 'Received stop signal, shutting down...');
$this->running = false;
break;
default:
}
}
Для обработки сигналов потребуется расширение pcntl. Данное расширение недоступно для Windows, тем не менее, вполне возможно написать кроссплатформенное решение.
Дорабатываем метод handle
public function handle(): int
{
// Проверка, что модуль pcntl подключён
$this->hasPcntl = extension_loaded('pcntl');
if ($this->hasPcntl) {
// Если модуль pcntl подключён, назначаем обработчики сигналов
pcntl_signal(SIGINT, [$this, 'handleSignal']);
pcntl_signal(SIGTERM, [$this, 'handleSignal']);
}
$pdo = DB::connection()->getPdo();
$pdo->exec("LISTEN my_event");
$this->info('Start listening');
while ($this->running) {
$notification = $pdo->pgsqlGetNotify(PDO::FETCH_ASSOC, 10000);
$this->info('iter');
if ($notification) {
$this->info('Received notification: ' . json_encode($notification, JSON_THROW_ON_ERROR));
}
if ($this->hasPcntl) {
// Если модуль pcntl подключён, вызываем обработчики сигналов
pcntl_signal_dispatch();
}
}
// Возвращаем 0, как код завершения
return 0;
}
Файл app/Console/Commands/ListenNotifyCommand.php
<?php
namespace App\Console\Commands;
use Illuminate\Console\Command;
use Illuminate\Support\Facades\DB;
use PDO;
class ListenNotifyCommand extends Command
{
protected $signature = 'listen:notify';
protected $description = 'Listen to PostgreSQL notify events';
protected bool $hasPcntl = false;
protected bool $running = true;
public function handle(): int
{
$this->hasPcntl = extension_loaded('pcntl');
if ($this->hasPcntl) {
pcntl_signal(SIGINT, [$this, 'handleSignal']);
pcntl_signal(SIGTERM, [$this, 'handleSignal']);
}
$pdo = DB::connection()->getPdo();
$pdo->exec("LISTEN my_event");
$this->info('Start listening');
while ($this->running) {
$notification = $pdo->pgsqlGetNotify(PDO::FETCH_ASSOC, 10000);
$this->info('iter');
if ($notification) {
$this->info('Received notification: ' . json_encode($notification, JSON_THROW_ON_ERROR));
}
if ($this->hasPcntl) {
pcntl_signal_dispatch();
}
}
return 0;
}
private function handleSignal(int $signal): void
{
switch ($signal) {
case SIGINT:
case SIGTERM:
$this->info( PHP_EOL . 'Received stop signal, shutting down...');
$this->running = false;
break;
default:
}
}
}
Обработка сигналов для Laravel выше 8-й версии
В этом случае не нужен обработчик сигналов, удаляем приватную функцию handleSignal. Также удаляем вот этот блок:
if ($this->hasPcntl) {
pcntl_signal(SIGINT, [$this, 'handleSignal']);
pcntl_signal(SIGTERM, [$this, 'handleSignal']);
}
А блок, где вызывали pcntl_signal_dispatch() переписываем:
if ($this->hasPcntl) {
$this->trap([SIGINT, SIGTERM], function () {
$this->info( PHP_EOL . 'Received stop signal, shutting down...');
$this->running = false;
});
}
Результирующий файл находится в ветке master (git checkout master)
Файл app/Console/Commands/ListenNotifyCommand.php
<?php
namespace App\Console\Commands;
use App\Events\PostgresNotificationReceived;
use Illuminate\Console\Command;
use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Event;
use PDO;
class ListenNotifyCommand extends Command
{
protected $signature = 'listen:notify';
protected $description = 'Listen to PostgreSQL notify events';
protected bool $hasPcntl = false;
protected bool $running = true;
public function handle(): int
{
$this->hasPcntl = extension_loaded('pcntl');
$pdo = DB::connection()->getPdo();
$pdo->exec("LISTEN my_event");
$this->info('Start listening');
while ($this->running) {
$notification = $pdo->pgsqlGetNotify(PDO::FETCH_ASSOC, 10000);
if ($notification) {
$this->info('Received notification: ' . json_encode($notification, JSON_THROW_ON_ERROR));
$payload = json_decode($notification['payload'], true, 512, JSON_THROW_ON_ERROR);
$this->info('Decoded payload: ' . print_r($payload, true));
Event::dispatch(new PostgresNotificationReceived($payload));
}
if ($this->hasPcntl) {
$this->trap([SIGINT, SIGTERM], function () {
$this->info( PHP_EOL . 'Received stop signal, shutting down...');
$this->running = false;
});
}
}
return 0;
}
}
Код стал значительно лаконичнее, но не думаю, что быстрее. Скорее всего фреймворк под капотом слушает все сигналы (надо это или не надо), и если нужные нам сигналы переданы в trap() вызывает нашу функцию.
А вот определение наличия модуля pcntl следует оставить. В противном случае в Windows код будет завершаться с ошибкой, что константы SIGINT и SIGTERM не найдены.
Можете запустить команду, а также вызвать в соседней консоли NOTIFY, всё должно работать. Если команда запущена в Linux и модуль pcntl подключён, то при нажатии Ctrl+C будет выведено сообщение: Received stop signal, shutting down... Это значит, что скрипт корректно обрабатывает сигналы и останавливается, вместо того, чтобы быть принудительно остановленным.
Настраиваем Supervisor для наблюдения за скриптом
Supervisor — это удобный инструмент для управления фоновыми процессами в операционных системах, похожих на Unix. Он отслеживает работу скрипта, автоматически перезапускает его при сбое и даёт возможность управлять его состоянием, такими как запуск, остановка и перезагрузка.
Supervisor также совместим с сигналами Unix, что дает возможность настраивать поведение процесса в зависимости от разных сигналов. Mы настроили скрипт на обработку сигналов SIGINT и SIGTERM, чтобы правильно завершить его, что хорошо согласуется с Supervisor.
Когда Supervisor отправляет сигнал SIGTERM процессу, он ожидает, что процесс завершит свою работу и передаст управление обратно системе.
Если процесс успешно обработал SIGTERM и корректно завершил свою работу, обычно возвращается код завершения 0.
Если процесс не вернул управление за разумное время (по умолчанию 10 секунд), Supervisor посылает SIGKILL. Это время можно изменить в настройках, опция stopwaitsecs.
Примерный файл конфигурации Supervisor
[program:postrgres_laravel]
process_name=%(program_name)s_%(process_num)02d
command=php /path/to/your/laravel/artisan listen:notify
autostart=true
autorestart=true
user=www-data
numprocs=1
redirect_stderr=true
stdout_logfile=/var/log/postrgres_laravel.log
Подробнее про Supervisor в документации Laravel.
Сериализуем полезную нагрузку
Аргумент для NOTIFY — всегда строка. Т.е. если мы хотим передать что-либо сложное, нам нужно это сложное — сериализовать. PostgreSQL умеет работать с JSON, давайте используем это умение.
Создадим хранимую функцию, которая принимает на вход json, и отправляет его в канал my_event.
Файл миграции 2024_03_05_125805_create_send_notify_function.php
<?php
use Illuminate\Database\Migrations\Migration;
return new class extends Migration {
public function up(): void
{
DB::unprepared('
CREATE OR REPLACE FUNCTION send_notify(data json) RETURNS VOID AS $$
BEGIN
PERFORM pg_notify(\'my_event\', data::text);
END;
$$ LANGUAGE plpgsql;
');
}
public function down(): void
{
DB::unprepared('DROP FUNCTION IF EXISTS send_notify(json);');
}
};
Выполняем миграцию
php artisan migrate
У меня всё прошло успешно. Теперь нужно чуть-чуть доработать handle
if ($notification) {
$this->info('Received notification: ' . json_encode($notification, JSON_THROW_ON_ERROR));
$payload = json_decode($notification['payload'], true, 512, JSON_THROW_ON_ERROR);
$this->info('Decoded payload: ' . print_r($payload, true));
}
Здесь я выделяю $payload и вывожу её в терминал.
Давайте проверим, всё ли у нас работает. Запустите команду php artisan listen:notify. На этот раз в терминале psql подадим следующую конструкцию:
select send_notify(json_build_object('key1', 'Hello, PostgreSQL!'));
Смотрим в терминал, работает. Давайте передадим что-либо менее тривиальное:
select send_notify(json_build_object('key1', 'Hello, PostgreSQL!', 'key2', json_build_object('key2_inner', 2, 'key3_inner', 3)))
Главное, не увлекаться, и помнить об ограничении на 8192 байта.
Сводим воедино
Теперь осталось совсем немного. В нашей команде отправить событие, чтобы его могли слушать все слушатели, которые на него подписаны. Для этого добавляем всего одну строчку:
if ($notification) {
$this->info('Received notification: ' . json_encode($notification, JSON_THROW_ON_ERROR));
$payload = json_decode($notification['payload'], true, 512, JSON_THROW_ON_ERROR);
$this->info('Decoded payload: ' . print_r($payload, true));
// Новая строка
Event::dispatch(new PostgresNotificationReceived($payload));
}
Полный листинг файла app/Console/Commands/ListenNotifyCommand.php
<?php
namespace App\Console\Commands;
use App\Events\PostgresNotificationReceived;
use Illuminate\Console\Command;
use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Event;
use PDO;
class ListenNotifyCommand extends Command
{
protected $signature = 'listen:notify';
protected $description = 'Listen to PostgreSQL notify events';
protected bool $hasPcntl = false;
protected bool $running = true;
public function handle(): int
{
$this->hasPcntl = extension_loaded('pcntl');
if ($this->hasPcntl) {
pcntl_signal(SIGINT, [$this, 'handleSignal']);
pcntl_signal(SIGTERM, [$this, 'handleSignal']);
}
$pdo = DB::connection()->getPdo();
$pdo->exec("LISTEN my_event");
$this->info('Start listening');
while ($this->running) {
$notification = $pdo->pgsqlGetNotify(PDO::FETCH_ASSOC, 10000);
if ($notification) {
$this->info('Received notification: ' . json_encode($notification, JSON_THROW_ON_ERROR));
$payload = json_decode($notification['payload'], true, 512, JSON_THROW_ON_ERROR);
$this->info('Decoded payload: ' . print_r($payload, true));
Event::dispatch(new PostgresNotificationReceived($payload));
}
if ($this->hasPcntl) {
pcntl_signal_dispatch();
}
}
return 0;
}
private function handleSignal(int $signal): void
{
switch ($signal) {
case SIGINT:
case SIGTERM:
$this->info( PHP_EOL . 'Received stop signal, shutting down...');
$this->running = false;
break;
default:
}
}
}
Запускаем команду: php artisan listen:notify и подаём в соседнем терминале команду
select send_notify(json_build_object('key1', 'Hello, PostgreSQL!', 'key2', json_build_object('key2_inner', 2, 'key3_inner', 3)));
По нашей задумке, слушатель события пишет полезную нагрузку в лог. Смотрим в лог.
Проверяю, как скрипт отрабатывает сигналы в Linux
Видно, что в ответ на Ctrl+C скрипт выводит сообщение: Received stop signal, shutting down...
Что можно улучшить?
Скорее всего, обработку полученного сообщения, следует тотчас же закинуть, как задачу, в асинхронную очередь Laravel, чтобы обработка сообщения не тормозила «бесконечный» цикл, что может привести к пропуску сообщений или аварийному завершению скрипта процессом Supervisor.
Дальнейшее принятие решений о диспетчеризации событий выполнять в этой задаче.
Заключение
Вы можете использовать функцию send_notify в других хранимых функциях или триггерах PostgreSQL, передавая из триггеров значения переменных TG_TABLE_NAME и TG_OP для принятия решения, что и как обработать. В статье показана всего лишь основа для получения событий из PostgreSQL. Как применять их на практике, зависит только от воображения разработчика.
Рабочее приложение можно найти на GitHub