Для тех кому лень читать и хочется сразу потрогать вот Github.
Откуда задача
Перед командой встала типичная для высоконагруженных сервисов постановка по обработке сообщений:
Порядок обработки в рамках одной логической партиции — события по одному пользователю/сущности должны идти строго последовательно.
Очень много ключей партиций — сотни тысяч и выше; заводить под каждую ключ отдельную «очередь» в классическом смысле неразумно.
Ограниченный пул воркеров, который шарится между партициями, при этом одна тяжёлая партиция не должна блокировать остальные — нужна справедливая конкуренция за CPU/сеть.
Ретраи и отложенные сообщения — временная недоступность внешнего API, отложенный запуск, rate limit.
Ни одна готовая «одна технология» не закрывает это из коробки без компромиссов. Ниже — как мы к этому пришли и что в итоге сделали в виде библиотеки smart-redis-queue (очередь на Redis, логика в Lua-скриптах).
Что уже есть на рынке: Kafka
Apache Kafka — распределённый лог: партиции топика, репликация, долговременное хранение, высокая пропускная способность.
Плюсы для нашей задачи:
Естественная модель «партиция = порядок внутри партиции».
Масштабирование чтения за счёт consumer groups и числа партиций.
Минусы / ограничения, из-за которых «только Kafka» было недостаточно:
Consumer group привязывает партиции к консьюмерам: это не тот же самый паттерн, что динамический пул воркеров, который в один момент обрабатывает задачи из разных партиций по мере готовности. В Kafka воркер «держит» назначенные ему партиции; гибкое «взять следующую готовую задачу из любой партиции из общего пула» — другая семантика (ближе к task queue, чем к streaming log).
TTL сообщений — не то же самое, что «отложить до времени T» в смысле бизнес-планировщика; retention и compaction решают другие задачи.
Операционная и архитектурная тяжесть: кластер, мониторинг, эволюция схем — не всегда оправданы, если у вас уже есть Redis и нужна именно очередь задач, а не бесконечный лог событий.
Итог: Kafka остаётся сильным выбором для event streaming и больших пайплайнов, но для описанного сочетания «много ключей + общий пул + отложенные задачи + явные ретраи в одном сервисе» хотелось более лёгкого и управляемого решения поверх уже имеющейся инфраструктуры.
RabbitMQ
RabbitMQ — брокер с очередями, exchange, routing, DLQ, TTL.
Плюсы:
Зрелая модель, понятные паттерны pub/sub и work queues.
TTL сообщений, dead-letter — из коробки.
Минусы для строгого порядка и «миллионов ключей»:
Гарантия порядка в общем случае не совпадает с моделью «одна логическая партиция = строгая последовательность» при нескольких консьюмерах на одной очереди: сообщения распределяются между воркерами, порядок на уровне «кто первый забрал» не тот, что порядок публикации.
Завести отдельную очередь на каждый ключ партиции при сотнях тысяч ключей — плохо масштабируется по ресурсам брокера и по эксплуатации.
Паттерн «общий пул воркеров + справедливость между логическими партициями» требует либо кастомной маршрутизации, либо своего слоя поверх брокера.
Итог: RabbitMQ отлично подходит для многих сценариев, но строгий порядок в рамках логической партиции при пуле воркеров без изобретательной схемы на стороне приложения не даётся просто так.
Решение: очередь на Redis и атомарность через Lua
Redis здесь — быстрое in-memory хранилище со структурами данных (ZSET, HASH, STRING, SET).
Lua в Redis — встроенный язык сценариев. Скрипт выполняется на сервере Redis целиком, между командами нет гонок с другими клиентами: скрипт — одна атомарная единица работы (пока не завершится выполнение скрипта, другой клиент не «вклинится» в середину). Это критично для операций вида «снять с ZSET, положить в processing, обновить блокировку партиции» — без Lua пришлось бы городить WATCH/MULTI или принимать race conditions.
Коротко: Lua + Redis = один round-trip и согласованное состояние ключей без распределённых транзакций между несколькими сервисами.
Проектирование ключей и структур
Используется префикс queue:{queueName}:.
Назначение | Ключ / структура |
|---|---|
Множество «живых» партиций |
|
Очередь задач партиции с приоритетом |
|
Уровни приоритета внутри партиции |
|
Полезная нагрузка |
|
Код партиции задачи |
|
Числовой приоритет |
|
Задачи «в работе» у консьюмера |
|
Heartbeat |
|
Блокировка ordered-партиции |
|
Временный блок партиции (rate limit) |
|
Ordered-партиции — с префиксом ! в коде партиции: для них действует эксклюзивная блокировка (один консьюмер на партицию в момент времени), чтобы порядок не ломался.
Обычные партиции (без !) и базовая партиция (base) — параллельная обработка без такой блокировки.
Движение сообщения: схемы
Публикация (добавление)

Чтение (consume / Get)

Ack

Reject

Скрипт добавления (Publish)
Реализация в репозитории: scripts.go, переменная addScript.
Ключевая идея:
Для каждой задачи:
SETpayload с NX — идемпотентность поtaskId; дубликат не затирает существующую задачу.Сохраняются партиция и приоритет.
В
ZSETпартиции добавляетсяtaskIdсо score =scheduledв миллисекундах — это и есть отложенное сообщение: покаnow < score, Get его не отдаст.Партиция регистрируется в глобальном
SETпартиций.
Фрагмент логики (упрощённо):
local ok = redis.call('SET', payloadKey, payload, 'NX') if ok then redis.call('ZADD', partitionQueueKey, scheduled, taskId) redis.call('SADD', partitionsKey, partitionCode) -- ... end
Бенчмарки публикации
Запуск: go test -bench='BenchmarkPublish$|BenchmarkPublishBatch10|BenchmarkPublishBatch100' -benchmem -count=3
Окружение при замере: Linux, go1.22+, Redis localhost:6379, CPU Intel i7-13700H (20 логических CPU), пакет bench-queue, БД Redis 5.
Имеется в виду размер батча в одном вызове Publish: 1 / 10 / 100 задач (не «число партиций»).
Пропускная способность добавления в пересчёте на задачи (удобно сравнивать с лимитами бизнеса):
одиночный
Publish: задач/с ≈ 10⁹ / ns/op (одна итерация = одна задача);батч: задач/с ≈ (размер батча × 10⁹) / ns/op (одна итерация = весь батч).
Бенчмарк | Смысл | ns/op (разброс по 3 прогонам) | Задач/с (оценка по тем же прогонам) |
|---|---|---|---|
| 1 задача за вызов | ~43–60 µs | ~17–23 тысяч |
| 10 задач за вызов | ~108–123 µs на батч | ~80–92 тысяч |
| 100 задач за вызов | ~855–986 µs на батч | ~100–117 тысяч |
Вывод: батчевый Publish сильно поднимает «задач в секунду» за счёт одного round-trip Lua на много записей — при массовой загрузке выгода очевидна.
Скрипт чтения (Get / consume)
Скрипт getScript в scripts.go:
Обновляет heartbeat консьюмера.
Берёт
SMEMBERSактивных партиций и итерируется (в текущей реализации внешний цикл набирает доprefetchCountзадач, внутри — перебор партиций).Для ordered-партиций (
!) проверяется lock и опциональныйblock(cooldown после rate limit).Для выбранной партиции: приоритеты читаются по убыванию (
ZREVRANGEнаpriorities), внутри уровня — голова ZSET очереди; задача снимается только еслиscheduled <= now.Задача переносится в
HASH«задачи консьюмера» с отметкой времени.
Бенчмарки чтения
Здесь ns/op — среднее время на одну обработанную задачу (Get + Ack), т.к. в тесте крутится consumed < b.N по счётчику задач. Пропускная способность чтения одним воркером: задач/с ≈ 10⁹ / ns/op.
Бенчмарк | ns/op (разброс) | Задач/с на одного воркера (оценка) |
|---|---|---|
| ~50–61 µs | ~16–20 тысяч |
| ~80–103 µs | ~9.7–12.5 тысяч |
| ~57–67 µs | ~15–17.5 тысяч |
Prefetch уменьшает число обращений к Redis на обработанную задачу, когда это допустимо бизнес-логикой.
Несколько воркеров (BenchmarkConsumeParallel): обрабатывается общий объём b.N задач; ns/op — это реальное (wall-clock) время на всю пачку, поделённое на b.N, т.е. показатель уже суммарной скорости конвейера (сколько задач в секунду система в целом вычитала и обработала).
Подбенчмарк | ns/op (разброс) | Суммарно задач/с (вся группа воркеров) |
|---|---|---|
| ~33–36 µs | ~28–30 тысяч |
| ~38–44 µs | ~23–26 тысяч |
На восьми воркерах выигрыш не восьмерной — Redis и блокировки на стороне одного инстанса становятся узким местом; рост «горизонталью» здесь не линейный.
Ack
Скрипт ackScript:
Проверяет, что
taskIdесть вHASHзадач этого консьюмера.Уменьшает счётчик «задач в работе» по партиции; если партиция ordered и счётчик обнулился — снимает lock.
Удаляет payload, служебные ключи, запись в
HASH.
Если задача не в processing или принадлежит не этому консьюмеру — операция не выполняется (возврат 0 на стороне скрипта).
Reject и нюансы ordered-партиций
Скрипт rejectScript:
Возвращает задачу в
ZSETс новым score (время возврата).Для ordered партиций (
!): приоритет увеличивается на 1. Очередь читается с большего приоритета первой, поэтому «отклонённые» сообщения оказываются в конце своего уровня приоритета относительно новых — сохраняется исходный порядок относительно друг друга при повторной обработке.Для tie-break при нескольких reject в одну миллисекунду используется инкремент
reject_seqв score.
Высокоуровневый API Consume (см. queue.go): если в одном prefetch-батче для ordered-партиции первая задача получила reject, остальные задачи этой же партиции в батче также уходят в reject — чтобы не нарушить порядок «сначала обрабатываем первую».
Рекомендация по приоритетам: если вы используете бизнес-приоритеты (не только «слой» reject), задавайте шаг приоритета 100+ между уровнями, чтобы инкременты при reject не пересекались с осмысленными приоритетами.
Rate limit: RejectWithDelay для ordered-партиций выставляет ключ block с TTL — партиция временно не выдаётся, чтобы не крутить одно и то же сообщение в tight loop.
Воркеры падают — это нормально
Процесс убит OOM, деплой, сеть, баг — обрыв неизбежен.
Механика:
Консьюмер периодически вызывает ping (
pingScript): обновляет свой ключ с TTL ~120 с.Один из живых консьюмеров (через короткий distributed lock
unlock:lock) сканирует список консьюмеров: у кого нет heartbeat — считается мёртвым.Для мёртвого: задачи из его
HASHвозвращаются в очередь (для ordered — с тем же приоритетным приращением, что и при reject), lock партиции снимается, консьюмер вычищается из реестра.
Таким образом, «зависшие» in-progress задачи не теряются навсегда из-за падения процесса (при условии, что Redis доступен и данные на нём ещё целы — см. раздел про ограничения).
Когда такая очередь уместна
Персональные очереди событий (пользователь, счёт, сессия) при ограниченном числе воркеров и требовании порядка.
Отложенные джобы и повторы без отдельного scheduler-сервиса, если Redis уже есть.
Rate limiting по логической партиции через
RejectWithDelay.Идемпотентная публикация по
taskIdпри повторных попытках продюсера.
Ложка дёгтя: ограничения Redis и модели
Память не резиновая. Всё лежит в RAM (или с дисковым бэкендом — но это уже другие компромиссы). Большие payload напрямую бьют по памяти и по времени сериализации/копирования.
Большие сообщения снижают пропускную способность — больше данных на round-trip, больше давления на сеть и CPU Redis.
Durability. Redis может быть настроен с AOF/RDB, но не даёт тех же гарантий, что отдельный брокер с персистентностью «первого класса» в классическом enterprise-смысле. Возможны окна потери при сбоях (зависит от конфигурации fsync, репликации, split-brain). Сообщения в полёте между приложением и Redis тоже не застрахованы.
Не делайте Redis единственной точкой истины для критичных финансовых событий без внешнего подтверждения/дублирования в надёжное хранилище.
Для одинаковых scheduled в рамках партиции порядок зависит также от taskId. В этом случае для taskId лучше использовать sequence или uuidV1 (который уже включает в себя timestamp + sequence. Важно преобразовать uuid к time ordered версии). Пока нам хватило такого компромиса, но в целом есть варианты доработать скрипты.
Практический вывод: трактуйте очередь как ускоритель и координатор, а не как авторитетный журнал на века.
Итог
Задача «порядок в партиции + много ключей + общий пул воркеров + отложенные сообщения и ретраи» не укладывается ни в «только Kafka», ни в «только RabbitMQ» без компромиссов и дополнительного кода.
Redis + Lua даёт атомарные сценарии и предсказуемую модель данных при умеренной операционной сложности.
Библиотека smart-redis-queue реализует это явно: ordered-партиции (
!), приоритеты, prefetch, heartbeat и возврат задач при смерти воркера.Цена — границы Redis: память, размер сообщений, не абсолютная гарантия сохранности при катастрофах.
Если нужна максимальная персистентность и многоуровневая маршрутизация по всей компании — смотрите в сторону зрелых брокеров и event bus. Если нужен контролируемый task queue рядом с приложением и уже есть Redis — такой слой может быть разумным компромиссом.
Приложение: как воспроизвести замеры
# Redis на localhost:6379 go test -bench='BenchmarkPublish$|BenchmarkPublishBatch10|BenchmarkPublishBatch100|BenchmarkConsume$|BenchmarkConsumePrefetch|BenchmarkConsumeParallel' -benchmem -count=3 -run='^$'
Цифры зависят от железа, версии Redis, сети (если Redis удалённый) и загрузки машины — используйте их как ориентир, а не как универсальный SLA.
