Привет, Хабр!
Redis Streams давно перестали быть экзотикой для любителей CLI и стали нормальным способом гонять события между сервисами. Но у PHP есть своя специфика: один код — два способа конкурентности. Либо Amp с неблокирующим I/O и семафорами, либо Swoole с корутинами. В обоих случаях хочется одного и того же: устойчивые consumer‑группы, ручной ack, автоматический claim зависших сообщений, backpressure, экспоненциальные ретраи и внятный дед‑леттер.
Что именно строим
Задача: шина событий заказов. Продюсер пишет в orders:events через XADD с триммингом. Несколько воркеров читают из consumer‑группы orders:cg командой XREADGROUP в блокирующем режиме, подтверждают обработку через XACK, а «застрявшие» записи переезжают на активного потребителя через XAUTOCLAIM. Если событие стабильно падает — отправляем его в orders:events:dlq и больше не мучаем основной поток. Мониторим задержку группы по XINFO GROUPS и периодически чистим хвосты.
Дальше два варианта реализации: Amp и Swoole.
Схема событий и продюсер
События тонкие, поэтому в стрим кладём только факт и ключи для догрузки. Полные JSON‑мешки оставьте хранилищу. Поток обрезаем почти точно, чтобы не расти безлимитно.
<?php // producer.php $redis = new Redis(); $redis->connect('127.0.0.1', 6379); // XADD + MAXLEN ~ — почти точная обрезка, быстрее точной. Документация так и советует. // Поддерживается и XTRIM для периодической чистки. $id = $redis->xAdd('orders:events', 'MAXLEN', '~', 100_000, '*', [ 'type' => 'order.created', 'order_id' => 'o-'.bin2hex(random_bytes(8)), 'user_id' => 'u-'.bin2hex(random_bytes(6)), 'ver' => '1', ]); echo "added $id\n";
Группа потребителей и инварианты
Создаём группу с позиционированием на хвост стрима, чтобы начать с новых записей. Если стрима ещё нет — MKSTREAM.
<?php // bootstrap.php $redis = new Redis(); $redis->connect('127.0.0.1', 6379); try { // XGROUP CREATE orders:events orders:cg $ MKSTREAM $redis->xGroup('CREATE', 'orders:events', 'orders:cg', '$', true); echo "group created\n"; } catch (RedisException $e) { if (str_contains($e->getMessage(), 'BUSYGROUP')) { echo "group exists\n"; } else { throw; } }
Чтение ведём через XREADGROUP с ID > — это новые, ещё никому не доставленные записи. Результат каждого чтения попадает в Pending Entries List до XACK. Это «ровно‑однажды» не обещает, это «как минимум однажды», а значит обработчики обязаны быть идемпотентными.
Историю смотрим через XPENDING: там есть idle‑время и количество доставок. По delivery‑count решаем, когда отправлять в DLQ.
Вариант 1. Amp: неблокирующее чтение, backpressure семафором
Amp даёт неблокирующий Redis‑клиент и примитивы синхронизации. Мы читаем из группы батчами, ограничиваем параллельную обработку LocalSemaphore, подтверждаем XACK пачками. Для подбора сирот включаем параллельно цикл XAUTOCLAIM.
<?php // amp-consumer.php require __DIR__.'/vendor/autoload.php'; use function Amp\Redis\createRedisClient; use Amp\Sync\LocalSemaphore; use Revolt\EventLoop; $stream = 'orders:events'; $group = 'orders:cg'; $consumer = 'c-'.gethostname().'-'.getmypid(); // параметры backpressure $concurrency = 16; // не больше N одновременных задач $batchCount = 64; // XREADGROUP COUNT, сколько брать за раз $blockMs = 5000; // блокирующее ожидание $minIdleMs = 60_000; // порог для XAUTOCLAIM $maxDeliveries = 5; // выше — в DLQ $redis = createRedisClient('redis://127.0.0.1'); $sem = new LocalSemaphore($concurrency); // цикл автоклейма сирот EventLoop::repeat(3.0, function () use ($redis, $stream, $group, $consumer, $minIdleMs, $maxDeliveries) { $start = '0-0'; while (true) { // XAUTOCLAIM ... <minIdleMs> <start> COUNT 100 // XAUTOCLAIM увеличивает счётчик попыток, если не JUSTID. $res = $redis->xautoclaim($stream, $group, $consumer, $minIdleMs, $start, count: 100, justid: false); $start = $res['next'] ?? '0-0'; $entries = $res['messages'] ?? []; foreach ($entries as $id => $fields) { // проверим delivery-count через XPENDING (расширенная форма) $pend = $redis->xpending($stream, $group, $id, $id, 1); $deliveryCount = $pend[0][3] ?? 1; if ($deliveryCount > $maxDeliveries) { // дед-леттер: переносим и ack $redis->xAdd('orders:events:dlq', '*', [ 'orig_id' => $id, 'reason' => 'max-deliveries', 'payload' => json_encode($fields, JSON_UNESCAPED_UNICODE), ]); $redis->xAck($stream, $group, [$id]); } } if ($start === '0-0' || empty($entries)) break; } }); // основной цикл чтения while (true) { // XREADGROUP GROUP <g> <c> COUNT <N> BLOCK <ms> STREAMS <key> > $batch = $redis->xreadgroup($group, $consumer, [$stream => '>'], $batchCount, $blockMs); if (!$batch || !isset($batch[$stream])) { continue; } foreach ($batch[$stream] as $id => $fields) { $lock = $sem->acquire(); Amp\async(function () use ($redis, $stream, $group, $id, $fields, $lock) { try { // идемпотентный handler processOrderEvent($fields); // ваша доменная логика $redis->xAck($stream, $group, [$id]); // подтверждаем только после успеха } catch (\Throwable $e) { // Не ack — оставляем в PEL для повторной доставки/claim } finally { $lock->release(); } }); } } function processOrderEvent(array $fields): void { // Эмулируем «безопасный» парсинг $type = $fields['type'] ?? ''; $orderId = $fields['order_id'] ?? ''; if ($type === 'order.created' && $orderId !== '') { // ... проводим запись в БД, операции должны быть идемпотентны по order_id } }
XREADGROUP блокирует ожидание новых записей и кладёт доставленное в PEL до XACK. XPENDING в расширенной форме возвращает массив с idle и количеством доставок, что и используем для дед‑леттера. XAUTOCLAIM перевешивает владение сообщением и увеличивает счётчик доставок, что позволяет накапливать статистику ретраев. Семафор Amp ограничивает число параллельно работающих задач и тем самым создаёт backpressure на уровне приложения.
В актуальной версии amphp/redis команды стримов доступны как методы в нижнем регистре. Если версия не знает конкретный метод, отсылайте сырые команды через универсальный вызов клиента.
Вариант 2. Swoole: корутины, канал как семафор, Redis из ext-phpredis
OpenSwoole умеет перевращать многие I/O‑библиотеки в к��рутинные, в том числе Redis, а канал даёт простой способ ограничить параллелизм. Этот путь часто выбирают, если уже используете Swoole или у вас воркеры с долгими CPU‑участками.
<?php // swoole-consumer.php OpenSwoole\Runtime::enableCoroutine(); // хук для php_stream, в т.ч. Redis $stream = 'orders:events'; $group = 'orders:cg'; $consumer = 'c-'.gethostname().'-'.getmypid(); $redis = new Redis(); $redis->connect('127.0.0.1', 6379); // лимит конкурентной обработки $parallel = 32; $chan = new Swoole\Coroutine\Channel($parallel); // периодический автоклейм Swoole\Coroutine::create(function () use ($redis, $stream, $group, $consumer) { $minIdleMs = 60_000; while (true) { $start = '0-0'; do { $res = $redis->xAutoClaim($stream, $group, $consumer, $minIdleMs, $start, 100); $start = $res[0] ?? '0-0'; $msgs = $res[1] ?? []; foreach ($msgs as $m) { $id = $m[0]; $pend = $redis->xPending($stream, $group, $id, $id, 1); $deliveries = $pend[0][3] ?? 1; if ($deliveries > 5) { $redis->xAdd('orders:events:dlq', '*', [ 'orig_id' => $id, 'reason' => 'max-deliveries', 'payload' => json_encode($m[1], JSON_UNESCAPED_UNICODE), ]); $redis->xAck($stream, $group, [$id]); } } } while (!empty($msgs) && $start !== '0-0'); Swoole\Coroutine::sleep(3); } }); while (true) { // блокирующий XREADGROUP $batch = $redis->xReadGroup($group, $consumer, [$stream => '>'], 64, 5000); if (!isset($batch[$stream])) continue; foreach ($batch[$stream] as $id => $fields) { $chan->push(1); // бронируем слот Swoole\Coroutine::create(function () use ($redis, $chan, $stream, $group, $id, $fields) { try { processOrderEvent($fields); $redis->xAck($stream, $group, [$id]); } finally { $chan->pop(); // освобождаем слот } }); } } function processOrderEvent(array $fields): void { // доменная логика, та же, что и в Amp-варианте }
ext‑phpredis даёт нативные методы xReadGroup, xAck, xAutoClaim, xPending, xGroup. Включа��м корутины рантаймом OpenSwoole, ограничиваем параллелизм размером канала и получаем предсказуемую нагрузку на БД и внешние API.
Экспоненциальные ретраи без таймерных колес
Streams не содержат отложенных сообщений, поэтому для backoff есть рабочих паттерна.
«Пассивный» backoff на PEL: не ack, XAUTOCLAIM подберёт запись по
min-idle-ms. Грубовато, но просто. Счётчик попыток виден через XPENDING, XCLAIM/XAUTOCLAIM увеличивают его автоматически.«Активный» delay‑queue: при ошибке вы ack‑аете задачу и кладёте её в ZSET
orders:retryсо score равнымnow + backoffMs. Отдельная корутина/фибра периодически делаетZRANGEBYSCORE <= now LIMIT 100, вынимает задачи и пере‑XADD«ит в основной стрим сattempt=N+1.
Пример простого delay‑пуша:
function scheduleRetry(Redis $r, string $stream, array $payload, int $attempt): void { $base = 1000; // 1s $max = 60_000; // 60s $jitter = random_int(0, 250); $delay = min($base * (2 ** ($attempt - 1)) + $jitter, $max); $runAt = (int)(microtime(true) * 1000) + $delay; $r->zAdd('orders:retry', $runAt, json_encode([ 'stream' => $stream, 'payload' => $payload, 'attempt' => $attempt, ], JSON_UNESCAPED_UNICODE)); }
Отдельный воркер:
function retryPump(Redis $r): void { while (true) { $now = (int)(microtime(true) * 1000); $items = $r->zRangeByScore('orders:retry', 0, $now, ['limit' => [0, 100]]); if (!$items) { usleep(200_000); continue; } foreach ($items as $raw) { $r->multi(); $r->zRem('orders:retry', $raw); $job = json_decode($raw, true, 512, JSON_THROW_ON_ERROR); $r->xAdd($job['stream'], '*', [ ...$job['payload'], 'attempt' => (string)$job['attempt'], ]); $r->exec(); } } }
Redis нативно не умеет отложенные задачи, рекомендую для этого ZSET с временным score.
Мониторинг
Для принятия решений по масштабированию нужна метрика lag. Возьмём XINFO GROUPS: там есть lag — количество записей, которые ещё не доставлены группе. В некоторых ситуациях она может быть недоступна и вернётся nil, это нормально и задокументировано. Сохранять тренд по lag и по PEL имеет смысл отдельным метриком.
Мини‑функции:
function groupLag(Redis $r, string $stream, string $group): ?int { $groups = $r->xInfo('GROUPS', $stream); foreach ($groups as $g) { if (($g['name'] ?? '') === $group) { return $g['lag'] ?? null; // может быть null } } return null; } function pendingSummary(Redis $r, string $stream, string $group): array { // XPENDING summary: [minId, maxId, total, perConsumer...] return $r->xPending($stream, $group); }
Скейлим количество воркеров, если лаг стабильно растёт на горизонте N минут. Если lag «пилит» вокруг нуля, можно уменьшить число воркеров.
Тест перегрузки и восстановление
Быстрый сценарий: продюсер кладёт 50k событий за короткое время, потребитель ограничен семафором/каналом на 16–32 слота, обработка каждой записи занимает условно 50–100 мс.
1. На старте лаг пойдёт вверх — это нормально.
2. При устойчивом потреблении и отсутствии ошибок лаг будет сходиться к нулю.
3. Если остановить половину воркеров, лаг вырастет; при возврате воркеров — сойдёт.
4. Если включить искусственные ошибки, увидите рост delivery‑count в XPENDING и набивающийся DLQ.
Адекватная эксплуатация
Идемпотентность. Обрабатываем запись повторно без побочек. Минимальный гард — ставим идемпотентный ключ в Redis до начала сайд‑эффектов и снимаем его по TTL. Шаблон через SET key NX EX работает быстро и атомарно в рамках одной ноды. Ключ строим по бизнес‑идентификатору, не по stream‑ID. Пример:
// идемпотентность по паре {type, order_id} $key = sprintf('idem:%s:%s', $fields['type'] ?? '', $fields['order_id'] ?? ''); $ok = $redis->set($key, '1', ['nx', 'ex' => 3600]); // 1 час хватает большинству обработок if ($ok !== true) { return; // уже обрабатывали, выходим тихо } // дальше вызываем внешние API/DB и только затем делаем XACK
Если хотите «жёсткий» дедуп, можно вместо TTL хранить маркер навсегда, но это уже эксплуатационный компромисс.
Размер записей. В стрим кладём короткие факты и идентификаторы. Для роста ограничиваем ключ через MAXLEN прямо на XADD, включаем «почти точную» обрезку ~. Это убирает лишние аллокации и держит ключ компактным. Если нужно чистить по времени, периодически гоняем XTRIM MINID с порогом.
Таймауты. XREADGROUP используем с BLOCK в пределах секунд, не минут. Это нормальный режим и он штатно поддержан. На клиенте ставим разумные readTimeout и connectTimeout, иначе зависший сетевой хоп повисит воркер. В phpredis это опции конструктора или setOption. Пример:
$redis = new Redis(); $redis->connect('127.0.0.1', 6379, 2.5, null, 2.5); // connect/read timeouts ~2.5s $redis->setOption(Redis::OPT_READ_TIMEOUT, 5.0);
Если поставите NOACK, надёжности не будет, это осознанный выбор.
Тримминг. Держим размер ключа под контролем двумя способами. На входе — XADD ... MAXLEN ~ <cap> для дешёвой обрезки. Периодически — XTRIM MAXLEN или XTRIM MINID, если нужен порог по ID. Для больших потоков это мастхев.
Диагностика. XPENDING в «summary» показывает общий объём PEL и min/max ID, а «расширенная» форма по диапазону отдаёт записи вместе с idle и delivery‑count. На этом и строится принятие решений по ретраям и DLQ. Для масштабирования следим за lag из XINFO GROUPS — это разница между «всего добавлено» и «логически прочитано группой».
Claim. Предпочтительно XAUTOCLAIM с min-idle-ms: он совмещает XPENDING + XCLAIM и работает скан‑подобно, без ручной пагинации по диапазонам. Плюс гораздо меньше кода на вашей стороне. Помним ограничение XCLAIM: записей, которых уже нет в PEL или которые вычистили из стрима, он не перетянет.
Порядок действий. Без фанатизма, но строго: проверили идемпотентность, сделали внешние операции, зафиксировали их, потом XACK. Иначе поймаете «подтвердили, потом упали». Э
Акуратное выключение. На SIGTERM перестаём вызывать XREADGROUP, дожидаемся текущих хендлеров и отправляем XACK тем, кто завершился. Остальное останется в PEL и будет подобрано XAUTOCLAIM по таймауту простоя.
Настройка лаг‑алертов. Если lag устойчиво растёт — добавляем консьюмеров. Если «пилит» около нуля — можно убирать.
Итоги
В конечном счёте у нас в руках инструмент для устойчивых потоков событий в PHP. Все это применимо везде, где много коротких фактов и побочных эффектов: шина заказов и статусов, фулфилмент/логистика, биллинг и антифрод, рассылки и пуш‑уведомления, приём и обработка вебхуков, интеграции с внешними API с ретраями и джиттером, CRM/ERP‑синхронизации, ETL‑помпы/дедуп,телеметрия и алерты.
Делитесь в комментариях своими кейсами.
Если вы работаете с PHP и сталкиваетесь с задачами, которые выходят за рамки простых веб‑страниц, обратите внимание на курс PHP Developer. Professional. В программе — современные подходы к конкурентности, работа с асинхронными инструментами, устойчивые архитектуры и интеграции с промышленными сервисами. Пройдите бесплатное тестирование по курсу, чтобы узнать, подойдет ли вам курс.
А тем, кто настроен на серьезное системное обучение, рекомендуем рассмотреть Подписку — выбираете курсы под свои задачи, экономите на обучении, получаете профессиональный рост. Узнать подробнее
