Сегодня затронем такую неожиданную тему как работу с брокером сообщений Nats и PHP. Как оказалось, есть очень мало статей на эту тему, что странно, ведь PHP - это лучший язык программирования. Не знаю, почему так вышло, но напишите в комменты :-)
Немного про Nats
Nats – это написанный на Go высокопроизводительный брокер сообщений, работающий по принципу, схожему с Kafka (если рассматривать JetStream, а не Nats Core). Он использует PUB/SUB модель и топики для разделения, кто, что и куда отправляет и кто, что и откуда получает.
Проткол Nats
Протокол Nats – это просто текстовый протокол на базе TCP/IP, сообщения разделяются новой строкой. Можно даже работать с ним с помощью telnet.
Попробуем подключиться к nats с помощью telnet и поздороваться.
Запускаем nats в Docker
docker run --rm -it -p 4222:4222 nats
и теперь в другом окне терминала можно подключиться:
telnet 127.0.0.1 4222
При подключении мы сразу же получим приветственное сообщение сервера (INFO):
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
INFO {"server_id":"ND6UPRIAIQ4K4EOMCQFI7NEGEEN7NW7CQCELNB3F2IVK3RGKHCA3UC36","server_name":"ND6UPRIAIQ4K4EOMCQFI7NEGEEN7NW7CQCELNB3F2IVK3RGKHCA3UC36","version":"2.11.1-binary","proto":1,"git_commit":"6cebef9","go":"go1.24.1","host":"0.0.0.0","port":4222,"headers":true,"max_payload":1048576,"client_id":5,"client_ip":"192.168.215.1","cluster":"my_cluster","xkey":"XCTTMXJ3T65TODSZEAHRFDMDUIMH2BP4SARGLKSFYRYOLH3MIQTUODC2"}
Нужно ему ответить сообщением:
CONNECT {}
Он ответит:
+OK
После этого можно отправлять команды. Например, PING
PING
Сервер ответит
PONG
Если у кого-нибудь возникнет желание разобраться поподробнее с тем, как это работает, есть неплохая документацию с демо протокола https://docs.nats.io/reference/reference-protocols/nats-protocol-demo
Нужно отметить, что Nats Core – это обычная почти что синхронная очередь. То, что публикуется в топик, должен забрать consumer. Если консьюмеров нет, сообщение уничтожается.
Для реализации очереди в нашем обычном понимании есть Nats JetStream, который реализует хранилище сообщений на сервере, возможность консьюмерам отключаться и подключаться, а также гарантии доставки.
Протокол JetStream уже сильно сложнее, работать с ним через telnet - больная боль, так что лучше брать библиотеки или хотя бы nats-box
Библиотеки на PHP
Начнем с того, что библиотек, реализующих работу с брокером, не так много. Я нашел вообще всего две:
и только первая поддерживается, так что в рамках данной статьи рассмотрим внедрение с помощью библиотеки nats.php
.
Работать с этой библиотекой достаточно удобно, но не хватает более подробной документации и логирования, помимо debug логов. Но ничего, прорвемся :-)
Для дальнейшей работы примем, что мы установили библиотеку
composer require basis-company/nats
А также подключили composer autoload. Например, можно в начале файла указать:
<?php
require_once 'vendor/autoload.php';
Пробуем работать с библиотекой
Для простоты рассказа будем использовать обычный function style PHP с разделением кода на функции для хорошего понимания.
Реализуем подключение и отключение
function connect(): \Basis\Nats\Client
{
$configuration = new \Basis\Nats\Configuration(
host: '127.0.0.1',
port: 4222,
);
return new \Basis\Nats\Client($configuration);
}
function disconnect(\Basis\Nats\Client $client): void
{
$client->disconnect();
}
Библиотека поддерживает много разных способов аутентификации. Но в нашем примере - никакой безопасности :)
Для создания потока и обращения к нему будем использовать следующую функцию
function init_stream(\Basis\Nats\Client $client): \Basis\Nats\Stream\Stream
{
$api = $client->getApi();
$stream = $api->getStream('php-nats-test');
// По умолчанию используется:
// хранилище - файловое,
// сабжекты, относящиеся к этому потоку - только имя потока.
$stream->getConfiguration()
->setRetentionPolicy(\Basis\Nats\Stream\RetentionPolicy::WORK_QUEUE);
$stream->create();
return $stream;
}
Функция init_stream
создает поток, который работает как классическая очередь (WORK_QUEUE). Если задача взята и ack-нута, сообщение удаляется. При этом, накладывается ограничение: на один топик - одна группа консьюмеров (консьюмеры с одним name), так реализуется гарантия доставки exactly once.
Более подробно можно посмотреть в документации Nats по JetStream Streams: https://docs.nats.io/nats-concepts/jetstream/streams
В качестве задач будем рассматривать обычные строки. В перспективе можно в эти строки записывать любые значения, например, JSON закодированные структуры данных.
Код добавления задач в поток:
function push_message(\Basis\Nats\Client $client, string $message): string
{
$id = uniqid('', true);
$client->publish(
'php-nats-test',
new \Basis\Nats\Message\Payload($message, [
// Это поле также используется для дедупликации, но также полезно и для отображение логов в системе
'Nats-Msg-ID' => $id,
// Номер попытки обработки сообщения
'x-attempt' => 1,
]),
);
return $id;
}
При отправке задачи генерируется её ID. При получении сообщения этот ID также придет в заголовках, что может быть полезно для дальнейшей обработки. Смысл заголовка x-attempt
разберем немного позже.
На текущий момент, если собрать весь код, мы сможем отправлять задачи в очередь.
Для начала запустим Nats в локальном докере с включенным jetstream:
docker run --rm -it -p 4222:4222 nats -js
Код запуска:
$client = connect();
init_stream($client);
$id = push_message($client, 'Hello World!');
echo 'pushed message id: ' . $id . PHP_EOL;
$id = push_message($client, 'Hello World, again!');
echo 'pushed message id: ' . $id . PHP_EOL;
$id = push_message($client, 'Hello World, once more!');
echo 'pushed message id: ' . $id . PHP_EOL;
disconnect($client);
Пробуем запустить и получаем логи:
% php producer.php
pushed message id: 68028fce39f239.23978889
pushed message id: 68028fce3b55a9.67959494
pushed message id: 68028fce3b56d1.07523175
Итоговый код файла producer.php
<?php
require_once 'vendor/autoload.php';
function connect(): \Basis\Nats\Client
{
$configuration = new \Basis\Nats\Configuration(
host: '127.0.0.1',
port: 4222,
);
return new \Basis\Nats\Client($configuration);
}
function disconnect(\Basis\Nats\Client $client): void
{
$client->disconnect();
}
function init_stream(\Basis\Nats\Client $client): \Basis\Nats\Stream\Stream
{
$api = $client->getApi();
$stream = $api->getStream('php-nats-test');
// По умолчанию используется:
// хранилище - файловое,
// сабжекты, относящиеся к этому потоку - только имя потока.
$stream->getConfiguration()
->setRetentionPolicy(\Basis\Nats\Stream\RetentionPolicy::WORK_QUEUE);
$stream->create();
return $stream;
}
function push_message(\Basis\Nats\Client $client, string $message): string
{
$id = uniqid('', true);
$client->publish(
'php-nats-test',
new \Basis\Nats\Message\Payload($message, [
// Это поле также используется для дедупликации, но также полезно и для отображение логов в системе
'Nats-Msg-ID' => $id,
// Номер попытки обработки сообщения
'x-attempt' => 1,
]),
);
return $id;
}
$client = connect();
init_stream($client);
$id = push_message($client, 'Hello World!');
echo 'pushed message id: ' . $id . PHP_EOL;
$id = push_message($client, 'Hello World, again!');
echo 'pushed message id: ' . $id . PHP_EOL;
$id = push_message($client, 'Hello World, once more!');
echo 'pushed message id: ' . $id . PHP_EOL;
disconnect($client);
Далее переходим к следующему этапу. Нам ведь нужно эти сообщения забрать!
Потребление сообщений из очереди
Вернемся к вопросу реализации ретраев. В целом, для её реализации есть два варианта:
Штатный ограничитель в настройках консьюмера "доставлять мне одну задачу не более X раз, если она nack-нута", при этом при исчерпании попыток, задача из очереди удалена не будет
Кастомная логика на основе заголовков и обратной отправки задачи в конец очереди (спойлер: использовать будем именно его)
Самый очевидный способ следующий:
Берем задачу
Пытаемся обработать
Получаем ошибку
Отправляем задачу в очередь с attempt = attempt + 1
Проверяем, если
attempt === MAX_ATTEMPTS
, не отправляем задачу в очередь
Код функции переотправки задачи в очередь:
function redeliver(\Basis\Nats\Client $client, \Basis\Nats\Message\Msg $msg): void
{
$id = uniqid('', true);
$client->publish(
$msg->subject,
new \Basis\Nats\Message\Payload($msg->payload->body, [
'Nats-Msg-ID' => $id,
'x-attempt' => $msg->payload->getHeader('x-attempt') + 1,
])
);
}
Ну и самое интересное - обработка сообщений. Начнем с одного сообщения, а далее перейдем к прослушиванию очереди и обработке бесконечного количества сообщений:
function handle_message(string $id, string $payload, int $attempt): bool
{
echo "Обработка задачи ID {$id}, номер попытки: {$attempt}, тело: {$payload}\n";
if ($payload === 'Hello World!' && $attempt === 1) {
// Hello world - задача тяжелая, с первого раза никогда не получается
return false;
}
return true;
}
И код обработки сообщения из очереди:
const MAX_ATTEMPTS = 3;
function process_message(\Basis\Nats\Client $client, \Basis\Nats\Message\Msg $message): void
{
$payload = $message->payload;
$id = $payload->getHeader('Nats-Msg-ID');
$attempt = (int)$payload->getHeader('x-attempt');
try {
if (handle_message($id, $payload->body, $attempt)) {
$message->ack();
} else {
if ($attempt < MAX_ATTEMPTS) {
redeliver($client, $message);
}
$message->ack();
}
} catch (Throwable $e) {
if ($attempt < MAX_ATTEMPTS) {
redeliver($client, $message);
}
$message->ack();
throw $e;
}
}
Код обработки достаточно простой и понятный:
Обрабатываем сообщение
В случае ошибки проверяем, нужно ли ретраить
Если нужно, делаем ретрай
Если не нужно, заканчиваем на этом обработку сообщения
Если вылетело исключение, пусть его обрабатывает внешний слой
А теперь самое трудное – работа с прослушиванием очереди. Для этого нам понадобится расширение ext-pcntl
, чтобы скрипт можно было завершить по CTRL+C:
// Для завершения работы при нажатии CTRL+C
$canContinue = true;
pcntl_signal(SIGINT, function () use (&$canContinue) {
$canContinue = false;
});
function listen(\Basis\Nats\Client $client, \Basis\Nats\Stream\Stream $stream): void
{
global $canContinue;
$consumer = $stream->getConsumer('my-wonderful-consumer');
$consumer->getConfiguration()
// Берем сообщения только из указанных топиков
->setSubjectFilter('php-nats-test');
$queue = $consumer->create()->getQueue();
while ($canContinue) {
$message = $queue->fetch();
if ($message && !$message->payload->isEmpty()) {
// Иногда в $message может прийти служебное сообщение о том,
// что в fetch не удалось получить сообщения (очередь пуста). Тогда payload будет пустым
try {
process_message($client, $message);
} catch (Throwable $e) {
// Максимально простая обработка исключения
echo $e->getMessage() . PHP_EOL;
}
}
// Проверяем нужно ли запустить обработчики сигнала выхода
pcntl_signal_dispatch();
}
$client->unsubscribe($queue);
}
Строка my-wonderful-consumer
- это имя консьюмера. Если запустить несколько скриптов параллельно, они все будут в одной группе и только один из них будет получать одно сообщение. Таким образом достигается горизонтальное масштабирование без дублирования обработки сообщений.
Пробуем запустить :)
$client = connect();
$stream = init_stream($client);
listen($client, $stream));
disconnect($client);
В выводе получим:
% php consumer.php
Обработка задачи ID 6802976d4347b5.08190035, номер попытки: 1, тело: Hello World!
Обработка задачи ID 6802976d434999.77061748, номер попытки: 1, тело: Hello World, again!
Обработка задачи ID 6802976d434a18.95887182, номер попытки: 1, тело: Hello World, once more!
Обработка задачи ID 6802976d43e417.25517113, номер попытки: 2, тело: Hello World!
Можно заметить. что задача Hello world! обработалась дважды, первой и последней. Это связано с переотправкой. У неё даже поменялся ID, как и должно быть :)
Итоговый код файла consumer.php
<?php
require_once 'vendor/autoload.php';
function connect(): \Basis\Nats\Client
{
$configuration = new \Basis\Nats\Configuration(
host: '127.0.0.1',
port: 4222,
);
return new \Basis\Nats\Client($configuration);
}
function disconnect(\Basis\Nats\Client $client): void
{
$client->disconnect();
}
function init_stream(\Basis\Nats\Client $client): \Basis\Nats\Stream\Stream
{
$api = $client->getApi();
$stream = $api->getStream('php-nats-test');
// По умолчанию используется:
// хранилище - файловое,
// сабжекты, относящиеся к этому потоку - только имя потока.
$stream->getConfiguration()
->setRetentionPolicy(\Basis\Nats\Stream\RetentionPolicy::WORK_QUEUE);
$stream->create();
return $stream;
}
function redeliver(\Basis\Nats\Client $client, \Basis\Nats\Message\Msg $msg): void
{
$id = uniqid('', true);
$client->publish(
$msg->subject,
new \Basis\Nats\Message\Payload($msg->payload->body, [
'Nats-Msg-ID' => $id,
'x-attempt' => $msg->payload->getHeader('x-attempt') + 1,
])
);
}
function handle_message(string $id, string $payload, int $attempt): bool
{
echo "Обработка задачи ID {$id}, номер попытки: {$attempt}, тело: {$payload}\n";
if ($payload === 'Hello World!' && $attempt === 1) {
// Hello world - задача тяжелая, с первого раза никогда не получается
return false;
}
return true;
}
const MAX_ATTEMPTS = 3;
function process_message(\Basis\Nats\Client $client, \Basis\Nats\Message\Msg $message): void
{
$payload = $message->payload;
$id = $payload->getHeader('Nats-Msg-ID');
$attempt = (int)$payload->getHeader('x-attempt');
try {
if (handle_message($id, $payload->body, $attempt)) {
$message->ack();
} else {
if ($attempt < MAX_ATTEMPTS) {
redeliver($client, $message);
}
$message->ack();
}
} catch (Throwable $e) {
if ($attempt < MAX_ATTEMPTS) {
redeliver($client, $message);
}
$message->ack();
throw $e;
}
}
// Для завершения работы при нажатии CTRL+C
$canContinue = true;
pcntl_signal(SIGINT, function () use (&$canContinue) {
$canContinue = false;
});
function listen(\Basis\Nats\Client $client, \Basis\Nats\Stream\Stream $stream): void
{
global $canContinue;
$consumer = $stream->getConsumer('my-wonderful-consumer');
$consumer->getConfiguration()
// Берем сообщения только из указанных топиков
->setSubjectFilter('php-nats-test');
$queue = $consumer->create()->getQueue();
while ($canContinue) {
$message = $queue->fetch();
if ($message && !$message->payload->isEmpty()) {
// Иногда в $message может прийти служебное сообщение о том,
// что в fetch не удалось получить сообщения (очередь пуста). Тогда payload будет пустым
try {
process_message($client, $message);
} catch (Throwable $e) {
// Максимально простая обработка исключения
echo $e->getMessage() . PHP_EOL;
}
}
pcntl_signal_dispatch();
}
$client->unsubscribe($queue);
}
$client = connect();
$stream = init_stream($client);
listen($client, $stream);
disconnect($client);
Выводы
Мы попробовали поработать с Nats в PHP, реализовали базовую очередь и поняли как работают базовые сущности JetStream. Дальше только больше и круче. Например, можно внедрить отложенную обработку, приоритеты (которых в nats к сожалению нет).
На данный момент внедрение Nats является нетривиальной задачей, по которой крайне мало информации. Я надеюсь, в будущем сообщество заинтересуется этим мощным и одновременно простым в обслуживании брокером очередей, а также будет появляться больше библиотек и адаптеров, чтобы его внедрение на PHP стало проще и надежнее.
Удачи вам, коллеги :-)