Привет, Хабр!

Redis Streams давно перестали быть экзотикой для любителей CLI и стали нормальным способом гонять события между сервисами. Но у PHP есть своя специфика: один код — два способа конкурентности. Либо Amp с неблокирующим I/O и семафорами, либо Swoole с корутинами. В обоих случаях хочется одного и того же: устойчивые consumer‑группы, ручной ack, автоматический claim зависших сообщений, backpressure, экспоненциальные ретраи и внятный дед‑леттер.

Что именно строим

Задача: шина событий заказов. Продюсер пишет в orders:events через XADD с триммингом. Несколько воркеров читают из consumer‑группы orders:cg командой XREADGROUP в блокирующем режиме, подтверждают обработку через XACK, а «застрявшие» записи переезжают на активного потребителя через XAUTOCLAIM. Если событие стабильно падает — отправляем его в orders:events:dlq и больше не мучаем основной поток. Мониторим задержку группы по XINFO GROUPS и периодически чистим хвосты.

Дальше два варианта реализации: Amp и Swoole.

Схема событий и продюсер

События тонкие, поэтому в стрим кладём только факт и ключи для догрузки. Полные JSON‑мешки оставьте хранилищу. Поток обрезаем почти точно, чтобы не расти безлимитно.

<?php
// producer.php
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
// XADD + MAXLEN ~ — почти точная обрезка, быстрее точной. Документация так и советует. 
// Поддерживается и XTRIM для периодической чистки.
$id = $redis->xAdd('orders:events', 'MAXLEN', '~', 100_000, '*', [
    'type' => 'order.created',
    'order_id' => 'o-'.bin2hex(random_bytes(8)),
    'user_id'  => 'u-'.bin2hex(random_bytes(6)),
    'ver' => '1',
]);
echo "added $id\n";

Группа потребителей и инварианты

Создаём группу с позиционированием на хвост стрима, чтобы начать с новых записей. Если стрима ещё нет — MKSTREAM.

<?php
// bootstrap.php
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);

try {
    // XGROUP CREATE orders:events orders:cg $ MKSTREAM
    $redis->xGroup('CREATE', 'orders:events', 'orders:cg', '$', true);
    echo "group created\n";
} catch (RedisException $e) {
    if (str_contains($e->getMessage(), 'BUSYGROUP')) {
        echo "group exists\n";
    } else {
        throw;
    }
}

Чтение ведём через XREADGROUP с ID > — это новые, ещё никому не доставленные записи. Результат каждого чтения попадает в Pending Entries List до XACK. Это «ровно‑однажды» не обещает, это «как минимум однажды», а значит обработчики обязаны быть идемпотентными.

Историю смотрим через XPENDING: там есть idle‑время и количество доставок. По delivery‑count решаем, когда отправлять в DLQ.

Вариант 1. Amp: неблокирующее чтение, backpressure семафором

Amp даёт неблокирующий Redis‑клиент и примитивы синхронизации. Мы читаем из группы батчами, ограничиваем параллельную обработку LocalSemaphore, подтверждаем XACK пачками. Для подбора сирот включаем параллельно цикл XAUTOCLAIM.

<?php
// amp-consumer.php
require __DIR__.'/vendor/autoload.php';

use function Amp\Redis\createRedisClient;
use Amp\Sync\LocalSemaphore;
use Revolt\EventLoop;

$stream = 'orders:events';
$group  = 'orders:cg';
$consumer = 'c-'.gethostname().'-'.getmypid();

// параметры backpressure
$concurrency = 16;           // не больше N одновременных задач
$batchCount  = 64;           // XREADGROUP COUNT, сколько брать за раз
$blockMs     = 5000;         // блокирующее ожидание
$minIdleMs   = 60_000;       // порог для XAUTOCLAIM
$maxDeliveries = 5;          // выше — в DLQ

$redis = createRedisClient('redis://127.0.0.1');

$sem = new LocalSemaphore($concurrency);

// цикл автоклейма сирот
EventLoop::repeat(3.0, function () use ($redis, $stream, $group, $consumer, $minIdleMs, $maxDeliveries) {
    $start = '0-0';
    while (true) {
        // XAUTOCLAIM ... <minIdleMs> <start> COUNT 100
        // XAUTOCLAIM увеличивает счётчик попыток, если не JUSTID. 
        $res = $redis->xautoclaim($stream, $group, $consumer, $minIdleMs, $start, count: 100, justid: false);
        $start = $res['next'] ?? '0-0';
        $entries = $res['messages'] ?? [];

        foreach ($entries as $id => $fields) {
            // проверим delivery-count через XPENDING (расширенная форма)
            $pend = $redis->xpending($stream, $group, $id, $id, 1);
            $deliveryCount = $pend[0][3] ?? 1;
            if ($deliveryCount > $maxDeliveries) {
                // дед-леттер: переносим и ack
                $redis->xAdd('orders:events:dlq', '*', [
                    'orig_id' => $id,
                    'reason'  => 'max-deliveries',
                    'payload' => json_encode($fields, JSON_UNESCAPED_UNICODE),
                ]);
                $redis->xAck($stream, $group, [$id]);
            }
        }
        if ($start === '0-0' || empty($entries)) break;
    }
});

// основной цикл чтения
while (true) {
    // XREADGROUP GROUP <g> <c> COUNT <N> BLOCK <ms> STREAMS <key> >
    $batch = $redis->xreadgroup($group, $consumer, [$stream => '>'], $batchCount, $blockMs);

    if (!$batch || !isset($batch[$stream])) {
        continue;
    }

    foreach ($batch[$stream] as $id => $fields) {
        $lock = $sem->acquire();
        Amp\async(function () use ($redis, $stream, $group, $id, $fields, $lock) {
            try {
                // идемпотентный handler
                processOrderEvent($fields); // ваша доменная логика
                $redis->xAck($stream, $group, [$id]); // подтверждаем только после успеха
            } catch (\Throwable $e) {
                // Не ack — оставляем в PEL для повторной доставки/claim
            } finally {
                $lock->release();
            }
        });
    }
}

function processOrderEvent(array $fields): void {
    // Эмулируем «безопасный» парсинг
    $type = $fields['type'] ?? '';
    $orderId = $fields['order_id'] ?? '';
    if ($type === 'order.created' && $orderId !== '') {
        // ... проводим запись в БД, операции должны быть идемпотентны по order_id
    }
}

XREADGROUP блокирует ожидание новых записей и кладёт доставленное в PEL до XACK. XPENDING в расширенной форме возвращает массив с idle и количеством доставок, что и используем для дед‑леттера. XAUTOCLAIM перевешивает владение сообщением и увеличивает счётчик доставок, что позволяет накапливать статистику ретраев. Семафор Amp ограничивает число параллельно работающих задач и тем самым создаёт backpressure на уровне приложения.

В актуальной версии amphp/redis команды стримов доступны как методы в нижнем регистре. Если версия не знает конкретный метод, отсылайте сырые команды через универсальный вызов клиента.

Вариант 2. Swoole: корутины, канал как семафор, Redis из ext-phpredis

OpenSwoole умеет перевращать многие I/O‑библиотеки в корутинные, в том числе Redis, а канал даёт простой способ ограничить параллелизм. Этот путь часто выбирают, если уже используете Swoole или у вас воркеры с долгими CPU‑участками.

<?php
// swoole-consumer.php
OpenSwoole\Runtime::enableCoroutine(); // хук для php_stream, в т.ч. Redis

$stream = 'orders:events';
$group  = 'orders:cg';
$consumer = 'c-'.gethostname().'-'.getmypid();

$redis = new Redis();
$redis->connect('127.0.0.1', 6379);

// лимит конкурентной обработки
$parallel = 32;
$chan = new Swoole\Coroutine\Channel($parallel);

// периодический автоклейм
Swoole\Coroutine::create(function () use ($redis, $stream, $group, $consumer) {
    $minIdleMs = 60_000;
    while (true) {
        $start = '0-0';
        do {
            $res = $redis->xAutoClaim($stream, $group, $consumer, $minIdleMs, $start, 100);
            $start = $res[0] ?? '0-0';
            $msgs  = $res[1] ?? [];
            foreach ($msgs as $m) {
                $id = $m[0];
                $pend = $redis->xPending($stream, $group, $id, $id, 1);
                $deliveries = $pend[0][3] ?? 1;
                if ($deliveries > 5) {
                    $redis->xAdd('orders:events:dlq', '*', [
                        'orig_id' => $id,
                        'reason'  => 'max-deliveries',
                        'payload' => json_encode($m[1], JSON_UNESCAPED_UNICODE),
                    ]);
                    $redis->xAck($stream, $group, [$id]);
                }
            }
        } while (!empty($msgs) && $start !== '0-0');
        Swoole\Coroutine::sleep(3);
    }
});

while (true) {
    // блокирующий XREADGROUP
    $batch = $redis->xReadGroup($group, $consumer, [$stream => '>'], 64, 5000);
    if (!isset($batch[$stream])) continue;

    foreach ($batch[$stream] as $id => $fields) {
        $chan->push(1); // бронируем слот
        Swoole\Coroutine::create(function () use ($redis, $chan, $stream, $group, $id, $fields) {
            try {
                processOrderEvent($fields);
                $redis->xAck($stream, $group, [$id]);
            } finally {
                $chan->pop(); // освобождаем слот
            }
        });
    }
}

function processOrderEvent(array $fields): void {
    // доменная логика, та же, что и в Amp-варианте
}

ext‑phpredis даёт нативные методы xReadGroup, xAck, xAutoClaim, xPending, xGroup. Включаем корутины рантаймом OpenSwoole, ограничиваем параллелизм размером канала и получаем предсказуемую нагрузку на БД и внешние API.

Экспоненциальные ретраи без таймерных колес

Streams не содержат отложенных сообщений, поэтому для backoff есть рабочих паттерна.

  1. «Пассивный» backoff на PEL: не ack, XAUTOCLAIM подберёт запись по min-idle-ms. Грубовато, но просто. Счётчик попыток виден через XPENDING, XCLAIM/XAUTOCLAIM увеличивают его автоматически.

  2. «Активный» delay‑queue: при ошибке вы ack‑аете задачу и кладёте её в ZSET orders:retry со score равным now + backoffMs. Отдельная корутина/фибра периодически делает ZRANGEBYSCORE <= now LIMIT 100, вынимает задачи и пере‑XADD«ит в основной стрим с attempt=N+1.

Пример простого delay‑пуша:

function scheduleRetry(Redis $r, string $stream, array $payload, int $attempt): void {
    $base = 1000; // 1s
    $max  = 60_000; // 60s
    $jitter = random_int(0, 250);
    $delay = min($base * (2 ** ($attempt - 1)) + $jitter, $max);
    $runAt = (int)(microtime(true) * 1000) + $delay;

    $r->zAdd('orders:retry', $runAt, json_encode([
        'stream'  => $stream,
        'payload' => $payload,
        'attempt' => $attempt,
    ], JSON_UNESCAPED_UNICODE));
}

Отдельный воркер:

function retryPump(Redis $r): void {
    while (true) {
        $now = (int)(microtime(true) * 1000);
        $items = $r->zRangeByScore('orders:retry', 0, $now, ['limit' => [0, 100]]);
        if (!$items) { usleep(200_000); continue; }

        foreach ($items as $raw) {
            $r->multi();
            $r->zRem('orders:retry', $raw);
            $job = json_decode($raw, true, 512, JSON_THROW_ON_ERROR);
            $r->xAdd($job['stream'], '*', [
                ...$job['payload'],
                'attempt' => (string)$job['attempt'],
            ]);
            $r->exec();
        }
    }
}

Redis нативно не умеет отложенные задачи, рекомендую для этого ZSET с временным score.

Мониторинг

Для принятия решений по масштабированию нужна метрика lag. Возьмём XINFO GROUPS: там есть lag — количество записей, которые ещё не доставлены группе. В некоторых ситуациях она может быть недоступна и вернётся nil, это нормально и задокументировано. Сохранять тренд по lag и по PEL имеет смысл отдельным метриком.

Мини‑функции:

function groupLag(Redis $r, string $stream, string $group): ?int {
    $groups = $r->xInfo('GROUPS', $stream);
    foreach ($groups as $g) {
        if (($g['name'] ?? '') === $group) {
            return $g['lag'] ?? null; // может быть null
        }
    }
    return null;
}

function pendingSummary(Redis $r, string $stream, string $group): array {
    // XPENDING summary: [minId, maxId, total, perConsumer...]
    return $r->xPending($stream, $group);
}

Скейлим количество воркеров, если лаг стабильно растёт на горизонте N минут. Если lag «пилит» вокруг нуля, можно уменьшить число воркеров.

Тест перегрузки и восстановление

Быстрый сценарий: продюсер кладёт 50k событий за короткое время, потребитель ограничен семафором/каналом на 16–32 слота, обработка каждой записи занимает условно 50–100 мс.

1. На старте лаг пойдёт вверх — это нормально.

2. При устойчивом потреблении и отсутствии ошибок лаг будет сходиться к нулю.

3. Если остановить половину воркеров, лаг вырастет; при возврате воркеров — сойдёт.

4. Если включить искусственные ошибки, увидите рост delivery‑count в XPENDING и набивающийся DLQ.

Адекватная эксплуатация

Идемпотентность. Обрабатываем запись повторно без побочек. Минимальный гард — ставим идемпотентный ключ в Redis до начала сайд‑эффектов и снимаем его по TTL. Шаблон через SET key NX EX работает быстро и атомарно в рамках одной ноды. Ключ строим по бизнес‑идентификатору, не по stream‑ID. Пример:

// идемпотентность по паре {type, order_id}
$key = sprintf('idem:%s:%s', $fields['type'] ?? '', $fields['order_id'] ?? '');
$ok  = $redis->set($key, '1', ['nx', 'ex' => 3600]); // 1 час хватает большинству обработок
if ($ok !== true) {
    return; // уже обрабатывали, выходим тихо
}
// дальше вызываем внешние API/DB и только затем делаем XACK

Если хотите «жёсткий» дедуп, можно вместо TTL хранить маркер навсегда, но это уже эксплуатационный компромисс.

Размер записей. В стрим кладём короткие факты и идентификаторы. Для роста ограничиваем ключ через MAXLEN прямо на XADD, включаем «почти точную» обрезку ~. Это убирает лишние аллокации и держит ключ компактным. Если нужно чистить по времени, периодически гоняем XTRIM MINID с порогом.

Таймауты. XREADGROUP используем с BLOCK в пределах секунд, не минут. Это нормальный режим и он штатно поддержан. На клиенте ставим разумные readTimeout и connectTimeout, иначе зависший сетевой хоп повисит воркер. В phpredis это опции конструктора или setOption. Пример:

$redis = new Redis();
$redis->connect('127.0.0.1', 6379, 2.5, null, 2.5); // connect/read timeouts ~2.5s
$redis->setOption(Redis::OPT_READ_TIMEOUT, 5.0);

Если поставите NOACK, надёжности не будет, это осознанный выбор.

Тримминг. Держим размер ключа под контролем двумя способами. На входе — XADD ... MAXLEN ~ <cap> для дешёвой обрезки. Периодически — XTRIM MAXLEN или XTRIM MINID, если нужен порог по ID. Для больших потоков это мастхев.

Диагностика. XPENDING в «summary» показывает общий объём PEL и min/max ID, а «расширенная» форма по диапазону отдаёт записи вместе с idle и delivery‑count. На этом и строится принятие решений по ретраям и DLQ. Для масштабирования следим за lag из XINFO GROUPS — это разница между «всего добавлено» и «логически прочитано группой».

Claim. Предпочтительно XAUTOCLAIM с min-idle-ms: он совмещает XPENDING + XCLAIM и работает скан‑подобно, без ручной пагинации по диапазонам. Плюс гораздо меньше кода на вашей стороне. Помним ограничение XCLAIM: записей, которых уже нет в PEL или которые вычистили из стрима, он не перетянет.

Порядок действий. Без фанатизма, но строго: проверили идемпотентность, сделали внешние операции, зафиксировали их, потом XACK. Иначе поймаете «подтвердили, потом упали». Э

Акуратное выключение. На SIGTERM перестаём вызывать XREADGROUP, дожидаемся текущих хендлеров и отправляем XACK тем, кто завершился. Остальное останется в PEL и будет подобрано XAUTOCLAIM по таймауту простоя.

Настройка лаг‑алертов. Если lag устойчиво растёт — добавляем консьюмеров. Если «пилит» около нуля — можно убирать.


Итоги

В конечном счёте у нас в руках инструмент для устойчивых потоков событий в PHP. Все это применимо везде, где много коротких фактов и побочных эффектов: шина заказов и статусов, фулфилмент/логистика, биллинг и антифрод, рассылки и пуш‑уведомления, приём и обработка вебхуков, интеграции с внешними API с ретраями и джиттером, CRM/ERP‑синхронизации, ETL‑помпы/дедуп,телеметрия и алерты.

Делитесь в комментариях своими кейсами.

Если вы работаете с PHP и сталкиваетесь с задачами, которые выходят за рамки простых веб‑страниц, обратите внимание на курс PHP Developer. Professional. В программе — современные подходы к конкурентности, работа с асинхронными инструментами, устойчивые архитектуры и интеграции с промышленными сервисами. Пройдите бесплатное тестирование по курсу, чтобы узнать, подойдет ли вам курс.

А тем, кто настроен на серьезное системное обучение, рекомендуем рассмотреть Подписку — выбираете курсы под свои задачи, экономите на обучении, получаете профессиональный рост. Узнать подробнее