Алексей Барабанов, IT-директор «Хлебница» и спикер курса «RabbitMQ для админов и разработчиков», подготовил конспект о типовых архитектурных паттернах RabbitMQ. Из него вы узнаете, как настроить пайплайны обработки и реализовать очереди повторных попыток (в том числе, через механизм dead letter exchange).
![](https://habrastorage.org/getpro/habr/upload_files/b50/320/bf9/b50320bf96737e9595edf72ba1ee53ad.jpg)
Другие конспекты:
Пайплайн
Пайплайн — базовый элемент архитектуры, когда нужна последовательная обработка различными сервисами. Например, сначала требуется проверить авторизацию, затем выполнить дедупликацию и только после произвести запись данных сообщения в базу данных.
За счёт использования очередей в пайплайне хорошо видны bottleneck’и — узкие места, перед которыми скапливаются сообщения на обработку. Также есть понятный механизм расшивания посредством увеличения количества консьюмеров (конечно, это не серебряная пуля, так как проблемы могут быть у другого сервиса типа БД).
Плюс такого подхода — возможность удобного траблшутинга. На любом этапе вы можете добавить дублирующую очередь и получать копии сообщений как для разбора, так и для отправки на непродовые окружения.
![](https://habrastorage.org/getpro/habr/upload_files/9d7/686/1cc/9d76861ccea4b678126a86e71e83e720.png)
Типовой пример пайплайна обработки: Publisher отправляет сообщение, и затем оно передаётся от worker к worker по мере обработки. Worker 1 проверяет авторизацию, 2 — дедупликатор, 3 — пишет в БД.
Картинка с очередями выглядит немного сложнее, но суть та же:
![](https://habrastorage.org/getpro/habr/upload_files/574/0bf/fce/5740bffce09c43c22572edfd5eefb37a.png)
Здесь и далее я не рисую exchanges перед очередями, но надо понимать, что они везде есть. Опускаю их только для упрощения схемы.
Важно: worker 1 и worker 2 на данной схеме являются одновременно и consumer, и publisher. А что произойдёт, если worker 2 перестанет справляться с дедупликацией сообщений?
Представим, что код проверки написан не оптимально, и скорость проверки ниже, чем частота прихода новых сообщений.
![](https://habrastorage.org/getpro/habr/upload_files/ce3/551/72b/ce355172bee66664af80d70f5595417d.png)
В этом случае очередь сообщений перед worker 2 начнёт расти, о чём нам тут же сообщит система мониторинга.
Если есть возможность горизонтального масштабирования — запускаем ещё один инстанс worker 2:
![](https://habrastorage.org/getpro/habr/upload_files/58a/d6e/be3/58ad6ebe3ed6ff21e664d3426b443e26.png)
И они вдвоём ускоряют обработку сообщений. Разумеется, ускорить обработку таким способом получится не всегда. Если worker’ы упираются в скорость используемой БД, надо расшивать её или оптимизировать их код. Однако узкое горлышко обработки вы будете видеть сразу.
Очередь повторных попыток
Очередь повторных попыток — один из базовых элементов архитектуры приложения. Используется, когда есть вероятность неуспешной обработки сообщения (как «временные трудности» в целом, так и проблемы с обработкой конкретного сообщения). Работает, если нет необходимости в строгой последовательности обработки, так как подразумевает возможность временного пропуска сообщения.
Существуют два основных алгоритма реализации:
ack+publish — когда consumer сам подтверждает сообщение из основной очереди и паблишит в очередь повторных попыток;
reject+dlx — через механизм dead letter exchange.
Dead Letter eXchange
Dead Letter eXchange (DLX) — это явное или назначенное через Policy свойство очереди. Указывает RabbitMQ, куда необходимо отправлять сообщение при наступлении одного из событий:
x-mesage-ttl — превышение времени жизни;
x-max-length — превышение длины очереди;
reject — явный реджект сообщений со стороны консьюмера.
Для одной очереди можно указать только один DLX, что несколько снижает гибкость. Также вы можете указать dead-letter-routing-key, а можете не указывать — тогда сохраняется текущий для каждого сообщения.
Пример ack+publish
![](https://habrastorage.org/getpro/habr/upload_files/9ee/360/5f4/9ee3605f4a3e6379bcec0baf88cbeaed.png)
Предположим, у нас есть обработка сообщений, использующая некий внешний сервис, находящийся вне нашей зоны ответственности. Пускай сообщение «2» в данный момент не может быть им успешно обработано.
Сообщения помещаются в очередь:
![](https://habrastorage.org/getpro/habr/upload_files/ef1/fa1/646/ef1fa164693abf928583e1483dbe4e9e.png)
Worker берёт сообщение 1 в обработку:
![](https://habrastorage.org/getpro/habr/upload_files/931/32d/536/93132d5364aca43277de32ad6186ffa2.png)
После успешной обработки первого сообщения доходит дело до второго — проблемного. Происходит ошибка его обработки:
![](https://habrastorage.org/getpro/habr/upload_files/d09/18c/937/d0918c937a6d36323e1086d37eaa7227.png)
Если мы сделаем nack этому сообщению, оно вернётся в очередь на тоже самое место:
![](https://habrastorage.org/getpro/habr/upload_files/781/4d2/429/7814d242966a11d80f47f757da19c5d0.png)
И незамедлительно будет закинуто в worker:
![](https://habrastorage.org/getpro/habr/upload_files/7cb/45c/8dc/7cb45c8dc4df116f5dc81c97087fe9b8.png)
Но здесь его снова ждёт неуспешная обработка. И пока одно сообщение будет бегать туда-сюда, оно, как тромб, забьёт голову очереди. В результате все сообщения, которые могли бы быть обработаны, будут висеть в очереди и ждать своего часа:
![](https://habrastorage.org/getpro/habr/upload_files/7d7/5dd/f64/7d75ddf64993a0d425839761521e26c4.png)
Как раз для таких случаев нужен паттерн очереди повторных попыток. Суть в том что мы создаем дополнительную очередь «retry queue», куда складируем неуспешно обработанные сообщения:
![](https://habrastorage.org/getpro/habr/upload_files/a8e/88e/592/a8e88e5929c7c998cc57a6230ec1bee0.png)
В случае ошибки обработки worker должен сделать ACK из основной очереди и publish в новую очередь:
![](https://habrastorage.org/getpro/habr/upload_files/c30/1ce/412/c301ce412303665afa4672efa76221ff.png)
В результате для нашего примера 1 и 3 сообщения будут успешно обработаны, а 2 останется в выделенной очереди ожидать своего часа:
![](https://habrastorage.org/getpro/habr/upload_files/bf8/673/95e/bf867395e876ba2993b7fb21248c2643.png)
Это как раз подход ack+publish. Он наиболее часто используется в IT-системах, потому что разработчики не знают о некоторых возможностях RabitMQ. Я же рекомендую использовать другой метод — reject+DLX.
Пример reject+dlx
Всё аналогично, но для очереди мы декларируем DLX = Fail, который ведёт в ту самую очередь повторных попыток:
![](https://habrastorage.org/getpro/habr/upload_files/8c6/127/7be/8c61277be049fd8b19ff5e9ee4574d84.png)
При неуспешной обработке мы делаем reject(false) вместо ack и publish:
![](https://habrastorage.org/getpro/habr/upload_files/314/840/968/314840968da2515716a5b4ba19dff397.png)
Принцип тот же, но работает проще: меньше кода — меньше возможных проблем. Результат ожидаемо аналогичный:
![](https://habrastorage.org/getpro/habr/upload_files/867/b07/ee5/867b07ee54790b8e54d7ea93a9563a9a.png)
Но ведь нам нужно совершать повторные попытки обработки, а не просто складывать сообщения в отстойник. Для этого мы используем тот же самый механизм DLX + механизм TTL:
![](https://habrastorage.org/getpro/habr/upload_files/4bf/5cb/a04/4bf5cba0448227d8d261acf8c36054bf.png)
В результате сообщение 2 будет каждую минуту возвращаться в основную очередь. Будет происходить попытка его обработки и так до тех пор, пока сообщение не будет успешно обработано. Просто и без лишнего кода.
Для очередей без consumer, где возможно хранение сообщений, рекомендую также добавлять признак lazy-queue, чтобы RabbitMQ старался не держать эти сообщения в оперативке.
Про нейминг очередей: важно сразу договориться, чтобы не разводить бардака на проекте. Названия очередей должны быть поняты всем, а специализированные очереди (типа очередей с ттл и dlx или очередей-отстойников для сбойных сообщений/дублей) лучше дополнять суффиксами, например .dlx или .fail. Желательно, чтобы начало названия очереди совпадало с названием workload’а/контейнера. Это позволит легко и просто находить worker, отвечающий за обработку очереди.
![](https://habrastorage.org/getpro/habr/upload_files/263/79b/f61/26379bf61341624a50cf135b96229850.jpg)