Автор, лучи добра тебе и пожелание попасть в рай без очереди :)
А вот что нам понадобилось в реальной практике и было сходу непонятно как реализовать:
— как сделать подписку по маске (каюсь, не сразу нашли звездочку и решётку);
— как сделать, когда надо чтобы оба консумера получили мессадж, и когда надо чтобы строго один консумер из N возможных;
— как сделать, чтобы консумер выполнял действие, только получив сообщения из двух и более очередей, делая ack только по получению всех и не потребляя процессорные ресурсы между этими сообщениями (multi-queue consumer)
Спасибо)
1. Подписка по маске будет в следующей статье, как я и написал в конце этой статьи.
2. Почему бы не сделать две очереди? Одну для всех и одну строго для одного?
3. Не очень понял последнего пункта.
как сделать, когда надо чтобы оба консумера получили мессадж
— Паб/саб реадихуется через fanout exchange
как сделать, когда надо чтобы строго один консумер из N возможных
— Для direct exchange это дефолтное поведение реализовано через round robin
как сделать, чтобы консумер выполнял действие, только получив сообщения из двух и более очередей
— Из коробки можно подписываться на сколько угодно очередей, выставив ручное подверждение и отправлять ack опираясь уже на вашу бизнес логику
Вопросы получения сообщений касаются исключительно логики queue, т.е.
— хотим что бы был round robin — консьюмеры должны слушать одну очередь на всех
— хотим что бы каждый получил сообщене — консьюмеры должны слушать разные очереди
Вопросы роутинга находятся на стыке queue and exchange, при этом тип exchange — скорее способ задавать различные способы адресации.
И последнее — из коробки можно подписать одну очередь несколькими биндингами, которые могут иметь разные типы exchange and routing keys, а можно — слушать несколько очередей (тут есть разница между «подписываться» и «слушать»), соответственно выбор правильной стратегии (много очередей мало биндингов или наоборот) зависит от задачи (ну или уже имеющегося хозяйства)
Это плохо, потому что:
— если например комбинация сообщений B и C уже позволяет выполнить следующий шаг бизнес-логики, то ждать сообщения из A нам смысла нет. Нет никакого способа «из коробки» поймать такую ситуацию в коде PHP.
— consume на одну из очередей означает, что процессорные ресурсы потребляются по сути на прогон пустого цикла ожидания. Нет никакого способа «из коробки» дёрнуть действие только по получению всех трёх сообщений, а пока они все не пришли — заниматься другими задачами.
Во-вторых, ack сообщений не атомарен, как например в редисе (там можно завернуть несколько действий в атомарную транзакцию). Если ваш скрипт получил сообщения из трёх очередей, сделал первому ack, второму ack, а третьему не успел — умер (в ДЦ попал метеорит) — вы получили неконсистентность данных.
В-третьих, если у вас несколько воркеров, то вам нужно сделать чтобы все три сообщения в очередях A, B и C, составляющие «смысловую тройку» по бизнес-логике, попали строго в один воркер. Никаких round-robin.
Естественно, речь идёт о высоконагруженных проектах, сопоставимых с… скажем с Вконтакте или Amazon. В проектах меньшего масштаба на всё перечисленное можно забить болт :)
Надеюсь, я достаточно подробно объяснил почему нормальный multi-consumer не так просто реализовать?
Радость от rabbitmq, как раз в том, что можно писать разные части на разных языках. Зачем вы пишете на PHP?
Если у вас столь сложная бизнес логика, которая выходит за рамки rabbitmq, но вы очень любите PHP, велкам к zeromq. Будете разруливать любую ситуацию как вам угодно. В противном случае у вас будет огород из воркеров, часть из которых будут разруливать только потоки данных и будут узким местом.
То что вы описали это абстрактная проблема, возможно ее можно решить изменив потоки данных, например сваливать все во временные очереди и ждать из них.
Я бы сказал, что это радость от правильно организованной распределённой архитектуры, а не только rabbit :) Я не агитирую конкретно за PHP, но это популярный язык, который понятен многим читателям.
Конкретный пример: допустим, есть «облачный сервис» — таск-менеджер (или багтрекер). Пользователю по текущему тарифу разрешено создавать в нём не более 10 проектов, за каждый из которых он платит 100 рублей в месяц. От пользователя пришла входящая задача — создать новый проект. Исполняющий сервис ожидает сообщения одновременно из четырёх очередей:
A) подтверждение, что пользователь валиден, авторизован, и вообще имеет право выполнять действие project.create;
B) подтверждение, что у пользователя в данный момент не более 10 проектов;
C) подтверждение, что у пользователя на балансе вообще есть сумма не менее 100 рублей (ну или подтверждение списания 100 рублей, если так понятнее);
D) подтверждение от валидатора, что название проекта (поддомен) не содержит нецензурных или оскорбительных слов.
Только по получению сообщений об успехе из всех четырёх очередей — исполнительный сервис может приступать к фактическому созданию проекта.
Однако заметно, что достаточно получить как минимум два сообщения о НЕуспехе, например из «A» и «B», и результат из «C» и «D» нам становится неинтересен. Поскольку, напомню, процессорное время стоит денег — ожидание двух последних становится лишней работой, которая расходуется впустую — а значит впустую тратятся деньги. Уточню, что сообщения приходят в непредсказуемое время, и быстрее всех могут придти как A и B, так и например B и D, в зависимости от текущей нагрузки на тот или иной сегмент системы.
Если же мы развернём эти проверки в последовательность действий (A-B-C-D), мы удлинним всю цепочку — и снова потратим процессорные ресурсы впустую в том случае, если например A, B, C будут успешны, а D — зафэйлится. Не говоря уже о том, что приложение может потерять в «отзывчивости», визуально станет медленнее работать. Можно было бы сказать «заранее думайте о порядке действий», но это неудобно и ухудшает архитектурную простоту системы.
А ты хитрец.
Данные передются только средствами рэбита или можно хранить промежуточные значения в редисе например?
Почему имеено 2 НЕуспеха? Вроде бы любой отказ должен фейлить всю операцию.
Именно 2 НЕуспеха здесь для иллюстрации ситуации, когда на одном неуспехе нельзя сделать окончательный вывод. Например, если у пользователя 99 рублей на счету, то можно его кредитовать на рубль (или уйти в минус на рубль), но всё равно предоставить услугу. Два фейла — это просто пример, но его тоже нужно учесть в проектировании.
Если пойти на то, чтобы хранить состояние [между сообщениями], то вся задача теряет актуальность. Однако и здесь вопрос гораздо шире. Дело в том, что «серьёзные» распределённые системы вообще требуют иного подхода, нежели общепринятый подход к программированию как к последовательности команд в программе.
В идеале, SOA-архитектура должна быть построена без хранимых состояний и разделяемой памяти. Почему этих штук следует избегать — вопрос слишком большой, чтобы описать его в рамках топика. Говоря кратко, надо строить систему из «тупых», легко заменяемых и дублируемых кирпичиков — конечных автоматов, а функционировать они должны по принципам архитектуры потока данных.
Материалы для любопытных читателей:
— общий обзор habrahabr.ru/post/122479/
— для самостоятельного гугления: event-driven development, паттерн observer-notifier, SOA-архитектура, архитектура потока данных, основы философии erlang, конечные автоматы, принципы фон-неймана.
Если мы разрешаем себе хранение состояний, тогда (казалось бы) вся задача теряет актуальность: храним промежуточные результаты в редисе и не паримся. Однако я могу совершенно точно сказать, что нельзя иметь хранимые состояния «на входе» в систему, где любой неавторизованный пользователь может нам этот редис засрать, навалив кучу (миллионы) входящих запросов. Это компромисс, на который надо идти очень осторожно.
Значит есть
Клиент — который хочет результат сервиса
Валидаторы — независимые раз-|| шматки кода
Сервис — который делает, что-то после валидаторов
В рамках ребита я б сделал так:
Сервисов много они слушают все 1 очередь service_queue(раунд робин)
Валидоторов много они слушают каждый свой тип очереди validator_typeN_queue(раундробин)
Клиент пораждает 2 временных очереди replay_queue123 и validates_queue123.
Кажому валидатору в котором он заинтересован шлет validates_queue123 и запрос в соотвествующую validator_typeN_queue.
Сервису через service_queue отсылает список валидаторов, которые были запущены, и обе временных очереди replay_queue123 и validates_queue123.
Начинает ожидать результат в replay_queue123
Валидатор сделав свое дело отсылает результат во временную очередь validates_queue123.
Сервис получив список валидаторов и очередь validates_queue123 подписывается на нее и ждет резалты. В нем реализуется вся логика. Получили 2 фейла стриаем очередь validates_queue123 шлем в replay_queue123 провал.
Для большей гибкости и большей производительности можно из сервиса вычленить что-то типа summary_validator в котором останется логика по результатам валидаторов. И котороый при фейле будет слать киленту сразу феил в очередь replay_queue123, а если ок то — сервису, передавая очередь replay_queue123. Сервис, тогда будет всегда получать положительныей запросы и ничего ждать не будет.
RabbitMQ tutorial 4 — Роутинг