Pull to refresh
890.79
OTUS
Цифровые навыки от ведущих экспертов

Простая интеграция RabbitMQ и Spring Boot

Reading time6 min
Views20K
Original author: Júlio Falbo
Перевод статьи был подготовлен в преддверии старта курса «Разработчик на Spring Framework».




Всем привет!

Я хотел бы поделиться с вами библиотекой с открытым исходным кодом, которая облегчает интеграцию RabbitMQ с приложениями на Spring Boot. Кроме того, эта библиотека предлагает новую, улучшенную концепцию повторов неудачных операций (по сравнению со стандартным подходом Spring AMQP).
Упрощает? Но как?


Автоконфигурация


Допустим, мы хотим реализовать взаимодействие (например, с использованием SSL) с RabbitMQ с помощью Spring AMQP. Нам нужно создать несколько бинов, таких как ConnectionFactory, RabbitAdmin, RabbitTemplate и AbstractRabbitListenerContainerFactory.

Но представьте, что в RabbitMQ несколько виртуальных хостов. По умолчанию в Spring AMQP такую экосистему нужно создать для каждого виртуального хоста с ручной настройкой бинов RabbitTemplate и RabbitListener.



Как вы видите, вся эта конфигурация требует времени и может стать головной болью.

Осознавая эту проблему, предлагаемая библиотека автоматически настраивает все эти бины за вас. Единственное, что вам нужно сделать, это определить конфигурацию в application.properties, и произойдет магия!
Ух ты, но как выглядит конфигурация в application.properties?

Все очень просто. Во-первых, есть префикс для свойств — это spring.rabbitmq.custom.

После этого необходимо указать имя события. Эта очень важная часть, так как все настройки, которые делает библиотека, основаны на этом имени.

После имени события вы можете указать свойство и значение. Все свойства описаны в документации.

Итак, шаблон выглядит следующим образом:

spring.rabbitmq.custom.<YOUR_EVENT>.<PROPERTY>=<VALUE>


Ниже приведен пример конфигурации для двух разных соединений.

spring.rabbitmq.custom.some-event.host=localhost
spring.rabbitmq.custom.some-event.port=5672
spring.rabbitmq.custom.some-event.ttlRetryMessage=5000
spring.rabbitmq.custom.some-event.maxRetriesAttempts=5
spring.rabbitmq.custom.some-event.ttlMultiply=2
spring.rabbitmq.custom.some-event.queueRoutingKey=ROUTING.KEY.TEST
spring.rabbitmq.custom.some-event.exchange=ex.custom.direct
spring.rabbitmq.custom.some-event.exchangeType=direct
spring.rabbitmq.custom.some-event.queue=queue.custom.test
spring.rabbitmq.custom.some-event.autoCreate=true
spring.rabbitmq.custom.some-event.concurrentConsumers=1
spring.rabbitmq.custom.some-event.maxConcurrentConsumers=1
spring.rabbitmq.custom.some-event.virtualHost=tradeshift
spring.rabbitmq.custom.some-event.primary=true
spring.rabbitmq.custom.some-event.sslConnection=true
spring.rabbitmq.custom.some-event.tlsKeystoreLocation=file:///etc/tradeshift/your-service/tls-keystore.pkcs12
spring.rabbitmq.custom.some-event.tlsKeystorePassword=${RABBITMQ_PASS_CERT}

spring.rabbitmq.custom.another-event.host=localhost
spring.rabbitmq.custom.another-event.port=5672
spring.rabbitmq.custom.another-event.ttlRetryMessage=5000
spring.rabbitmq.custom.another-event.maxRetriesAttempts=5
spring.rabbitmq.custom.another-event.queueRoutingKey=TEST.QUEUE
spring.rabbitmq.custom.another-event.exchange=ex_test_1
spring.rabbitmq.custom.another-event.exchangeType=direct
spring.rabbitmq.custom.another-event.queue=queue_test_1
spring.rabbitmq.custom.another-event.autoCreate=true
spring.rabbitmq.custom.another-event.concurrentConsumers=1
spring.rabbitmq.custom.another-event.maxConcurrentConsumers=1
spring.rabbitmq.custom.another-event.username=guest
spring.rabbitmq.custom.another-event.password=${RABBITMQ_PASS}

Как видите, мы не написали ни одной строчки кода!!!
Хорошо, но вы что-то говорили о новой стратегии повторов, верно?

Да, мой друг, говорил!

Но прежде чем объяснить эту новую стратегию, давайте посмотрим на RabbitMQ и поведение Spring AMQP по умолчанию.

По умолчанию RabbitMQ не предоставляет обработку повторов, которая бы позволила контролировать весь жизненный цикл сообщения.

Например, до RabbitMQ 3.8 в заголовке сообщений не было свойства для контроля количества сделанных повторных попыток.

Поведение RabbitMQ по умолчанию:

  • Если вы не определили time-to-live (TTL, время жизни), то RabbitMQ постоянно будет пытаться поместить ваше сообщение в очередь.
  • Если вы определили TTL, но не определили dlx, то после TTL сообщение будет удалено из очереди, и вы его потеряете.
  • Если вы определили TTL и dlx, то после TTL сообщение будет отправлено в обменник, определенный в dlx.


Поведение RabbitMQ по умолчанию

Но что делать, если мы хотим увеличивающийся TTL (например, в случае нестабильной работы) и контроль количества повторных попыток?

Отличный вопрос!

Пришло время объяснить как работает Spring AQMP и библиотека Spring Rabbit Tuning!

Стратегии повтора


Spring AMQP по умолчанию


При использовании Spring AMQP по умолчанию вы можете определить параметры повторных попыток, используя свойства, приведенные ниже.

Но у этого подхода есть проблемы. По умолчанию Spring AMQP будет блокировать вашу очередь при повторной попытке доставить сообщение.

Эту проблему можно решить обходным путем: использовать параллелизм. Но таким образом мы нагружаем JVM, и это не самый лучший подход. Если у вас будет пять проблемных сообщений (как в примере), то снова возникнет узкое место, и нам все еще нужно вручную определять бины в @Configuration – бине для каждого соединения и контейнера.

spring.rabbitmq.listener.simple.retry.enabled=true
spring.rabbitmq.listener.simple.retry.initial-interval=2000
spring.rabbitmq.listener.simple.retry.max-attempts=5
spring.rabbitmq.listener.simple.retry.multiplier=2
spring.rabbitmq.listener.simple.max-concurrency=5
spring.rabbitmq.listener.simple.concurrency=5


Spring RabbitMQ Tuning


Эта библиотека использует иной подход с помощью отдельной очереди для повторных попыток. Таким образом мы можем контролировать TTL с помощью параметра сообщения expiration, и параметра x-death для управления количеством повторных попыток.
Но как?

Мы используем концепцию dlx в очереди повторных попыток для повторной отправки сообщения в основную очередь. Таким образом, у нас есть доступ к параметру x-death и мы можем программно определять срок жизни сообщения.

Примечание. Так как эта библиотека является расширением Spring AMQP, то вы можете использовать дефолтную стратегию повторов, а эту библиотеку использовать только для автоматической конфигурации бинов.


Spring RabbitMQ Tuning Retry
Можно ли использовать вашу стратегию повторов без автоконфигурации? У меня уже есть много бинов, и нет времени, чтобы их переписывать.

Мы знаем, что мир не идеален, и, для данной ситуации у нас есть флаг, который позволяет отключить автоконфигурацию и использовать только наш подход к обработке повторов и другие преимущества, такие как управление бинами.

Вы можете отключить автоконфигурацию, используя следующее свойство:

spring.rabbitmq.enable.custom.autoconfiguration=false

Но у меня два разных соединения, как мне быть в этом случае?

Если у вас больше одного соединения и вы хотите отключить автоконфигурацию, то вам нужно указать имя бина RabbitTemplate для каждого соединения. Это описано здесь.

spring.rabbitmq.custom.<YOUR_EVENT>.rabbitTemplateBeanName= <RABBITTEMPLATE_BEAN_NAME>

Вы все еще можете использовать наш бин RabbitTemplateHandler для облегчения работы с Spring AMQP.
Очень хорошо! Итак, я хочу это использовать. Как я могу это сделать? У вас есть пример или документация?
Да!

У нас есть пример проекта в этом репозитории, где вы можете посмотреть, как использовать эту библиотеку для издателей и подписчиков.

Но это так просто, что я покажу пример здесь!

Издатель


В библиотеке есть класс RabbitTemplateHandler и он очень прост в использовании. Нужно вызвать метод getRabbitTemplate и передать виртуальный хост в качестве параметра для получения бина RabbitTemplate. После этого можно вызвать метод convertAndSend и передать в параметрах exchange и routing key.

Примечание: exchange и routing key можно получить с помощью аннотации Value.

Пример:

@Value("${spring.rabbitmq.custom.some-event.exchange}")
private String exchangeSomeEvent;

@Value("${spring.rabbitmq.custom.some-event.queueRoutingKey}")
private String routingKeySomeEvent;

@Autowired
private final RabbitTemplateHandler rabbitTemplateHandler;

public void sendMessage(final String message) {
   rabbitTemplateHandler.getRabbitTemplate("some-event").convertAndSend(exchangeSomeEvent, routingKeySomeEvent, message);
}

Подписчик


Подписчики тоже очень простые. Единственное, что вам нужно сделать, это добавить к методу аннотацию RabbitListener (аннотация Spring AMQP по умолчанию) и передавать имя containerFactory.
Как узнать правильное имя containerFactory для виртуального хоста?

Вам не нужно его знать, просто передайте имя события, а всю магию библиотека сделает за вас! В этой библиотеке также есть возможность включения повторов и dlq, которые рекомендуется использовать.

Включить их очень просто. Нужно добавить к методу аннотацию EnableRabbitRetryAndDlq и передавать имя свойства в качестве аргумента.

Примечание: вы также можете указать, какие исключения обрабатывать при повторах и dlq. По умолчанию обрабатывается Exception.class, что означает обработку всех исключений.

Пример:

@RabbitListener(containerFactory = "some-event", queues = "${spring.rabbitmq.custom.some-event.queue}")
@EnableRabbitRetryAndDlq(event = "some-event", exceptions = { IllegalArgumentException.class, RuntimeException.class })
public void onMessage(Message message) {
   ...
}

Это все! Надеюсь, вам понравится эта библиотека так же, как и нам!

И, самое главное: не стесняйтесь вносить свой вклад или отправлять отзывы!

Особая благодарность


Andre Luis Gomes
Rogerio Bueno
Leonardo Ferreira



Записаться на бесплатный урок.

Tags:
Hubs:
Total votes 7: ↑6 and ↓1+5
Comments0

Articles

Information

Website
otus.ru
Registered
Founded
Employees
101–200 employees
Location
Россия
Representative
OTUS