Как стать автором
Обновить

Работаем с Nats в PHP

Уровень сложностиПростой
Время на прочтение10 мин
Количество просмотров657

Сегодня затронем такую неожиданную тему как работу с брокером сообщений Nats и PHP. Как оказалось, есть очень мало статей на эту тему, что странно, ведь PHP - это лучший язык программирования. Не знаю, почему так вышло, но напишите в комменты :-)

Немного про Nats

Nats – это написанный на Go высокопроизводительный брокер сообщений, работающий по принципу, схожему с Kafka (если рассматривать JetStream, а не Nats Core). Он использует PUB/SUB модель и топики для разделения, кто, что и куда отправляет и кто, что и откуда получает.

Проткол Nats

Протокол Nats – это просто текстовый протокол на базе TCP/IP, сообщения разделяются новой строкой. Можно даже работать с ним с помощью telnet.

Попробуем подключиться к nats с помощью telnet и поздороваться.

Запускаем nats в Docker

docker run --rm -it -p 4222:4222 nats

и теперь в другом окне терминала можно подключиться:

telnet 127.0.0.1 4222

При подключении мы сразу же получим приветственное сообщение сервера (INFO):

Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
INFO {"server_id":"ND6UPRIAIQ4K4EOMCQFI7NEGEEN7NW7CQCELNB3F2IVK3RGKHCA3UC36","server_name":"ND6UPRIAIQ4K4EOMCQFI7NEGEEN7NW7CQCELNB3F2IVK3RGKHCA3UC36","version":"2.11.1-binary","proto":1,"git_commit":"6cebef9","go":"go1.24.1","host":"0.0.0.0","port":4222,"headers":true,"max_payload":1048576,"client_id":5,"client_ip":"192.168.215.1","cluster":"my_cluster","xkey":"XCTTMXJ3T65TODSZEAHRFDMDUIMH2BP4SARGLKSFYRYOLH3MIQTUODC2"}

Нужно ему ответить сообщением:

CONNECT {}

Он ответит:

+OK

После этого можно отправлять команды. Например, PING

PING

Сервер ответит

PONG

Если у кого-нибудь возникнет желание разобраться поподробнее с тем, как это работает, есть неплохая документацию с демо протокола https://docs.nats.io/reference/reference-protocols/nats-protocol-demo

Нужно отметить, что Nats Core – это обычная почти что синхронная очередь. То, что публикуется в топик, должен забрать consumer. Если консьюмеров нет, сообщение уничтожается.

Для реализации очереди в нашем обычном понимании есть Nats JetStream, который реализует хранилище сообщений на сервере, возможность консьюмерам отключаться и подключаться, а также гарантии доставки.

Протокол JetStream уже сильно сложнее, работать с ним через telnet - больная боль, так что лучше брать библиотеки или хотя бы nats-box

Библиотеки на PHP

Начнем с того, что библиотек, реализующих работу с брокером, не так много. Я нашел вообще всего две:

и только первая поддерживается, так что в рамках данной статьи рассмотрим внедрение с помощью библиотеки nats.php.

Работать с этой библиотекой достаточно удобно, но не хватает более подробной документации и логирования, помимо debug логов. Но ничего, прорвемся :-)

Для дальнейшей работы примем, что мы установили библиотеку

composer require basis-company/nats

А также подключили composer autoload. Например, можно в начале файла указать:

<?php
require_once 'vendor/autoload.php';

Пробуем работать с библиотекой

Для простоты рассказа будем использовать обычный function style PHP с разделением кода на функции для хорошего понимания.
Реализуем подключение и отключение

function connect(): \Basis\Nats\Client  
{  
    $configuration = new \Basis\Nats\Configuration(  
        host: '127.0.0.1',  
        port: 4222,  
    );  
  
    return new \Basis\Nats\Client($configuration);  
}  
  
function disconnect(\Basis\Nats\Client $client): void  
{  
    $client->disconnect();  
}

Библиотека поддерживает много разных способов аутентификации. Но в нашем примере - никакой безопасности :)

Для создания потока и обращения к нему будем использовать следующую функцию

function init_stream(\Basis\Nats\Client $client): \Basis\Nats\Stream\Stream  
{  
    $api = $client->getApi();  
    $stream = $api->getStream('php-nats-test');  
    // По умолчанию используется:  
    // хранилище - файловое,    
    // сабжекты, относящиеся к этому потоку - только имя потока.    
    $stream->getConfiguration()  
        ->setRetentionPolicy(\Basis\Nats\Stream\RetentionPolicy::WORK_QUEUE);  
    $stream->create();  
  
    return $stream;  
}

Функция init_stream создает поток, который работает как классическая очередь (WORK_QUEUE). Если задача взята и ack-нута, сообщение удаляется. При этом, накладывается ограничение: на один топик - одна группа консьюмеров (консьюмеры с одним name), так реализуется гарантия доставки exactly once.

Более подробно можно посмотреть в документации Nats по JetStream Streams: https://docs.nats.io/nats-concepts/jetstream/streams

В качестве задач будем рассматривать обычные строки. В перспективе можно в эти строки записывать любые значения, например, JSON закодированные структуры данных.
Код добавления задач в поток:

function push_message(\Basis\Nats\Client $client, string $message): string  
{  
    $id = uniqid('', true);  
    $client->publish(  
        'php-nats-test',  
        new \Basis\Nats\Message\Payload($message, [  
            // Это поле также используется для дедупликации, но также полезно и для отображение логов в системе  
            'Nats-Msg-ID' => $id,  
            // Номер попытки обработки сообщения  
            'x-attempt' => 1,  
        ]),  
    );  
    return $id;  
}

При отправке задачи генерируется её ID. При получении сообщения этот ID также придет в заголовках, что может быть полезно для дальнейшей обработки. Смысл заголовка x-attempt разберем немного позже.

На текущий момент, если собрать весь код, мы сможем отправлять задачи в очередь.
Для начала запустим Nats в локальном докере с включенным jetstream:

docker run --rm -it -p 4222:4222 nats -js

Код запуска:

$client = connect();
init_stream($client);
  
$id = push_message($client, 'Hello World!');  
echo 'pushed message id: ' . $id . PHP_EOL;  
$id = push_message($client, 'Hello World, again!');  
echo 'pushed message id: ' . $id . PHP_EOL;  
$id = push_message($client, 'Hello World, once more!');  
echo 'pushed message id: ' . $id . PHP_EOL;  
  
disconnect($client);

Пробуем запустить и получаем логи:

% php producer.php
pushed message id: 68028fce39f239.23978889
pushed message id: 68028fce3b55a9.67959494
pushed message id: 68028fce3b56d1.07523175
Итоговый код файла producer.php
<?php

require_once 'vendor/autoload.php';

function connect(): \Basis\Nats\Client
{
    $configuration = new \Basis\Nats\Configuration(
        host: '127.0.0.1',
        port: 4222,
    );

    return new \Basis\Nats\Client($configuration);
}

function disconnect(\Basis\Nats\Client $client): void
{
    $client->disconnect();
}

function init_stream(\Basis\Nats\Client $client): \Basis\Nats\Stream\Stream
{
    $api = $client->getApi();
    $stream = $api->getStream('php-nats-test');
    // По умолчанию используется:
    // хранилище - файловое,
    // сабжекты, относящиеся к этому потоку - только имя потока.
    $stream->getConfiguration()
        ->setRetentionPolicy(\Basis\Nats\Stream\RetentionPolicy::WORK_QUEUE);
    $stream->create();

    return $stream;
}

function push_message(\Basis\Nats\Client $client, string $message): string
{
    $id = uniqid('', true);
    $client->publish(
        'php-nats-test',
        new \Basis\Nats\Message\Payload($message, [
            // Это поле также используется для дедупликации, но также полезно и для отображение логов в системе
            'Nats-Msg-ID' => $id,
            // Номер попытки обработки сообщения
            'x-attempt' => 1,
        ]),
    );
    return $id;
}


$client = connect();

init_stream($client);
$id = push_message($client, 'Hello World!');
echo 'pushed message id: ' . $id . PHP_EOL;
$id = push_message($client, 'Hello World, again!');
echo 'pushed message id: ' . $id . PHP_EOL;
$id = push_message($client, 'Hello World, once more!');
echo 'pushed message id: ' . $id . PHP_EOL;

disconnect($client);

Далее переходим к следующему этапу. Нам ведь нужно эти сообщения забрать!

Потребление сообщений из очереди

Вернемся к вопросу реализации ретраев. В целом, для её реализации есть два варианта:

  • Штатный ограничитель в настройках консьюмера "доставлять мне одну задачу не более X раз, если она nack-нута", при этом при исчерпании попыток, задача из очереди удалена не будет

  • Кастомная логика на основе заголовков и обратной отправки задачи в конец очереди (спойлер: использовать будем именно его)

    Самый очевидный способ следующий:

    • Берем задачу

    • Пытаемся обработать

    • Получаем ошибку

    • Отправляем задачу в очередь с attempt = attempt + 1

      • Проверяем, если attempt === MAX_ATTEMPTS, не отправляем задачу в очередь

Код функции переотправки задачи в очередь:

function redeliver(\Basis\Nats\Client $client, \Basis\Nats\Message\Msg $msg): void  
{  
    $id = uniqid('', true);  
    $client->publish(  
        $msg->subject,  
        new \Basis\Nats\Message\Payload($msg->payload->body, [  
            'Nats-Msg-ID' => $id,  
            'x-attempt' => $msg->payload->getHeader('x-attempt') + 1,  
        ])  
    );  
}

Ну и самое интересное - обработка сообщений. Начнем с одного сообщения, а далее перейдем к прослушиванию очереди и обработке бесконечного количества сообщений:

function handle_message(string $id, string $payload, int $attempt): bool  
{  
    echo "Обработка задачи ID {$id}, номер попытки: {$attempt}, тело: {$payload}\n";  
  
    if ($payload === 'Hello World!' && $attempt === 1) {  
        // Hello world - задача тяжелая, с первого раза никогда не получается  
        return false;  
    }  
  
    return true;  
}

И код обработки сообщения из очереди:

const MAX_ATTEMPTS = 3;  
function process_message(\Basis\Nats\Client $client, \Basis\Nats\Message\Msg $message): void  
{  
    $payload = $message->payload;  
  
    $id = $payload->getHeader('Nats-Msg-ID');  
    $attempt = (int)$payload->getHeader('x-attempt');  
    try {  
        if (handle_message($id, $payload->body, $attempt)) {  
            $message->ack();  
        } else {  
            if ($attempt < MAX_ATTEMPTS) {  
                redeliver($client, $message);  
            }  
            $message->ack();  
        }  
    } catch (Throwable $e) {  
        if ($attempt < MAX_ATTEMPTS) {  
            redeliver($client, $message);  
        }  
        $message->ack();  
        throw $e;  
    }  
}

Код обработки достаточно простой и понятный:

  • Обрабатываем сообщение

  • В случае ошибки проверяем, нужно ли ретраить

    • Если нужно, делаем ретрай

    • Если не нужно, заканчиваем на этом обработку сообщения

    • Если вылетело исключение, пусть его обрабатывает внешний слой

А теперь самое трудное – работа с прослушиванием очереди. Для этого нам понадобится расширение ext-pcntl, чтобы скрипт можно было завершить по CTRL+C:

// Для завершения работы при нажатии CTRL+C  
$canContinue = true;  
pcntl_signal(SIGINT, function () use (&$canContinue) {  
    $canContinue = false;  
});  
  
function listen(\Basis\Nats\Client $client, \Basis\Nats\Stream\Stream $stream): void  
{  
    global $canContinue;  
  
    $consumer = $stream->getConsumer('my-wonderful-consumer');  
    $consumer->getConfiguration()  
        // Берем сообщения только из указанных топиков  
        ->setSubjectFilter('php-nats-test');  
    $queue = $consumer->create()->getQueue();  
  
    while ($canContinue) {  
        $message = $queue->fetch();  
        if ($message && !$message->payload->isEmpty()) {  
            // Иногда в $message может прийти служебное сообщение о том,  
            // что в fetch не удалось получить сообщения (очередь пуста). Тогда payload будет пустым            
            try {  
                process_message($client, $message);  
            } catch (Throwable $e) {  
                // Максимально простая обработка исключения  
                echo $e->getMessage() . PHP_EOL;  
            }  
        }  
		// Проверяем нужно ли запустить обработчики сигнала выхода
        pcntl_signal_dispatch();  
    }  
  
    $client->unsubscribe($queue);  
}

Строка my-wonderful-consumer - это имя консьюмера. Если запустить несколько скриптов параллельно, они все будут в одной группе и только один из них будет получать одно сообщение. Таким образом достигается горизонтальное масштабирование без дублирования обработки сообщений.

Пробуем запустить :)

$client = connect();  
$stream = init_stream($client);  
  
listen($client, $stream));  
  
disconnect($client);

В выводе получим:

% php consumer.php
Обработка задачи ID 6802976d4347b5.08190035, номер попытки: 1, тело: Hello World!
Обработка задачи ID 6802976d434999.77061748, номер попытки: 1, тело: Hello World, again!
Обработка задачи ID 6802976d434a18.95887182, номер попытки: 1, тело: Hello World, once more!
Обработка задачи ID 6802976d43e417.25517113, номер попытки: 2, тело: Hello World!

Можно заметить. что задача Hello world! обработалась дважды, первой и последней. Это связано с переотправкой. У неё даже поменялся ID, как и должно быть :)

Итоговый код файла consumer.php
<?php

require_once 'vendor/autoload.php';

function connect(): \Basis\Nats\Client
{
    $configuration = new \Basis\Nats\Configuration(
        host: '127.0.0.1',
        port: 4222,
    );

    return new \Basis\Nats\Client($configuration);
}

function disconnect(\Basis\Nats\Client $client): void
{
    $client->disconnect();
}

function init_stream(\Basis\Nats\Client $client): \Basis\Nats\Stream\Stream
{
    $api = $client->getApi();
    $stream = $api->getStream('php-nats-test');
    // По умолчанию используется:
    // хранилище - файловое,
    // сабжекты, относящиеся к этому потоку - только имя потока.
    $stream->getConfiguration()
        ->setRetentionPolicy(\Basis\Nats\Stream\RetentionPolicy::WORK_QUEUE);
    $stream->create();

    return $stream;
}

function redeliver(\Basis\Nats\Client $client, \Basis\Nats\Message\Msg $msg): void
{
    $id = uniqid('', true);
    $client->publish(
        $msg->subject,
        new \Basis\Nats\Message\Payload($msg->payload->body, [
            'Nats-Msg-ID' => $id,
            'x-attempt' => $msg->payload->getHeader('x-attempt') + 1,
        ])
    );
}

function handle_message(string $id, string $payload, int $attempt): bool
{
    echo "Обработка задачи ID {$id}, номер попытки: {$attempt}, тело: {$payload}\n";

    if ($payload === 'Hello World!' && $attempt === 1) {
        // Hello world - задача тяжелая, с первого раза никогда не получается
        return false;
    }

    return true;
}

const MAX_ATTEMPTS = 3;
function process_message(\Basis\Nats\Client $client, \Basis\Nats\Message\Msg $message): void
{
    $payload = $message->payload;

    $id = $payload->getHeader('Nats-Msg-ID');
    $attempt = (int)$payload->getHeader('x-attempt');
    try {
        if (handle_message($id, $payload->body, $attempt)) {
            $message->ack();
        } else {
            if ($attempt < MAX_ATTEMPTS) {
                redeliver($client, $message);
            }
            $message->ack();
        }
    } catch (Throwable $e) {
        if ($attempt < MAX_ATTEMPTS) {
            redeliver($client, $message);
        }
        $message->ack();
        throw $e;
    }
}


// Для завершения работы при нажатии CTRL+C
$canContinue = true;
pcntl_signal(SIGINT, function () use (&$canContinue) {
    $canContinue = false;
});

function listen(\Basis\Nats\Client $client, \Basis\Nats\Stream\Stream $stream): void
{
    global $canContinue;

    $consumer = $stream->getConsumer('my-wonderful-consumer');
    $consumer->getConfiguration()
        // Берем сообщения только из указанных топиков
        ->setSubjectFilter('php-nats-test');
    $queue = $consumer->create()->getQueue();

    while ($canContinue) {
        $message = $queue->fetch();
        if ($message && !$message->payload->isEmpty()) {
            // Иногда в $message может прийти служебное сообщение о том,
            // что в fetch не удалось получить сообщения (очередь пуста). Тогда payload будет пустым
            try {
                process_message($client, $message);
            } catch (Throwable $e) {
                // Максимально простая обработка исключения
                echo $e->getMessage() . PHP_EOL;
            }
        }

        pcntl_signal_dispatch();
    }

    $client->unsubscribe($queue);
}


$client = connect();
$stream = init_stream($client);

listen($client, $stream);

disconnect($client);

Выводы

Мы попробовали поработать с Nats в PHP, реализовали базовую очередь и поняли как работают базовые сущности JetStream. Дальше только больше и круче. Например, можно внедрить отложенную обработку, приоритеты (которых в nats к сожалению нет).

На данный момент внедрение Nats является нетривиальной задачей, по которой крайне мало информации. Я надеюсь, в будущем сообщество заинтересуется этим мощным и одновременно простым в обслуживании брокером очередей, а также будет появляться больше библиотек и адаптеров, чтобы его внедрение на PHP стало проще и надежнее.

Удачи вам, коллеги :-)

Теги:
Хабы:
+3
Комментарии2

Публикации

Работа

PHP программист
82 вакансии

Ближайшие события