Организация обработки асинхронных событий с Spring Events и Spring AMQP
Привет, Хабр!
Асинхронная обработка событий – один из базовых инструментов на сегодняшний день, позволяющий создавать масштабируемые и отзывчивые приложения. Сегодня мы рассмотрим два инструмента из Spring Framework – Spring Events и Spring AMQP, которые помогают управлять асинхронными задачами.
Spring Events
Spring Events – это встроенный механизм для публикации и обработки событий внутри приложения. Spring Events основаны на паттерне проектирования "издатель-подписчик".
Для работы с событиями потребуется три основных компонента:
Класс события: описывает само событие.
Издатель события: компонент, который генерирует и публикует события.
Подписчик на событие: компонент, который обрабатывает опубликованные события.
Начнем с создания класса события. Этот класс должен наследоваться от ApplicationEvent
:
public class CustomSpringEvent extends ApplicationEvent {
private String message;
public CustomSpringEvent(Object source, String message) {
super(source);
this.message = message;
}
public String getMessage() {
return message;
}
}
Для публикации события потребуется класс-издатель, который будет использовать ApplicationEventPublisher
для публикации событий:
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.stereotype.Component;
@Component
public class CustomSpringEventPublisher implements ApplicationEventPublisherAware {
private ApplicationEventPublisher applicationEventPublisher;
@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.applicationEventPublisher = applicationEventPublisher;
}
public void publishEvent(final String message) {
System.out.println("Publishing custom event. ");
CustomSpringEvent customSpringEvent = new CustomSpringEvent(this, message);
applicationEventPublisher.publishEvent(customSpringEvent);
}
}
Теперь создадим компонент, который будет обрабатывать события. Для этого можно использовать аннотацию @EventListener
:
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
@Component
public class CustomSpringEventListener {
@EventListener
public void handleCustomSpringEvent(CustomSpringEvent event) {
System.out.println("Received spring custom event - " + event.getMessage());
}
}
Чтобы события обрабатывались асинхронно нужно включить поддержку асинхронности в конфигурации Spring и пометить метод-обработчик аннотацией @Async
.
Для этого необходимо добавить аннотацию @EnableAsync
в конфигурационный класс:
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
@Configuration
@EnableAsync
public class AsyncConfig {
}
Теперь изменим слушатель событий, чтобы он обрабатывал события асинхронно:
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.springframework.context.event.EventListener;
@Component
public class CustomSpringEventListener {
@Async
@EventListener
public void handleCustomSpringEvent(CustomSpringEvent event) {
System.out.println("Received spring custom event - " + event.getMessage());
// доп. логика обработки события
}
}
Примеры использования
Кэширование
Предположим, есть приложение, которое кэширует данные. Когда данные изменяются, нужно обновить кэш. Можно юзать событие для уведомления всех компонентов о необходимости обновления кэша:
// событие изменения данных
public class DataChangeEvent extends ApplicationEvent {
private String dataId;
public DataChangeEvent(Object source, String dataId) {
super(source);
this.dataId = dataId;
}
public String getDataId() {
return dataId;
}
}
// публикация события изменения данных
@Component
public class DataService {
@Autowired
private ApplicationEventPublisher eventPublisher;
public void updateData(String dataId) {
// логика обновления данных
eventPublisher.publishEvent(new DataChangeEvent(this, dataId));
}
}
// обработчик события изменения данных
@Component
public class CacheService {
@EventListener
@Async
public void handleDataChangeEvent(DataChangeEvent event) {
System.out.println("Updating cache for dataId: " + event.getDataId());
// логика обновления кэша
}
}
Уведомление о завершении задач
Для этой задачи можно использовать событие для уведомления других компонентов о завершении длительных задач:
// событие завершения задачи
public class TaskCompleteEvent extends ApplicationEvent {
private String taskId;
public TaskCompleteEvent(Object source, String taskId) {
super(source);
this.taskId = taskId;
}
public String getTaskId() {
return taskId;
}
}
// публикация события завершения задачи
@Component
public class TaskService {
@Autowired
private ApplicationEventPublisher eventPublisher;
@Async
public void executeTask(String taskId) {
// логика выполнения задачи
eventPublisher.publishEvent(new TaskCompleteEvent(this, taskId));
}
}
// обработчик события завершения задачи
@Component
public class NotificationService {
@EventListener
@Async
public void handleTaskCompleteEvent(TaskCompleteEvent event) {
System.out.println("Task completed: " + event.getTaskId());
// логика уведомления
}
}
Организация обработки сообщений с помощью Spring AMQP
AMQP — это протокол уровня приложений для мидлвэр-систем, предназначенных для передачи сообщений. Имеет функциональные возможности для межпрограммного взаимодействия и включает концепции очередей, маршрутизации, обменов и привязок.
RabbitMQ — это брокер сообщений, реализующий AMQP. Он позволяет обмениваться сообщениями и управлять их очередями.
Для старта работ с Spring AMQP в проекте Spring Boot необходимо добавить соответствующую зависимость в файл pom.xml
:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
Далее необходимо настроить параметры подключения к RabbitMQ в файле application.properties
.
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
Создание и отправка сообщений с помощью AmqpTemplate
Для отправки сообщений используется AmqpTemplate
.
Для начала создадим конфигурационный класс, который определит все необходимые бины:
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfig {
static final String queueName = "myQueue";
@Bean
Queue queue() {
return new Queue(queueName, false);
}
@Bean
RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
return new RabbitTemplate(connectionFactory);
}
@Bean
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(queueName);
container.setMessageListener(listenerAdapter);
return container;
}
@Bean
MessageListenerAdapter listenerAdapter(Receiver receiver) {
return new MessageListenerAdapter(receiver, "receiveMessage");
}
}
Теперь создадим компонент, который будет отправлять сообщения:
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class Sender {
private final RabbitTemplate rabbitTemplate;
@Autowired
public Sender(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
public void send(String message) {
rabbitTemplate.convertAndSend(RabbitConfig.queueName, message);
System.out.println("Sent: " + message);
}
}
Для обработки сообщений, поступающих в очередь, используется аннотация @RabbitListener
.
Добавим компонент, который будет обрабатывать входящие сообщения:
import org.springframework.stereotype.Component;
@Component
public class Receiver {
public void receiveMessage(String message) {
System.out.println("Received: " + message);
}
}
С аннтоацией @RabbitListener
можно декларативно указать методы, которые будут обрабатывать сообщения из очередей:
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
@Service
public class MessageListener {
@RabbitListener(queues = RabbitConfig.queueName)
public void processMessage(String message) {
System.out.println("Received message: " + message);
}
}
Пример
Полный пример с отправкой и получением сообщений:
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
@SpringBootApplication
@EnableRabbit
public class SpringAmqpApplication {
public static void main(String[] args) {
SpringApplication.run(SpringAmqpApplication.class, args);
}
@Bean
CommandLineRunner runner(Sender sender) {
return args -> {
System.out.println("Sending message...");
sender.send("Hello, RabbitMQ!");
};
}
}
Примеры использования двух инструментов вместе
Обработка заказов с локальными событиями и асинхронной передачей
В этом примере система обрабатывает заказы локально, публикует событие о новом заказе с помощью Spring Events и затем отправляет заказ на дальнейшую обработку в другую службу через RabbitMQ:
// событие заказа
public class OrderCreatedEvent extends ApplicationEvent {
private String orderId;
public OrderCreatedEvent(Object source, String orderId) {
super(source);
this.orderId = orderId;
}
public String getOrderId() {
return orderId;
}
}
// публикация события заказа
@Component
public class OrderService {
@Autowired
private ApplicationEventPublisher eventPublisher;
public void createOrder(String orderId) {
// логика создания заказа
System.out.println("Order created: " + orderId);
eventPublisher.publishEvent(new OrderCreatedEvent(this, orderId));
}
}
// обработка локального события и отправка в RabbitMQ
@Component
public class OrderCreatedListener {
@Autowired
private AmqpTemplate amqpTemplate;
@EventListener
@Async
public void handleOrderCreatedEvent(OrderCreatedEvent event) {
System.out.println("Handling order created event for order: " + event.getOrderId());
// доп. логика обработки
amqpTemplate.convertAndSend("orderQueue", event.getOrderId());
}
}
// конфигурация RabbitMQ
@Configuration
public class RabbitConfig {
@Bean
public Queue orderQueue() {
return new Queue("orderQueue", false);
}
@Bean
public AmqpTemplate amqpTemplate(ConnectionFactory connectionFactory) {
return new RabbitTemplate(connectionFactory);
}
}
Обновление состояния задач и уведомление через RabbitMQ
При обновлении состояния задачи генерируется локальное событие, которое затем отправляется в очередь RabbitMQ для уведомления других микросервисов о статусе задачи:
// событие обновления задачи
public class TaskStatusUpdateEvent extends ApplicationEvent {
private String taskId;
private String status;
public TaskStatusUpdateEvent(Object source, String taskId, String status) {
super(source);
this.taskId = taskId;
this.status = status;
}
public String getTaskId() {
return taskId;
}
public String getStatus() {
return status;
}
}
// публикация события обновления задачи
@Component
public class TaskService {
@Autowired
private ApplicationEventPublisher eventPublisher;
public void updateTaskStatus(String taskId, String status) {
System.out.println("Updating task status: " + taskId + " to " + status);
eventPublisher.publishEvent(new TaskStatusUpdateEvent(this, taskId, status));
}
}
// обработка события обновления задачи и отправка в RabbitMQ
@Component
public class TaskStatusUpdateListener {
@Autowired
private AmqpTemplate amqpTemplate;
@EventListener
@Async
public void handleTaskStatusUpdateEvent(TaskStatusUpdateEvent event) {
System.out.println("Sending task status update to RabbitMQ: " + event.getTaskId() + " - " + event.getStatus());
amqpTemplate.convertAndSend("taskStatusQueue", event.getTaskId() + " - " + event.getStatus());
}
}
// конфигурация RabbitMQ
@Configuration
public class RabbitConfig {
@Bean
public Queue taskStatusQueue() {
return new Queue("taskStatusQueue", false);
}
@Bean
public AmqpTemplate amqpTemplate(ConnectionFactory connectionFactory) {
return new RabbitTemplate(connectionFactory);
}
}
В заключение напоминаю об открытых уроках, которые пройдут в рамках курса «Разработчик на Spring Framework» в Otus:
1 июля: Тестирование Spring приложений. Интеграционные тесты с контекстом. Тестирование слоя репозиториев и сервисов. Запись по ссылке
16 июля: Тестирование Spring приложений. Интеграционные тесты контроллеров, интеграций с внешними API и безопасности. Запись по ссылке