Для тех кому лень читать и хочется сразу потрогать вот Github.

Откуда задача

Перед командой встала типичная для высоконагруженных сервисов постановка по обработке сообщений:

  • Порядок обработки в рамках одной логической партиции — события по одному пользователю/сущности должны идти строго последовательно.

  • Очень много ключей партиций — сотни тысяч и выше; заводить под каждую ключ отдельную «очередь» в классическом смысле неразумно.

  • Ограниченный пул воркеров, который шарится между партициями, при этом одна тяжёлая партиция не должна блокировать остальные — нужна справедливая конкуренция за CPU/сеть.

  • Ретраи и отложенные сообщения — временная недоступность внешнего API, отложенный запуск, rate limit.

Ни одна готовая «одна технология» не закрывает это из коробки без компромиссов. Ниже — как мы к этому пришли и что в итоге сделали в виде библиотеки smart-redis-queue (очередь на Redis, логика в Lua-скриптах).


Что уже есть на рынке: Kafka

Apache Kafka — распределённый лог: партиции топика, репликация, долговременное хранение, высокая пропускная способность.

Плюсы для нашей задачи:

  • Естественная модель «партиция = порядок внутри партиции».

  • Масштабирование чтения за счёт consumer groups и числа партиций.

Минусы / ограничения, из-за которых «только Kafka» было недостаточно:

  • Consumer group привязывает партиции к консьюмерам: это не тот же самый паттерн, что динамический пул воркеров, который в один момент обрабатывает задачи из разных партиций по мере готовности. В Kafka воркер «держит» назначенные ему партиции; гибкое «взять следующую готовую задачу из любой партиции из общего пула» — другая семантика (ближе к task queue, чем к streaming log).

  • TTL сообщений — не то же самое, что «отложить до времени T» в смысле бизнес-планировщика; retention и compaction решают другие задачи.

  • Операционная и архитектурная тяжесть: кластер, мониторинг, эволюция схем — не всегда оправданы, если у вас уже есть Redis и нужна именно очередь задач, а не бесконечный лог событий.

Итог: Kafka остаётся сильным выбором для event streaming и больших пайплайнов, но для описанного сочетания «много ключей + общий пул + отложенные задачи + явные ретраи в одном сервисе» хотелось более лёгкого и управляемого решения поверх уже имеющейся инфраструктуры.


RabbitMQ

RabbitMQ — брокер с очередями, exchange, routing, DLQ, TTL.

Плюсы:

  • Зрелая модель, понятные паттерны pub/sub и work queues.

  • TTL сообщений, dead-letter — из коробки.

Минусы для строгого порядка и «миллионов ключей»:

  • Гарантия порядка в общем случае не совпадает с моделью «одна логическая партиция = строгая последовательность» при нескольких консьюмерах на одной очереди: сообщения распределяются между воркерами, порядок на уровне «кто первый забрал» не тот, что порядок публикации.

  • Завести отдельную очередь на каждый ключ партиции при сотнях тысяч ключей — плохо масштабируется по ресурсам брокера и по эксплуатации.

  • Паттерн «общий пул воркеров + справедливость между логическими партициями» требует либо кастомной маршрутизации, либо своего слоя поверх брокера.

Итог: RabbitMQ отлично подходит для многих сценариев, но строгий порядок в рамках логической партиции при пуле воркеров без изобретательной схемы на стороне приложения не даётся просто так.


Решение: очередь на Redis и атомарность через Lua

Redis здесь — быстрое in-memory хранилище со структурами данных (ZSET, HASH, STRING, SET).

Lua в Redis — встроенный язык сценариев. Скрипт выполняется на сервере Redis целиком, между командами нет гонок с другими клиентами: скрипт — одна атомарная единица работы (пока не завершится выполнение скрипта, другой клиент не «вклинится» в середину). Это критично для операций вида «снять с ZSET, положить в processing, обновить блокировку партиции» — без Lua пришлось бы городить WATCH/MULTI или принимать race conditions.

Коротко: Lua + Redis = один round-trip и согласованное состояние ключей без распределённых транзакций между несколькими сервисами.


Проектирование ключей и структур

Используется префикс queue:{queueName}:.

Назначение

Ключ / структура

Множество «живых» партиций

SET queue:{name}:partitions

Очередь задач партиции с приоритетом

ZSET queue:{name}:partition:{code}:{priority} — score = время доступности (мс), member = taskId

Уровни приоритета внутри партиции

ZSET queue:{name}:partition:{code}:priorities

Полезная нагрузка

STRING queue:{name}:payload:{taskId}

Код партиции задачи

STRING queue:{name}:partition:{taskId}

Числовой приоритет

STRING queue:{name}:priority:{taskId}

Задачи «в работе» у консьюмера

HASH queue:{name}:consumer:{id}:tasks

Heartbeat

STRING queue:{name}:consumer:{id} с TTL

Блокировка ordered-партиции

STRING queue:{name}:partition:{code}:lock

Временный блок партиции (rate limit)

STRING queue:{name}:partition:{code}:block с TTL

Ordered-партиции — с префиксом ! в коде партиции: для них действует эксклюзивная блокировка (один консьюмер на партицию в момент времени), чтобы порядок не ломался.

Обычные партиции (без !) и базовая партиция (base) — параллельная обработка без такой блокировки.


Движение сообщения: схемы

Публикация (добавление)

Чтение (consume / Get)

Ack

Reject


Скрипт добавления (Publish)

Реализация в репозитории: scripts.go, переменная addScript.

Ключевая идея:

  1. Для каждой задачи: SET payload с NX — идемпотентность по taskId; дубликат не затирает существующую задачу.

  2. Сохраняются партиция и приоритет.

  3. В ZSET партиции добавляется taskId со score = scheduled в миллисекундах — это и есть отложенное сообщение: пока now < score, Get его не отдаст.

  4. Партиция регистрируется в глобальном SET партиций.

Фрагмент логики (упрощённо):

local ok = redis.call('SET', payloadKey, payload, 'NX')
if ok then
    redis.call('ZADD', partitionQueueKey, scheduled, taskId)
    redis.call('SADD', partitionsKey, partitionCode)
    -- ...
end

Бенчмарки публикации

Запуск: go test -bench='BenchmarkPublish$|BenchmarkPublishBatch10|BenchmarkPublishBatch100' -benchmem -count=3
Окружение при замере: Linux, go1.22+, Redis localhost:6379, CPU Intel i7-13700H (20 логических CPU), пакет bench-queue, БД Redis 5.

Имеется в виду размер батча в одном вызове Publish: 1 / 10 / 100 задач (не «число партиций»).

Пропускная способность добавления в пересчёте на задачи (удобно сравнивать с лимитами бизнеса):

  • одиночный Publish: задач/с ≈ 10⁹ / ns/op (одна итерация = одна задача);

  • батч: задач/с ≈ (размер батча × 10⁹) / ns/op (одна итерация = весь батч).

Бенчмарк

Смысл

ns/op (разброс по 3 прогонам)

Задач/с (оценка по тем же прогонам)

BenchmarkPublish

1 задача за вызов

~43–60 µs

~17–23 тысяч

BenchmarkPublishBatch10

10 задач за вызов

~108–123 µs на батч

~80–92 тысяч

BenchmarkPublishBatch100

100 задач за вызов

~855–986 µs на батч

~100–117 тысяч

Вывод: батчевый Publish сильно поднимает «задач в секунду» за счёт одного round-trip Lua на много записей — при массовой загрузке выгода очевидна.


Скрипт чтения (Get / consume)

Скрипт getScript в scripts.go:

  • Обновляет heartbeat консьюмера.

  • Берёт SMEMBERS активных партиций и итерируется (в текущей реализации внешний цикл набирает до prefetchCount задач, внутри — перебор партиций).

  • Для ordered-партиций (!) проверяется lock и опциональный block (cooldown после rate limit).

  • Для выбранной партиции: приоритеты читаются по убыванию (ZREVRANGE на priorities), внутри уровня — голова ZSET очереди; задача снимается только если scheduled <= now.

  • Задача переносится в HASH «задачи консьюмера» с отметкой времени.

Бенчмарки чтения

Здесь ns/op — среднее время на одну обработанную задачу (Get + Ack), т.к. в тесте крутится consumed < b.N по счётчику задач. Пропускная способность чтения одним воркером: задач/с ≈ 10⁹ / ns/op.

Бенчмарк

ns/op (разброс)

Задач/с на одного воркера (оценка)

BenchmarkConsume (prefetch по умолчанию 5)

~50–61 µs

~16–20 тысяч

BenchmarkConsumePrefetch/prefetch-1

~80–103 µs

~9.7–12.5 тысяч

BenchmarkConsumePrefetch/prefetch-5

~57–67 µs

~15–17.5 тысяч

Prefetch уменьшает число обращений к Redis на обработанную задачу, когда это допустимо бизнес-логикой.

Несколько воркеров (BenchmarkConsumeParallel): обрабатывается общий объём b.N задач; ns/op — это реальное (wall-clock) время на всю пачку, поделённое на b.N, т.е. показатель уже суммарной скорости конвейера (сколько задач в секунду система в целом вычитала и обработала).

Подбенчмарк

ns/op (разброс)

Суммарно задач/с (вся группа воркеров)

consumers-2

~33–36 µs

~28–30 тысяч

consumers-8

~38–44 µs

~23–26 тысяч

На восьми воркерах выигрыш не восьмерной — Redis и блокировки на стороне одного инстанса становятся узким местом; рост «горизонталью» здесь не линейный.


Ack

Скрипт ackScript:

  • Проверяет, что taskId есть в HASH задач этого консьюмера.

  • Уменьшает счётчик «задач в работе» по партиции; если партиция ordered и счётчик обнулился — снимает lock.

  • Удаляет payload, служебные ключи, запись в HASH.

Если задача не в processing или принадлежит не этому консьюмеру — операция не выполняется (возврат 0 на стороне скрипта).


Reject и нюансы ordered-партиций

Скрипт rejectScript:

  • Возвращает задачу в ZSET с новым score (время возврата).

  • Для ordered партиций (!): приоритет увеличивается на 1. Очередь читается с большего приоритета первой, поэтому «отклонённые» сообщения оказываются в конце своего уровня приоритета относительно новых — сохраняется исходный порядок относительно друг друга при повторной обработке.

  • Для tie-break при нескольких reject в одну миллисекунду используется инкремент reject_seq в score.

Высокоуровневый API Consume (см. queue.go): если в одном prefetch-батче для ordered-партиции первая задача получила reject, остальные задачи этой же партиции в батче также уходят в reject — чтобы не нарушить порядок «сначала обрабатываем первую».

Рекомендация по приоритетам: если вы используете бизнес-приоритеты (не только «слой» reject), задавайте шаг приоритета 100+ между уровнями, чтобы инкременты при reject не пересекались с осмысленными приоритетами.

Rate limit: RejectWithDelay для ordered-партиций выставляет ключ block с TTL — партиция временно не выдаётся, чтобы не крутить одно и то же сообщение в tight loop.


Воркеры падают — это нормально

Процесс убит OOM, деплой, сеть, баг — обрыв неизбежен.

Механика:

  • Консьюмер периодически вызывает ping (pingScript): обновляет свой ключ с TTL ~120 с.

  • Один из живых консьюмеров (через короткий distributed lock unlock:lock) сканирует список консьюмеров: у кого нет heartbeat — считается мёртвым.

  • Для мёртвого: задачи из его HASH возвращаются в очередь (для ordered — с тем же приоритетным приращением, что и при reject), lock партиции снимается, консьюмер вычищается из реестра.

Таким образом, «зависшие» in-progress задачи не теряются навсегда из-за падения процесса (при условии, что Redis доступен и данные на нём ещё целы — см. раздел про ограничения).


Когда такая очередь уместна

  • Персональные очереди событий (пользователь, счёт, сессия) при ограниченном числе воркеров и требовании порядка.

  • Отложенные джобы и повторы без отдельного scheduler-сервиса, если Redis уже есть.

  • Rate limiting по логической партиции через RejectWithDelay.

  • Идемпотентная публикация по taskId при повторных попытках продюсера.


Ложка дёгтя: ограничения Redis и модели

  1. Память не резиновая. Всё лежит в RAM (или с дисковым бэкендом — но это уже другие компромиссы). Большие payload напрямую бьют по памяти и по времени сериализации/копирования.

  2. Большие сообщения снижают пропускную способность — больше данных на round-trip, больше давления на сеть и CPU Redis.

  3. Durability. Redis может быть настроен с AOF/RDB, но не даёт тех же гарантий, что отдельный брокер с персистентностью «первого класса» в классическом enterprise-смысле. Возможны окна потери при сбоях (зависит от конфигурации fsync, репликации, split-brain). Сообщения в полёте между приложением и Redis тоже не застрахованы.

  4. Не делайте Redis единственной точкой истины для критичных финансовых событий без внешнего подтверждения/дублирования в надёжное хранилище.

  5. Для одинаковых scheduled в рамках партиции порядок зависит также от taskId. В этом случае для taskId лучше использовать sequence или uuidV1 (который уже включает в себя timestamp + sequence. Важно преобразовать uuid к time ordered версии). Пока нам хватило такого компромиса, но в целом есть варианты доработать скрипты.

Практический вывод: трактуйте очередь как ускоритель и координатор, а не как авторитетный журнал на века.


Итог

  • Задача «порядок в партиции + много ключей + общий пул воркеров + отложенные сообщения и ретраи» не укладывается ни в «только Kafka», ни в «только RabbitMQ» без компромиссов и дополнительного кода.

  • Redis + Lua даёт атомарные сценарии и предсказуемую модель данных при умеренной операционной сложности.

  • Библиотека smart-redis-queue реализует это явно: ordered-партиции (!), приоритеты, prefetch, heartbeat и возврат задач при смерти воркера.

  • Цена — границы Redis: память, размер сообщений, не абсолютная гарантия сохранности при катастрофах.

Если нужна максимальная персистентность и многоуровневая маршрутизация по всей компании — смотрите в сторону зрелых брокеров и event bus. Если нужен контролируемый task queue рядом с приложением и уже есть Redis — такой слой может быть разумным компромиссом.


Приложение: как воспроизвести замеры

# Redis на localhost:6379
go test -bench='BenchmarkPublish$|BenchmarkPublishBatch10|BenchmarkPublishBatch100|BenchmarkConsume$|BenchmarkConsumePrefetch|BenchmarkConsumeParallel' -benchmem -count=3 -run='^$'

Цифры зависят от железа, версии Redis, сети (если Redis удалённый) и загрузки машины — используйте их как ориентир, а не как универсальный SLA.