Пример использования RabbitMQ Delayed Message Exchange в Java Spring Framework

  • Tutorial
image В этом посте я хочу показать как использовать отложенные сообщения в RabbitMQ. В качестве примера задачи, где удобно использовать отложенную очередь, возьму механизм постбеков (s2s ping back, s2s pixel).

В двух словах о механизме постбеков:


1. Происходит некое событие
2. Ваше приложение должно уведомить об этом событии сторонний сервис
3. Если сторонний сервис оказался недоступен, то необходимо повторить уведомление еще раз через несколько минут

Для повторного уведомления я буду использовать отложенную очередь.

RabbitMQ по умолчанию не умеет задерживать сообщения, они доставляются сразу после публикации. Функционал отложенной доставки доступен в виде плагина rabbitmq-delayed-message-exchange.

Сразу хочу отметить, что плагин экспериментальный. Не смотря на то, что в целом он достаточно стабилен, использовать в продакшене его нужно на свой страх и риск.

Собираем Docker контейнер с RMQ и плагином


За основу я возьму официальный образ с management plugin, пригодится для тестирования.

Dockerfile:

FROM rabbitmq:3.6-management
RUN apt-get update && apt-get install -y curl
RUN curl http://www.rabbitmq.com/community-plugins/v3.6.x/rabbitmq_delayed_message_exchange-0.0.1.ez > $RABBITMQ_HOME/plugins/rabbitmq_delayed_message_exchange-0.0.1.ez
RUN rabbitmq-plugins enable --offline rabbitmq_delayed_message_exchange

Сборка
docker build --tag=x25/rmq-delayed-message-exchange .

Запуск
docker run -d --name rmq -p 5672:5672 -p 15672:15672 x25/rmq-delayed-message-exchange

Spring AMQP


Spring Framework полностью поддерживает плагин в проекте spring-rabbit. Начиная с версии 1.6.4 можно пользоваться как xml/bean конфигурациями так и аннотациями.

Я буду использовать Spring Boot Amqp Starter.

Зависимость для Maven
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

Зависимость для Gradle
compile "org.springframework.boot:spring-boot-starter-amqp"

Конфигурация через аннотации


При использовании бутстраппера и аннотаций Spring берет большую часть работы на себя. Достаточно лишь написать:

@RabbitListener(bindings = @QueueBinding(value = @Queue(value = DELAY_QUEUE_NAME),
exchange = @Exchange(value = DELAY_EXCHANGE_NAME, delayed = "true"), key = DELAY_QUEUE_NAME))
public void onMessage(Message<?> message) {
    //...
}

И при запуске приложения RabbitAdmin автоматически объявит delayed exchange, queue, создаст обработчики событий и привяжет их к аннотированному методу.

Нужно больше потоков для обработки сообщений? Это настраивается через файл внешней конфигурации (свойство spring.rabbitmq.listener.concurrency) или через параметр containerFactory у аннотации:

Пример
//Создаем конфигурацию:
@Configuration
public class RabbitConfiguration {
    @Bean(name = "containerFactory")
    @Autowired
    public SimpleRabbitListenerContainerFactory containerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setConcurrentConsumers(10);
        factory.setPrefetchCount(30);
        return factory;
    }
}

//Добавляем параметр:
@RabbitListener(containerFactory = "containerFactory", ...)

Для отправки отложенного сообщения удобно использовать RabbitTemplate:

rabbitTemplate.send(
        DELAY_EXCHANGE_NAME,
        DELAY_QUEUE_NAME,
        MessageBuilder
                .withBody(data)
                .setHeader("x-delay", MESSAGE_DELAY_MS).build()
);

Отправлено оно будет моментально, но доставлено будет с задержкой, указанной в заголовке x-delay. Максимально допустимое время задержки (2^32-1) мс.

Готовый пример приложения:

@SpringBootApplication
public class Application {

    private final Logger log = LoggerFactory.getLogger(Application.class);

    private final static String DELAY_QUEUE_NAME = "delayed.queue";
    private final static String DELAY_EXCHANGE_NAME = "delayed.exchange";

    private final static String DELAY_HEADER = "x-delay";
    private final static String NUM_ATTEMPT_HEADER = "x-num-attempt";
    private final static long   RETRY_BACKOFF = 5000;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RabbitListener(bindings = @QueueBinding(value = @Queue(value = DELAY_QUEUE_NAME),
    exchange = @Exchange(value = DELAY_EXCHANGE_NAME, delayed = "true"), key = DELAY_QUEUE_NAME))
    public void onMessage(Message<byte[]> message) {

        String endpointUrl = new String(message.getPayload());
        Long numAttempt = (Long) message.getHeaders().getOrDefault(NUM_ATTEMPT_HEADER, 1L);

        log.info("Message received, url={}, attempt={}", endpointUrl, numAttempt);

        if (!doNotifyEndpoint(endpointUrl) && numAttempt < 3) {
            rabbitTemplate.send(
                    DELAY_EXCHANGE_NAME,
                    DELAY_QUEUE_NAME,
                    MessageBuilder
                            .withBody(message.getPayload())
                            .setHeader(DELAY_HEADER, numAttempt * RETRY_BACKOFF)
                            .setHeader(NUM_ATTEMPT_HEADER, numAttempt + 1)
                            .build()
            );
        }
    }

    private boolean doNotifyEndpoint(String url) {
        try {
            HttpURLConnection connection = (HttpURLConnection) new URL(url).openConnection();
            /* @todo: set up connection timeouts */
            return (connection.getResponseCode() == 200);
        } catch (Exception e) {
            log.error(e.getMessage());
            return false;
        }
    }

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

application.yml
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    listener:
      prefetch: 10
      concurrency: 50

build.gradle
buildscript {
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:1.4.2.RELEASE")
    }
}

apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'idea'
apply plugin: 'org.springframework.boot'

jar {
    baseName = 'rmq-delayed-demo'
    version =  '0.1.0'
}

repositories {
    mavenCentral()
}

sourceCompatibility = 1.8
targetCompatibility = 1.8

dependencies {
    compile("org.springframework.boot:spring-boot-starter-amqp")
    testCompile("org.springframework.boot:spring-boot-starter-test")
}

Проверяем отложенную доставку через панель управления (rmq-management), она доступна на порту 15672:

image

Логи:

2016-12-21 14:27:25.927: Message received, url=http://localhost:1234, attempt=1
2016-12-21 14:27:25.941: Connection refused (Connection refused)
2016-12-21 14:27:30.946: Message received, url=http://localhost:1234, attempt=2
2016-12-21 14:27:30.951: Connection refused (Connection refused)
2016-12-21 14:27:40.954: Message received, url=http://localhost:1234, attempt=3

Конфигурация через XML


При использовании XML конфигураций нужно просто установить у exchange-бина свойство delayed в true и RabbitAdmin сделает все остальное за вас.

Пример xml-конфигурации в связке с Integration Framework
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-amqp="http://www.springframework.org/schema/integration/amqp"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/integration/amqp http://www.springframework.org/schema/integration/amqp/spring-integration-amqp.xsd
  http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
  http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
  http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">

    <int:channel id="to-delayed-rmq" />

    <int-amqp:outbound-channel-adapter channel="to-delayed-rmq"
                                       amqp-template="rabbitTemplate"
                                       exchange-name="delayed.exchange"
                                       routing-key="delayed.binding"
                                       mapped-request-headers="x-delay" />

    <int-amqp:inbound-channel-adapter channel="from-delayed-rmq-queue"
                                      queue-names="delayed.queue"
                                      message-converter="amqpMessageConverter"
                                      connection-factory="rabbitConnectionFactory"
                                      concurrent-consumers="10"
                                      prefetch-count="50" />

    <int:service-activator input-channel="from-delayed-rmq-queue" method="onMessage">
        <bean id="postbackServiceActivator" class="PostbackServiceActivator" />
    </int:service-activator>

    <rabbit:queue name="delayed.queue" />

    <rabbit:direct-exchange name="delayed.exchange" delayed="true">
        <rabbit:bindings>
            <rabbit:binding queue="delayed.queue" key="delayed.binding" />
        </rabbit:bindings>
    </rabbit:direct-exchange>

</beans>

Полезные ссылки


AdBlock has stolen the banner, but banners are not teeth — they will be back

More
Ads

Comments 1

    +1
    Для отложенной доставки есть ещё один вариант, который работает на RabbitMQ даже без плагинов. Изначальное сообщение (с нужным TTL) отправляется в очередь на которую нет консюмеров, и когда заканчивается TTL, RabbitMQ перекладывает это сообщение в другой exchange, указанный как dead letter в метаданных сообщения. На этом принципе часто делается delayed retry для сообщений, которые не удалось обработать с первого раза.

    Подробнее про dead letter можно почитать тут: https://www.rabbitmq.com/dlx.html
    А тут есть пример кода: http://stackoverflow.com/a/14844559

    Only users with full accounts can post comments. Log in, please.