Pull to refresh

RabbitMQ Spring tutorial

Reading time21 min
Views109K
На сайте rabbitmq.com уже есть подробные примеры и клиент для java. Однако если в проекте уже используется спринг, то намного удобнее использовать библиотеку Spring AMQP. Эта статья содержит реализацию всех шести официальных примеров работы с RabbitMQ.

Сразу ссылка на проекты на GitHub.

Для примеров я буду использовать простейшее приложение на спринге. После того, как пользователь перейдёт по опредленной ссылке, в RabbitMQ будет посылаться сообщение которое будет отправляться в один из листенеров. Листенер в свою очередь будет просто выводить сообщение в лог. На хабре уже были переводы официальных туториалов на php и python, и я думаю многие уже знакомы с принципами работы rabbitmq, поэтому я сконцентрируюсь на работе именно с Spring AMQP.

Подготовка


Установка RabbitMQ


Установка RabbitMQ детально описана на официальном сайте. Тут проблем возникнуть не должно.

Настройка Spring


Для простоты я использовал Spring Boot. Он отлично подходит, чтобы быстро развернуть приложения на спринге и не заниматься его долгим кофигурированием. При этом сам Spring AMQP я буду конфигурировать «классическим способом» — т.е. так, как я конфигурировал в реальном проекте без Spring Boot (разве что в ConnectionFactory не описаны некоторые специфичные для heroku вещи).

Cодержимое минимального pom.xml необходимого нам для запуска. Здесь уже есть Spring boot и Spring AMQP.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>rabbitmq</groupId>
    <artifactId>example-1</artifactId>
    <version>1.0-SNAPSHOT</version>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.2.4.RELEASE</version>
    </parent>
    <dependencies>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
            <version>1.4.5.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
    </dependencies>
</project>

Основной файл конфигурации. Кроме имени класса, его содержимое будет одинаковым для всех наших примеров.

package com.rabbitmq.example1;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Import;

@EnableAutoConfiguration
@ComponentScan
@Import(RabbitConfiguration.class)
public class Example1Configuration {
    public static void main(String[] args) throws Exception {
        SpringApplication.run(Example1Configuration.class, args);
    }
}


Пример 1. «Hello World!»




Для работы с RabbitMQ нам потребуются следующие бины:
— сonnectionFactory — для соединения с RabbitMQ;
— rabbitAdmin — для регистрации/отмены регистрации очередей и т.п.;
— rabbitTemplate — для отправки сообщений (producer);
— myQueue1 — собственно очередь куда посылаем сообщения;
— messageListenerContainer — принимает сообщения (consumer).

Код конфигурации для этих бинов
package com.rabbitmq.example1;

import org.apache.log4j.Logger;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;

public class RabbitConfiguration {
    Logger logger = Logger.getLogger(RabbitConfiguration.class);

    //настраиваем соединение с RabbitMQ
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory =
                new CachingConnectionFactory("localhost");
        return connectionFactory;
    }

    @Bean
    public AmqpAdmin amqpAdmin() {
        return new RabbitAdmin(connectionFactory());
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        return new RabbitTemplate(connectionFactory());
    }

    //объявляем очередь с именем queue1
    @Bean
    public Queue myQueue1() {
        return new Queue("queue1");
    }

    //объявляем контейнер, который будет содержать листенер для сообщений
    @Bean
    public SimpleMessageListenerContainer messageListenerContainer1() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory());
        container.setQueueNames("queue1");
        container.setMessageListener(new MessageListener() {
        	//тут ловим сообщения из queue1
            public void onMessage(Message message) {
                logger.info("received from queue1 : " + new String(message.getBody()));
            }
        });
        return container;
    }
}


В этом и следующих примерах в качестве продюссера будет контроллер, который будет посылать сообщения в rabbitmq.

package com.rabbitmq.example1;

import org.apache.log4j.Logger;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

@Controller
public class SampleController {
    Logger logger = Logger.getLogger(SampleController.class);

    @Autowired
    AmqpTemplate template;

    @RequestMapping("/emit")
    @ResponseBody
    String queue1() {
        logger.info("Emit to queue1");
        template.convertAndSend("queue1","Message to queue");
        return "Emit to queue";
    }
}

Теперь, если запустить Example1Configuration и перейти в браузере по адресу http://localhost:8080/emit, то в консоли мы увидим что-то типа:

2015-06-23 21:16:26.250  INFO 6460 --- [nio-8080-exec-2] com.rabbitmq.example1.SampleController   : Emit to queue1
2015-06-23 21:16:26.252  INFO 6460 --- [cTaskExecutor-1] c.rabbitmq.example1.RabbitConfiguration  : received from queue 1: Message to queue


Рассмотрим подробнее получившийся результат. Тут мы в SampleController.java отправляем сообщение:

template.convertAndSend("queue1","Message to queue");

А здесь мы его получаем:

public void onMessage(Message message) {
    logger.info("received from queue 1: " + new String(message.getBody()));
}

Ничего сложного, но описывать все листенеры в конфигурации не совсем удобно, особенно когда их становится много. Гораздо удобнее конфигуририровать листенеры аннотациями.

Пример 1.1. «Hello World!» на аннотациях


Вместо листенера в конфигурации добавим в проект класс RabbitMqListener, в который будут приходить сообщения. Соответственно messageListenerContainer1 уже не нужен.

RabbitMqListener — это обыкновенный компонент(@Component) спринга с методом, помеченным анотацией @RabbitListener. В этом метод будут приходить сообщения. При этом мы можем получать как полное сообщение Message заголовками и телом как массив байт, так и просто сконвертированное тело в том виде, в каком мы его отправляли.

    @RabbitListener(queues = "queue1")
    public void processQueue1(String message) {
        logger.info("Received from queue 1: " + message);
    }

Исходный код RabbitMqListener.java и обновленного RabbitConfiguration.java
package com.rabbitmq.example1annotated;

import org.apache.log4j.Logger;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@EnableRabbit //нужно для активации обработки аннотаций @RabbitListener
@Component
public class RabbitMqListener {
    Logger logger = Logger.getLogger(RabbitMqListener.class);

    @RabbitListener(queues = "queue1")
    public void processQueue1(String message) {
        logger.info("Received from queue 1: " + message);
    }
}


package com.rabbitmq.example1annotated;

import org.apache.log4j.Logger;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfiguration {
    Logger logger = Logger.getLogger(RabbitConfiguration.class);
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory =
                new CachingConnectionFactory("localhost");
        return connectionFactory;
    }

    @Bean
    public AmqpAdmin amqpAdmin() {
        return new RabbitAdmin(connectionFactory());
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        return new RabbitTemplate(connectionFactory());
    }

    @Bean
    public Queue myQueue1() {
        return new Queue("queue1");
    }

}


Пример 2. Work Queues




В данном примере одну очередь слушают уже два листенера. Для эмуляции полезной работы используем Thread.sleep. Важно, что листенеры одной очереди могут быть и на разных инстансах программы. Так можно распараллелить очередь на несколько компьютеров или нод в облаке.

    @RabbitListener(queues = "query-example-2")
    public void worker1(String message) throws InterruptedException {
        logger.info("worker 1 : " + message);
        Thread.sleep(100 * random.nextInt(20));
    }

    @RabbitListener(queues = "query-example-2")
    public void worker2(String message) throws InterruptedException {
        logger.info("worker 2 : " + message);
        Thread.sleep(100 * random.nextInt(20));
    }

Результат:
2015-06-23 22:03:48.018  INFO 6784 --- [nio-8080-exec-1] com.rabbitmq.example2.SampleController   : Emit to queue
2015-06-23 22:03:48.029  INFO 6784 --- [cTaskExecutor-1] com.rabbitmq.example2.RabbitMqListener   : worker 2 : Message 1
2015-06-23 22:03:48.029  INFO 6784 --- [cTaskExecutor-1] com.rabbitmq.example2.RabbitMqListener   : worker 1 : Message 0
2015-06-23 22:03:48.830  INFO 6784 --- [cTaskExecutor-1] com.rabbitmq.example2.RabbitMqListener   : worker 2 : Message 2
2015-06-23 22:03:49.331  INFO 6784 --- [cTaskExecutor-1] com.rabbitmq.example2.RabbitMqListener   : worker 2 : Message 3
2015-06-23 22:03:49.432  INFO 6784 --- [cTaskExecutor-1] com.rabbitmq.example2.RabbitMqListener   : worker 2 : Message 4
2015-06-23 22:03:49.634  INFO 6784 --- [cTaskExecutor-1] com.rabbitmq.example2.RabbitMqListener   : worker 1 : Message 5
2015-06-23 22:03:49.733  INFO 6784 --- [cTaskExecutor-1] com.rabbitmq.example2.RabbitMqListener   : worker 2 : Message 6
2015-06-23 22:03:49.735  INFO 6784 --- [cTaskExecutor-1] com.rabbitmq.example2.RabbitMqListener   : worker 1 : Message 7
2015-06-23 22:03:50.236  INFO 6784 --- [cTaskExecutor-1] com.rabbitmq.example2.RabbitMqListener   : worker 1 : Message 8
2015-06-23 22:03:50.537  INFO 6784 --- [cTaskExecutor-1] com.rabbitmq.example2.RabbitMqListener   : worker 1 : Message 9


Исходный код RabbitMqListener.java и обновленного SampleController.java
package com.rabbitmq.example2;

import org.apache.log4j.Logger;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Random;

@Component
public class RabbitMqListener {
    Logger logger = Logger.getLogger(RabbitMqListener.class);
    Random random = new Random();

    @RabbitListener(queues = "query-example-2")
    public void worker1(String message) throws InterruptedException {
        logger.info("worker 1 : " + message);
        Thread.sleep(100 * random.nextInt(20));
    }

    @RabbitListener(queues = "query-example-2")
    public void worker2(String message) throws InterruptedException {
        logger.info("worker 2 : " + message);
        Thread.sleep(100 * random.nextInt(20));
    }

}	

package com.rabbitmq.example2;

import org.apache.log4j.Logger;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;


@Controller
public class SampleController {
    Logger logger = Logger.getLogger(SampleController.class);

    @Autowired
    AmqpTemplate template;

    @RequestMapping("/queue")
    @ResponseBody
    String queue1() {
        logger.info("Emit to queue");
        for(int i = 0;i<10;i++)
            template.convertAndSend("query-example-2","Message " + i);
        return "Emit to queue";
    }
}


Пример 3. Publish/Subscribe




Тут одно и то же сообщение приходит сразу двум консьюмерам.

2015-06-23 22:12:24.669  INFO 1664 --- [nio-8080-exec-1] com.rabbitmq.example3.SampleController   : Emit to exchange-example-3
2015-06-23 22:12:24.684  INFO 1664 --- [cTaskExecutor-1] com.rabbitmq.example3.RabbitMqListener   : accepted on worker 1 : Fanout message
2015-06-23 22:12:24.684  INFO 1664 --- [cTaskExecutor-1] com.rabbitmq.example3.RabbitMqListener   : accepted on worker 2 : Fanout message

Для этого, подключим обе очереди к FanoutExchange:

    @Bean
    public FanoutExchange fanoutExchangeA(){
        return new FanoutExchange("exchange-example-3");
    }

    @Bean
    public Binding binding1(){
        return BindingBuilder.bind(myQueue1()).to(fanoutExchangeA());
    }

    @Bean
    public Binding binding2(){
        return BindingBuilder.bind(myQueue2()).to(fanoutExchangeA());
    }

И будем отправлять не в очередь, а в exchange exchange-example-3:

    template.setExchange("exchange-example-3");
    template.convertAndSend("Fanout message");

Каждый раз указывать exchange необязательно. Его можно указать и один раз при создании RabbitTemplate.

Полные исходные коды
package com.rabbitmq.example3;

import org.apache.log4j.Logger;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@EnableRabbit
@Configuration
public class RabbitConfiguration {
    Logger logger = Logger.getLogger(RabbitConfiguration.class);

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory =
                new CachingConnectionFactory("localhost");
        return connectionFactory;
    }

    @Bean
    public AmqpAdmin amqpAdmin() {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory());
        return rabbitAdmin;
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
        return rabbitTemplate;
    }


    @Bean
    public Queue myQueue1() {
        return new Queue("query-example-3-1");
    }

    @Bean
    public Queue myQueue2() {
        return new Queue("query-example-3-2");
    }

    @Bean
    public FanoutExchange fanoutExchangeA(){
        return new FanoutExchange("exchange-example-3");
    }

    @Bean
    public Binding binding1(){
        return BindingBuilder.bind(myQueue1()).to(fanoutExchangeA());
    }

    @Bean
    public Binding binding2(){
        return BindingBuilder.bind(myQueue2()).to(fanoutExchangeA());
    }

}

package com.rabbitmq.example3;

import org.apache.log4j.Logger;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Random;

@Component
public class RabbitMqListener {
    Logger logger = Logger.getLogger(RabbitMqListener.class);
    Random random = new Random();

    @RabbitListener(queues = "query-example-3-1")
    public void worker1(String message) {
        logger.info("accepted on worker 1 : " + message);
    }

    @RabbitListener(queues = "query-example-3-2")
    public void worker2(String message) {
        logger.info("accepted on worker 2 : " + message);
    }

}

package com.rabbitmq.example3;

import org.apache.log4j.Logger;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;


@Controller
public class SampleController {
    Logger logger = Logger.getLogger(SampleController.class);

    @Autowired
    RabbitTemplate template;

    @RequestMapping("/")
    @ResponseBody
    String home() {
        return "Empty mapping";
    }

    @RequestMapping("/emit")
    @ResponseBody
    String emit() {
        logger.info("Emit to exchange-example-3");
        template.setExchange("exchange-example-3");
        template.convertAndSend("Fanout message");
        return "Emit to exchange-example-3";
    }
}



Пример 4. Routing




Здесь используется routing key, в зависимости от которого сообщение может попасть в одну из очередей или сразу в обе. Для этого вместо FanoutExchange используем DirectExchange:

    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange("exchange-example-4");
    }

    @Bean
    public Binding errorBinding1(){
        return BindingBuilder.bind(myQueue1()).to(directExchange()).with("error");
    }

    @Bean
    public Binding errorBinding2(){
        return BindingBuilder.bind(myQueue2()).to(directExchange()).with("error");
    }

    @Bean
    public Binding infoBinding(){
        return BindingBuilder.bind(myQueue2()).to(directExchange()).with("info");
    }

    @Bean
    public Binding warningBinding(){
        return BindingBuilder.bind(myQueue2()).to(directExchange()).with("warning");
    }


И при отправке используем указываем Routing key, например, так:

    template.convertAndSend("info", "Info");

В результате получаем:

2015-06-23 22:29:24.480  INFO 5820 --- [nio-8080-exec-2] com.rabbitmq.example4.SampleController   : Emit as info
2015-06-23 22:29:24.483  INFO 5820 --- [cTaskExecutor-1] com.rabbitmq.example4.RabbitMqListener   : accepted on worker 2 : Info
2015-06-23 22:29:29.721  INFO 5820 --- [nio-8080-exec-4] com.rabbitmq.example4.SampleController   : Emit as error
2015-06-23 22:29:29.727  INFO 5820 --- [cTaskExecutor-1] com.rabbitmq.example4.RabbitMqListener   : accepted on worker 2 : Error
2015-06-23 22:29:29.731  INFO 5820 --- [cTaskExecutor-1] com.rabbitmq.example4.RabbitMqListener   : accepted on worker 1 : Error
2015-06-23 22:29:36.779  INFO 5820 --- [nio-8080-exec-5] com.rabbitmq.example4.SampleController   : Emit as warning
2015-06-23 22:29:36.781  INFO 5820 --- [cTaskExecutor-1] com.rabbitmq.example4.RabbitMqListener   : accepted on worker 2 : Warning


Полные исходные коды
package com.rabbitmq.example4;

import org.apache.log4j.Logger;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@EnableRabbit
@Configuration
public class RabbitConfiguration {
    Logger logger = Logger.getLogger(RabbitConfiguration.class);

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory =
                new CachingConnectionFactory("localhost");
        return connectionFactory;
    }

    @Bean
    public AmqpAdmin amqpAdmin() {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory());
        return rabbitAdmin;
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
        rabbitTemplate.setExchange("exchange-example-4");
        return rabbitTemplate;
    }

    @Bean
    public Queue myQueue1() {
        return new Queue("query-example-4-1");
    }

    @Bean
    public Queue myQueue2() {
        return new Queue("query-example-4-2");
    }

    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange("exchange-example-4");
    }

    @Bean
    public Binding errorBinding1(){
        return BindingBuilder.bind(myQueue1()).to(directExchange()).with("error");
    }

    @Bean
    public Binding errorBinding2(){
        return BindingBuilder.bind(myQueue2()).to(directExchange()).with("error");
    }

    @Bean
    public Binding infoBinding(){
        return BindingBuilder.bind(myQueue2()).to(directExchange()).with("info");
    }

    @Bean
    public Binding warningBinding(){
        return BindingBuilder.bind(myQueue2()).to(directExchange()).with("warning");
    }

}

package com.rabbitmq.example4;

import org.apache.log4j.Logger;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Random;

@Component
public class RabbitMqListener {
    Logger logger = Logger.getLogger(RabbitMqListener.class);
    Random random = new Random();

    @RabbitListener(queues = "query-example-4-1")
    public void worker1(String message) {
        logger.info("accepted on worker 1 : " + message);
    }

    @RabbitListener(queues = "query-example-4-2")
    public void worker2(String message) {
        logger.info("accepted on worker 2 : " + message);
    }

}

package com.rabbitmq.example4;

import org.apache.log4j.Logger;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

@Controller
public class SampleController {
    Logger logger = Logger.getLogger(SampleController.class);

    @Autowired
    RabbitTemplate template;

    @RequestMapping("/")
    @ResponseBody
    String home() {
        return "Empty mapping";
    }

    @RequestMapping("/emit/error")
    @ResponseBody
    String error() {
        logger.info("Emit as error");
        template.convertAndSend("error", "Error");
        return "Emit as error";
    }

    @RequestMapping("/emit/info")
    @ResponseBody
    String info() {
        logger.info("Emit as info");
        template.convertAndSend("info", "Info");
        return "Emit as info";
    }

    @RequestMapping("/emit/warning")
    @ResponseBody
    String warning() {
        logger.info("Emit as warning");
        template.convertAndSend("warning", "Warning");
        return "Emit as warning";
    }
}


Пример 5. Topics




Здесь вместо DirectExchange используем TopicExchange
    @Bean
    public TopicExchange topicExchange(){
        return new TopicExchange("exchange-example-5");
    }

    @Bean
    public Binding binding1(){
        return BindingBuilder.bind(myQueue1()).to(topicExchange()).with("*.orange.*");
    }

    @Bean
    public Binding binding2(){
        return BindingBuilder.bind(myQueue2()).to(topicExchange()).with("*.*.rabbit");
    }

    @Bean
    public Binding binding3(){
        return BindingBuilder.bind(myQueue2()).to(topicExchange()).with("lazy.#");
    }

В результате получаем:

2015-06-23 22:42:28.414  INFO 6560 --- [nio-8080-exec-1] com.rabbitmq.example5.SampleController   : Emit 'to 1 and 2' to 'quick.orange.rabbit'
2015-06-23 22:42:28.428  INFO 6560 --- [cTaskExecutor-1] com.rabbitmq.example5.RabbitMqListener   : accepted on worker 2 : to 1 and 2
2015-06-23 22:42:28.428  INFO 6560 --- [cTaskExecutor-1] com.rabbitmq.example5.RabbitMqListener   : accepted on worker 1 : to 1 and 2
2015-06-23 22:42:55.802  INFO 6560 --- [nio-8080-exec-2] com.rabbitmq.example5.SampleController   : Emit 'to 2' to 'lazy.black.cat'
2015-06-23 22:42:55.805  INFO 6560 --- [cTaskExecutor-1] com.rabbitmq.example5.RabbitMqListener   : accepted on worker 2 : to 2


Полные исходные коды
package com.rabbitmq.example5;

import org.apache.log4j.Logger;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;


@Controller
public class SampleController {
    Logger logger = Logger.getLogger(SampleController.class);

    @Autowired
    RabbitTemplate template;

    @RequestMapping("/")
    @ResponseBody
    String home() {
        return "Empty mapping";
    }

    @RequestMapping("/emit/{key}/{message}")
    @ResponseBody
    String error(@PathVariable("key") String key, @PathVariable("message") String message) {
        logger.info(String.format("Emit '%s' to '%s'",message,key));
        template.convertAndSend(key, message);
        return String.format("Emit '%s' to '%s'",message,key);
    }
}

package com.rabbitmq.example5;

import org.apache.log4j.Logger;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Random;

@Component
public class RabbitMqListener {
    Logger logger = Logger.getLogger(RabbitMqListener.class);
    Random random = new Random();

    @RabbitListener(queues = "query-example-5-1")
    public void worker1(String message) {
        logger.info("accepted on worker 1 : " + message);
    }

    @RabbitListener(queues = "query-example-5-2")
    public void worker2(String message) {
        logger.info("accepted on worker 2 : " + message);
    }

}

package com.rabbitmq.example5;

import org.apache.log4j.Logger;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;


@Controller
public class SampleController {
    Logger logger = Logger.getLogger(SampleController.class);

    @Autowired
    RabbitTemplate template;

    @RequestMapping("/")
    @ResponseBody
    String home() {
        return "Empty mapping";
    }

    @RequestMapping("/emit/{key}/{message}")
    @ResponseBody
    String error(@PathVariable("key") String key, @PathVariable("message") String message) {
        logger.info(String.format("Emit '%s' to '%s'",message,key));
        template.convertAndSend(key, message);
        return String.format("Emit '%s' to '%s'",message,key);
    }
}



Пример 6. Remote procedure call (RPC)




Spring AMQP позволяет использовать convertSendAndReceive, чтобы получить ответ на отправленное сообщение. При этом, при дефолтной настройке, в случае если у нас RabbitMQ версии до 3.4.0, то для ответного сообщения будет создана временная очередь. Этот способ не очень производительный, поэтому его использовать не рукомендуется и следует создать самому также и очередь для обратных сообщений и указать её как ReplyQueue у RabbitTemplate. Если же у нас RabbitMQ версии 3.4.0 и выше, то будет использован механизм Direct reply-to, который намного быстрее. Подробнее в документации по Spring AMQP.

Таким образом осуществить удаленный вызов процедуры можно всего одной строкой:

    String response = (String) template.convertSendAndReceive("query-example-6",message);

А так процедура обрабатывается на консьюмере:

    @RabbitListener(queues = "query-example-6")
    public String worker1(String message) throws InterruptedException {
        logger.info("received on worker : " + message);
        Thread.sleep(3000); //эмулируем полезную работу
        return "received on worker : " + message;
    }

В результате получаем:

2015-06-23 23:12:36.677  INFO 6536 --- [nio-8080-exec-5] com.rabbitmq.example6.SampleController   : Emit 'Hello world'
2015-06-23 23:12:36.679  INFO 6536 --- [cTaskExecutor-1] com.rabbitmq.example6.RabbitMqListener   : Received on worker : Hello world
2015-06-23 23:12:39.681  INFO 6536 --- [nio-8080-exec-5] com.rabbitmq.example6.SampleController   : Received on producer 'Received on worker : Hello world'


Полные исходные коды
package com.rabbitmq.example6;

import org.apache.log4j.Logger;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@EnableRabbit
@Configuration
public class RabbitConfiguration {
    Logger logger = Logger.getLogger(RabbitConfiguration.class);

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory =
                new CachingConnectionFactory("localhost");
        return connectionFactory;
    }

    @Bean
    public AmqpAdmin amqpAdmin() {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory());
        return rabbitAdmin;
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
        rabbitTemplate.setQueue("query-example-6");
        rabbitTemplate.setReplyTimeout(60 * 1000);
        //no reply to - we use direct-reply-to
        return rabbitTemplate;
    }

    @Bean
    public Queue myQueue() {
        return new Queue("query-example-6");
    }
}

package com.rabbitmq.example6;

import org.apache.log4j.Logger;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Random;

@Component
public class RabbitMqListener {
    Logger logger = Logger.getLogger(RabbitMqListener.class);

    @RabbitListener(queues = "query-example-6")
    public String worker1(String message) throws InterruptedException {
        logger.info("Received on worker : " + message);
        Thread.sleep(3000);
        return "Received on worker : " + message;
    }
}

package com.rabbitmq.example6;

import org.apache.log4j.Logger;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;


@Controller
public class SampleController {
    Logger logger = Logger.getLogger(SampleController.class);

    @Autowired
    RabbitTemplate template;

    @RequestMapping("/")
    @ResponseBody
    String home() {
        return "Empty mapping";
    }

    @RequestMapping("/process/{message}")
    @ResponseBody
    String error(@PathVariable("message") String message) {
        logger.info(String.format("Emit '%s'",message));
        String response = (String) template.convertSendAndReceive("query-example-6",message);
        logger.info(String.format("Received on producer '%s'",response));
        return String.valueOf("returned from worker : " + response);
    }
}


Заключение


У себя я использовал RabbitMQ в проекте в облачном хостинге heroku. Использовать RabbitMQ в heroku довольно просто — достаточно подключить одного из провайдеров RabbitMQ в консоли администрирования и тогда в переменных окружения появится адрес для доступа к кролику. Этот адрес нужно использовать при создании connectionFactory.

	@Bean
	public ConnectionFactory connectionFactory()
	{
		//получаем адрес AMQP у провайдера
		String uri = System.getenv("CLOUDAMQP_URL");
		if (uri == null) //значит мы запущены локально и нужно подключаться к локальному rabbitmq
			uri = "amqp://guest:guest@localhost";
		URI url = null;
		try
		{
			url = new URI(uri);
		} catch (URISyntaxException e)
		{
			e.printStackTrace(); //тут ошибка крайне маловероятна
		}

		CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
		connectionFactory.setHost(url.getHost());
		connectionFactory.setUsername(url.getUserInfo().split(":")[0]);
		connectionFactory.setPassword(url.getUserInfo().split(":")[1]);
		if (StringUtils.isNotBlank(url.getPath()))
			connectionFactory.setVirtualHost(url.getPath().replace("/", ""));
		connectionFactory.setConnectionTimeout(3000);
		connectionFactory.setRequestedHeartBeat(30);
		return connectionFactory;
	}

В остальном код мало отличается от приведенного в примере 4(Routing).

Использованные источники


Страница проекта Spring AMQP
Страница проекта Spring Boot
Страница с примерами RabbitMQ
Tags:
Hubs:
Total votes 11: ↑10 and ↓1+9
Comments0

Articles