Всем доброго времени суток, друзья.
Сегодня захотелось поговорить о том, как можно работать с RabbitMQ в Symfony и совсем чуть-чуть о некоторых подводных комнях. В конце я напишу парочку интересных моментов о кролике (рус. перевод «rabbit») для тех, кто совсем в танке.
Я не буду рассказывать про сам RabbitMQ, поэтому если вы пока и этого не знаете, почитайте следующие переводы:
Статья 1
Статья 2
Статья 3
Статья 4
Статья 5
Не бойтесь примеров на перле или пайтоне — это не страшно, все достаточно понятно из исходного кода.
+ Все достаточно подробно описано, когда я читал это в свое время, достаточно было интерпретировать код мысленно, чтобы понять как что и зачем.
Если вы уже знаете, что такое консумер и почему в нем нужно делать $em->clear() + gc_collect_cycles, а после закрывать соединение с базой данных, то, скорее всего, вы ничего нового для себя не узнаете. Статья скорее для тех, кто не хочет разбираться с AMQP протоколом, но которым нужно прямо сейчас применять очереди и выбор почему-тобездумно пал на RabbitMQ, а не тот же легковесный beanstalkd.
Если же у вас микросервисная архитектура и вы ждете, что я расскажу вам как сварить коммуникацию между компонентами через AMQP, как красиво делать RPC, то я сам чего-то подобного очень давно жду на Хабре…
Перед нами задача: отправлять сообщения на Email в очереди, используя RabbitMQ, а так же обеспечить отказоустойчивость: если почтовый сервер ответил таймаутом или что-то ещё сломалось — нужно попробовать выполнить задачу через 30 секунд ещё раз.
Итак, устанавливаем наш бандл.
Я слишком ленив, чтобы описывать вам, как нужно копировать composer require команду и строку в AppKernel.
Я очень надеюсь, что вы сами это сделали и готовы приступать к конфигурированию нашего бандла.
Конфигурация бандла для нас:
Здесь огромное внимание следует обратить на producers и consumers. Если очень коротко и просто: producer — это то, что отправляет сообщения через RabbitMQ в consumer, а consumer в свою очередь — та вещь, которая получает и обрабатывает эти сообщения. Здесь же exchange_options — опции для обменника (вы же прочитали статьи про rabbitmq, которые были в начале статьи?), queue_options — опции для очереди (аналогично). Так же стоит обратить внимание на callback в consumer — здесь указывается ID сервиса, который расширяет ConsumerInterface (execute метод с аргументом сообщения).
Т.к. пока что у вас его нету, при запуске приложения или компиляции контейнера мы получим какое-то DI исключение, что сервис не найден, но мы его запрашиваем. Поэтому давайте создавать наш сервис:
И сам класс:
Ну вы же не обиделись, что я не включил в статью как работать со SwiftMailer? :) Нам важно, чтобы сюда асинхронно доставлялась строка через очередь сообщений, то, как мы будем обрабатывать эту строку — дело наше. Почта — всего лишь пример кейса.
Как же нам передать строку в наш консьюмер? Для этого давайте создадим тестовую команду:
Снова извиняюсь, что не сделал для вас контроллер с красивой формочкой — я слишком ленив для этого. Да и слишком уж излишне. А я экономный лентяй и люблю рисовать, мечтаю немного в сторону теорий и архитектуры приложений. Отвлеклись.
Теперь запускаем наш consumer и приказываем ему ждать сообщения из RabbitMQ:
И отправим ему сообщение из нашей тестовой команды:
И вот сейчас, в процессе rabbitmq:consumer, мы можем увидеть наше сообщение! И что псевдо отправка завершилась успехом.
А теперь давайте посмотрим, как можно реализовать отложенную обработку сообщений в случае ошибок. Я не буду использовать плагин RabbitMQ для отложенных сообщений. Мы будем достигать этого путем создания новой очереди, в которой укажем время жизни сообщений 30��ек и установим настройку: после смерти — перекладываться в основную очередь.
Достаточно лишь добавить новый producer:
Теперь давайте изменим логику консумера:
А вообще для вывода полезно использовать LoggerInterface — и красиво и масштабируется.
Но нам же лень и мы не хотим создавать дополнительные «думки», верно? Просто знайте.
Теперь мы должны прокинуть producer для отложенной очереди:
И изменим команду:
Теперь вместе с нормальным сообщением она будет отправлять и плохое сообщение.
Если мы запустим, то увидим следующий вывод:
Спустя 30 секунд еще раз появится сообщение об обработке:
И так бесконечно. Логику максимальных попыток и т.п. продумывайте сами. Далее я дам пару советов для вашего прода и некоторых фич.
Теперь советы для вашего прода:
1) Не отходя от темы с максимальными попытками обработки: знайте на все 102% все возможные исключения контекста с которым вы работаете! Умейте представлять, когда повторная обработка требуется, а когда нет, иначе — привет мусорке из логов и отсутствия понимания что происходит. В случае, если битая задача будет крутится в RabbitMQ, с реальными данными, нормальными задачами, вы вряд ли сможете выкинуть сломанные задачи без костылей, не обновляя код консьюмера и н�� перезапуская его. Поэтому продумывайте это сразу. В данном случае правильным было бы ловить только лишь SMTPTimeOutException какой-нибудь.
Так же с такой моделью важно понимать, что: на 1 очередь — одна «глобальная ответственность смены состояния чего либо». Не стоит давать слишком много рискованных задач своему воркеру. Если рассмотреть вариант с 1С, то проблема может быть в следующем: допустим при успешном или неуспешном изменении\добавлении товара в 1С мы записываем в базу данных что-нибудь, например, дату последней удачной синхронизации или неудачной. Т.е. тут обновляются сразу 2 базы данных: бд 1С и бд вашего приложения. Допустим в 1С все успешно создалось, далее идет обновление в базе данных поля «дата последней удачной синхронизации» — хоп, вылезла ошибочка, опять же, сервер бд не отвечает — задача откладывается на «потом» и повторяется, пока база данных не начнет отвечать. И при этом каждый раз «подзадача» связанная с созданием сущности в 1С будет успешно выполняться, каждый раз при неудачной попытке записи в базу данных сайта, что неправильно.
2) Прочитайте про durable, раз уж мы с вами используем RabbitMQ. P.S: это заводится как true\false флаг «durable» в конфиге бандла, конкретно — в exchange_options и queue_options
3) Всю свою жизнь закрывайте соединение к базе данных после выполнения работы программы. А так же запускайте очистку EM и после сборщик мусора для чистки ссылок. Т.е. в конце концов наш консьюмер должен выглядеть как-то так:
Консьюмер работает как демон, поэтому постоянно копить в нем ссылки и держать соединение с бд — это плохо. В случае с MySQL вы получите MySQL server has gone away.
4) Много думайте, почему ваша модель отложенных сообщений может неожиданно убить ваш бизнес. Например у нас есть механизм, который при изменении товара в админке заливает эти изменения через очередь в 1С. Теперь представим ситуацию: администратор меняет товар -> создается задача #1 на попытку изменить те же данные в базе 1С. Сервер 1С не отвечает, поэтому задачка просто перекладывается постоянно, пока все не заработает. За это время администратор решил еще кое-что подправить в том же товаре, что он и делает. Регистрируется задача #2.
А теперь представьте ситуацию, когда поочередно выполняются и откладываются задачи #1 и #2.
Что если 1С заработает к моменту выполнения задачи #2? Задача выполнится и зальёт последние изменения. Далее пойдет в ход задача #1 и затрет собой стабильные изменения :)
Выход: отправляем timestamp в качестве version, и, если задача «из прошлого» — выкидываем её.
5) Идешь в асинхронность — прочитай про многие архитектурные проблемы, а также race condition, несогласованность консумеров на разных машинах и прочее.
6) Пишите версии вашим очередям… Ух как помогает на реальном проде. В принципе мы так и сделали в этом примере.
7) Возможно тебе не нужен RabbitMQ и целый AMQP протокол. Посмотри в сторону beanstalkd.
8) Запускайте консумеры и всякое прочее демоническое на php через supervisor и подключите полное логирование падения процессов в нём. У него так же есть web интерфейс для управления всем этим делом, что так же очень удобно. Проблемы будут всегда.
Сегодня захотелось поговорить о том, как можно работать с RabbitMQ в Symfony и совсем чуть-чуть о некоторых подводных комнях. В конце я напишу парочку интересных моментов о кролике (рус. перевод «rabbit») для тех, кто совсем в танке.
Я не буду рассказывать про сам RabbitMQ, поэтому если вы пока и этого не знаете, почитайте следующие переводы:
Статья 1
Статья 2
Статья 3
Статья 4
Статья 5
Не бойтесь примеров на перле или пайтоне — это не страшно, все достаточно понятно из исходного кода.
+ Все достаточно подробно описано, когда я читал это в свое время, достаточно было интерпретировать код мысленно, чтобы понять как что и зачем.
Если вы уже знаете, что такое консумер и почему в нем нужно делать $em->clear() + gc_collect_cycles, а после закрывать соединение с базой данных, то, скорее всего, вы ничего нового для себя не узнаете. Статья скорее для тех, кто не хочет разбираться с AMQP протоколом, но которым нужно прямо сейчас применять очереди и выбор почему-то
Если же у вас микросервисная архитектура и вы ждете, что я расскажу вам как сварить коммуникацию между компонентами через AMQP, как красиво делать RPC, то я сам чего-то подобного очень давно жду на Хабре…
Перед нами задача: отправлять сообщения на Email в очереди, используя RabbitMQ, а так же обеспечить отказоустойчивость: если почтовый сервер ответил таймаутом или что-то ещё сломалось — нужно попробовать выполнить задачу через 30 секунд ещё раз.
Итак, устанавливаем наш бандл.
Я слишком ленив, чтобы описывать вам, как нужно копировать composer require команду и строку в AppKernel.
Я очень надеюсь, что вы сами это сделали и готовы приступать к конфигурированию нашего бандла.
Если нет, то вот вам полный гайд для самых маленьких:
Установка RabbitMQ:
Теперь вы можете открыть ваш localhost:15672 под учеткой: guest guest и увидеть много прикольных вещей, в которых скоро вы будете разбираться и чувствовать себя мужиком.
Теперь устанавливаем сам бандл:
И регистрируем его в нашем приложении:
Вот и всё.
echo 'deb http://www.rabbitmq.com/debian/ testing main' | sudo tee /etc/apt/sources.list.d/rabbitmq.list wget -O- https://www.rabbitmq.com/rabbitmq-release-signing-key.asc | sudo apt-key add - sudo apt-get update sudo apt-get install rabbitmq-server sudo rabbitmq-plugins enable rabbitmq_management
Теперь вы можете открыть ваш localhost:15672 под учеткой: guest guest и увидеть много прикольных вещей, в которых скоро вы будете разбираться и чувствовать себя мужиком.
Теперь устанавливаем сам бандл:
composer require php-amqplib/rabbitmq-bundle
И регистрируем его в нашем приложении:
// app/AppKernel.php public function registerBundles() { $bundles = array( new OldSound\RabbitMqBundle\OldSoundRabbitMqBundle(), ); }
Вот и всё.
Конфигурация бандла для нас:
old_sound_rabbit_mq: connections: default: host: 'localhost' port: 5672 user: 'guest' password: 'guest' vhost: '/' lazy: false connection_timeout: 3 read_write_timeout: 3 keepalive: false heartbeat: 0 use_socket: true producers: send_email: connection: default exchange_options: { name: 'notification.v1.send_email', type: direct } consumers: send_email: connection: default exchange_options: { name: 'notification.v1.send_email', type: direct } queue_options: { name: 'notification.v1.send_email' } callback: app.consumer.mail_sender
Здесь огромное внимание следует обратить на producers и consumers. Если очень коротко и просто: producer — это то, что отправляет сообщения через RabbitMQ в consumer, а consumer в свою очередь — та вещь, которая получает и обрабатывает эти сообщения. Здесь же exchange_options — опции для обменника (вы же прочитали статьи про rabbitmq, которые были в начале статьи?), queue_options — опции для очереди (аналогично). Так же стоит обратить внимание на callback в consumer — здесь указывается ID сервиса, который расширяет ConsumerInterface (execute метод с аргументом сообщения).
Т.к. пока что у вас его нету, при запуске приложения или компиляции контейнера мы получим какое-то DI исключение, что сервис не найден, но мы его запрашиваем. Поэтому давайте создавать наш сервис:
#app/config/services.yml services: app.consumer.mail_sender: class: AppBundle\Consumer\MailSenderConsumer
И сам класс:
namespace AppBundle\Consumer; use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface; use PhpAmqpLib\Message\AMQPMessage; /** * Class NotificationConsumer */ class MailSenderConsumer implements ConsumerInterface { /** * @var AMQPMessage $msg * @return void */ public function execute(AMQPMessage $msg) { echo 'Ну тут типа сообщение пытаюсь отправить: '.$msg->getBody().PHP_EOL; echo 'Отправлено успешно!...'; } }
Ну вы же не обиделись, что я не включил в статью как работать со SwiftMailer? :) Нам важно, чтобы сюда асинхронно доставлялась строка через очередь сообщений, то, как мы будем обрабатывать эту строку — дело наше. Почта — всего лишь пример кейса.
Как же нам передать строку в наш консьюмер? Для этого давайте создадим тестовую команду:
namespace AppBundle\Command; use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand; use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Output\OutputInterface; class TestConsumerCommand extends ContainerAwareCommand { /** * {@inheritdoc} */ protected function configure() { $this ->setName('app:test-consumer') ->setDescription('Hello PhpStorm'); } /** * {@inheritdoc} */ protected function execute(InputInterface $input, OutputInterface $output) { $this->getContainer()->get('old_sound_rabbit_mq.send_email_producer')->publish('Сообщенька для отправки на мыло...'); }
Снова извиняюсь, что не сделал для вас контроллер с красивой формочкой — я слишком ленив для этого. Да и слишком уж излишне. А я экономный лентяй и люблю рисовать, мечтаю немного в сторону теорий и архитектуры приложений. Отвлеклись.
Теперь запускаем наш consumer и приказываем ему ждать сообщения из RabbitMQ:
bin/console rabbitmq:consumer send_email -vvv
И отправим ему сообщение из нашей тестовой команды:
bin/console app:test-consumer
И вот сейчас, в процессе rabbitmq:consumer, мы можем увидеть наше сообщение! И что псевдо отправка завершилась успехом.
А теперь давайте посмотрим, как можно реализовать отложенную обработку сообщений в случае ошибок. Я не буду использовать плагин RabbitMQ для отложенных сообщений. Мы будем достигать этого путем создания новой очереди, в которой укажем время жизни сообщений 30��ек и установим настройку: после смерти — перекладываться в основную очередь.
Достаточно лишь добавить новый producer:
producers: send_email: connection: default exchange_options: { name: 'notification.v1.send_email', type: direct } delayed_send_email: connection: default exchange_options: name: 'notification.v1.send_email_delayed_30000' type: direct queue_options: name: 'notification.v1.send_email_delayed_30000' arguments: x-message-ttl: ['I', 30000] x-dead-letter-exchange: ['S', 'notification.v1.send_email']
Теперь давайте изменим логику консумера:
namespace AppBundle\Consumer; use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface; use OldSound\RabbitMqBundle\RabbitMq\ProducerInterface; use PhpAmqpLib\Message\AMQPMessage; /** * Class NotificationConsumer */ class MailSenderConsumer implements ConsumerInterface { private $delayedProducer; /** * MailSenderConsumer constructor. * @param ProducerInterface $delayedProducer */ public function __construct(ProducerInterface $delayedProducer) { $this->delayedProducer = $delayedProducer; } /** * @var AMQPMessage $msg * @return void */ public function execute(AMQPMessage $msg) { $body = $msg->getBody(); echo 'Ну тут типа сообщение отправляю '.$body.' ...'.PHP_EOL; try { if ($body == 'bad') { throw new \Exception(); } echo 'Успешно отправлено...'.PHP_EOL; } catch (\Exception $exception) { echo 'ERROR'.PHP_EOL; $this->delayedProducer->publish($body); } } }
А вообще для вывода полезно использовать LoggerInterface — и красиво и масштабируется.
Но нам же лень и мы не хотим создавать дополнительные «думки», верно? Просто знайте.
Теперь мы должны прокинуть producer для отложенной очереди:
#app/config/services.yml services: app.consumer.mail_sender: class: AppBundle\Consumer\MailSenderConsumer arguments: ['@old_sound_rabbit_mq.delayed_send_email_producer']
И изменим команду:
namespace AppBundle\Command; use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand; use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Output\OutputInterface; class TestConsumerCommand extends ContainerAwareCommand { /** * {@inheritdoc} */ protected function configure() { $this ->setName('app:test-consumer') ->setDescription('Hello PhpStorm'); } /** * {@inheritdoc} */ protected function execute(InputInterface $input, OutputInterface $output) { $this->getContainer()->get('old_sound_rabbit_mq.send_email_producer')->publish('Ура, сообщенька...'); $this->getContainer()->get('old_sound_rabbit_mq.send_email_producer')->publish('bad'); } }
Теперь вместе с нормальным сообщением она будет отправлять и плохое сообщение.
Если мы запустим, то увидим следующий вывод:
Ну тут типа сообщение отправляю Ура, сообщенька... Успешно отправлено... Ну тут типа сообщение отправляю bad... ERROR
Спустя 30 секунд еще раз появится сообщение об обработке:
Ну тут типа сообщение отправляю bad... ERROR
И так бесконечно. Логику максимальных попыток и т.п. продумывайте сами. Далее я дам пару советов для вашего прода и некоторых фич.
Теперь советы для вашего прода:
1) Не отходя от темы с максимальными попытками обработки: знайте на все 102% все возможные исключения контекста с которым вы работаете! Умейте представлять, когда повторная обработка требуется, а когда нет, иначе — привет мусорке из логов и отсутствия понимания что происходит. В случае, если битая задача будет крутится в RabbitMQ, с реальными данными, нормальными задачами, вы вряд ли сможете выкинуть сломанные задачи без костылей, не обновляя код консьюмера и н�� перезапуская его. Поэтому продумывайте это сразу. В данном случае правильным было бы ловить только лишь SMTPTimeOutException какой-нибудь.
Так же с такой моделью важно понимать, что: на 1 очередь — одна «глобальная ответственность смены состояния чего либо». Не стоит давать слишком много рискованных задач своему воркеру. Если рассмотреть вариант с 1С, то проблема может быть в следующем: допустим при успешном или неуспешном изменении\добавлении товара в 1С мы записываем в базу данных что-нибудь, например, дату последней удачной синхронизации или неудачной. Т.е. тут обновляются сразу 2 базы данных: бд 1С и бд вашего приложения. Допустим в 1С все успешно создалось, далее идет обновление в базе данных поля «дата последней удачной синхронизации» — хоп, вылезла ошибочка, опять же, сервер бд не отвечает — задача откладывается на «потом» и повторяется, пока база данных не начнет отвечать. И при этом каждый раз «подзадача» связанная с созданием сущности в 1С будет успешно выполняться, каждый раз при неудачной попытке записи в базу данных сайта, что неправильно.
2) Прочитайте про durable, раз уж мы с вами используем RabbitMQ. P.S: это заводится как true\false флаг «durable» в конфиге бандла, конкретно — в exchange_options и queue_options
3) Всю свою жизнь закрывайте соединение к базе данных после выполнения работы программы. А так же запускайте очистку EM и после сборщик мусора для чистки ссылок. Т.е. в конце концов наш консьюмер должен выглядеть как-то так:
class MailSenderConsumer implements ConsumerInterface { private $delayedProducer; private $entityManager; /** * MailSenderConsumer constructor. * @param ProducerInterface $delayedProducer * @param EntityManagerInterface $entityManager */ public function __construct(ProducerInterface $delayedProducer, EntityManagerInterface $entityManager) { $this->delayedProducer = $delayedProducer; $this->entityManager = $entityManager; gc_enable(); } /** * @var AMQPMessage $msg * @return void */ public function execute(AMQPMessage $msg) { $body = $msg->getBody(); echo 'Ну тут типа сообщение отправляю '.$body.' ...'.PHP_EOL; try { if ($body == 'bad') { throw new \Exception(); } echo 'Успешно отправлено...'.PHP_EOL; } catch (\Exception $exception) { echo 'ERROR'.PHP_EOL; $this->delayedProducer->publish($body); } $this->entityManager->clear(); $this->entityManager->getConnection()->close(); gc_collect_cycles(); } }
Консьюмер работает как демон, поэтому постоянно копить в нем ссылки и держать соединение с бд — это плохо. В случае с MySQL вы получите MySQL server has gone away.
4) Много думайте, почему ваша модель отложенных сообщений может неожиданно убить ваш бизнес. Например у нас есть механизм, который при изменении товара в админке заливает эти изменения через очередь в 1С. Теперь представим ситуацию: администратор меняет товар -> создается задача #1 на попытку изменить те же данные в базе 1С. Сервер 1С не отвечает, поэтому задачка просто перекладывается постоянно, пока все не заработает. За это время администратор решил еще кое-что подправить в том же товаре, что он и делает. Регистрируется задача #2.
А теперь представьте ситуацию, когда поочередно выполняются и откладываются задачи #1 и #2.
Что если 1С заработает к моменту выполнения задачи #2? Задача выполнится и зальёт последние изменения. Далее пойдет в ход задача #1 и затрет собой стабильные изменения :)
Выход: отправляем timestamp в качестве version, и, если задача «из прошлого» — выкидываем её.
5) Идешь в асинхронность — прочитай про многие архитектурные проблемы, а также race condition, несогласованность консумеров на разных машинах и прочее.
6) Пишите версии вашим очередям… Ух как помогает на реальном проде. В принципе мы так и сделали в этом примере.
7) Возможно тебе не нужен RabbitMQ и целый AMQP протокол. Посмотри в сторону beanstalkd.
8) Запускайте консумеры и всякое прочее демоническое на php через supervisor и подключите полное логирование падения процессов в нём. У него так же есть web интерфейс для управления всем этим делом, что так же очень удобно. Проблемы будут всегда.
