В этой статье разберемся, как реализовать обмен сообщениями между Java-микросервисами на Spring с помощью Kafka.
1. Архитектура
У нас будет Producer-микросервис ("писатель"), который получает заказы на еду (Food Order) и передает их через Kafka в Consumer-микросервис ("читатель") для сохранения в базу данных.
2. Пара слов о Kafka
Кластер Kafka обладает высокой масштабируемостью и отказоустойчивостью: при поломке одного из узлов, другие узлы берут на себя его работу, обеспечивая непрерывность работы без потери данных.
Чтение и запись данных в Kafka выполняется в виде событий, содержащих информацию в различном формате, например, в виде строки, массива или JSON-объекта.
Producer (производитель, издатель) публикует (записывает) события в Kafka, а Consumer (потребитель, подписчик) подписывается на эти события и обрабатывает их.
3. Топики
События группируются в топики (topic). Топик похож на папку, а события — на файлы в этой папке. У топика может быть ноль, один или много издателей и подписчиков.
События можно прочитать столько раз, сколько необходимо. В этом отличие Kafka от традиционных систем обмена сообщениями: после чтения события не удаляются. Можно настроить, как долго Kafka хранит события.
4. Разделы
Топики поделены на разделы (partition). Публикация события в топике фактически означает добавление его к одному из разделов. События с одинаковыми ключами записываются в один раздел. В рамках раздела Kafka гарантирует порядок событий.
Для отказоустойчивости и высокой доступности топик может быть реплицирован, в том числе между различными, географически удаленными, датацентрами. То есть всегда будет несколько брокеров с копиями данных на случай, если что-то пойдет не так.
5. Создание проектов
Перейдите на start.spring.io и создайте проекты с зависимостями, показанными на рисунках ниже.
Producer-микросервис:
Consumer-микросервис:
6. Запуск Kafka в докере
В корне одного из проектов, неважно каком, создайте файл docker-compose.yml
, содержащий параметры запуска Kafka, Kafdrop и Zookeeper в докере.
version: "3.7"
networks:
kafka-net:
name: kafka-net
driver: bridge
services:
zookeeper:
image: zookeeper:3.7.0
container_name: zookeeper
restart: "no"
networks:
- kafka-net
ports:
- "2181:2181"
kafka:
image: obsidiandynamics/kafka
container_name: kafka
restart: "no"
networks:
- kafka-net
ports:
- "9092:9092"
environment:
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DOCKER_INTERNAL:PLAINTEXT,DOCKER_EXTERNAL:PLAINTEXT
KAFKA_LISTENERS: DOCKER_INTERNAL://:29092,DOCKER_EXTERNAL://:9092
KAFKA_ADVERTISED_LISTENERS: DOCKER_INTERNAL://kafka:29092,DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER_INTERNAL
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_BROKER_ID: 1
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
depends_on:
- zookeeper
kafdrop:
image: obsidiandynamics/kafdrop
container_name: kafdrop
restart: "no"
networks:
- kafka-net
ports:
- "9000:9000"
environment:
KAFKA_BROKERCONNECT: "kafka:29092"
depends_on:
- "kafka"
Далее, находясь в папке с docker-compose.yml
выполните docker-compose up
. После запуска контейнеров откройте Kafdrop (веб-интерфейс для управления Kafka) по адресу http://localhost:9000.
В Kafdrop можно смотреть топики, создавать их, удалять и делать многое другое.
7. Producer-микросервис
Архитектура:
Этапы создания Producer-микросервиса:
создаем конфигурационные бины;
создаем топик для заказов;
создаем контроллер FoodOrderController, сервис FoodOrderService и Producer;
преобразуем заказы FoodOrder в текстовый вид для отправки брокеру.
Переменные окружения и порт для нашего API (application.yml):
server:
port: 8080
topic:
name: t.food.order
Config
отвечает за создание топика и бина KafkaTemplate
, используемого для отправки сообщения.
@Configuration
public class Config {
private final KafkaProperties kafkaProperties;
@Autowired
public Config(KafkaProperties kafkaProperties) {
this.kafkaProperties = kafkaProperties;
}
@Bean
public ProducerFactory<String, String> producerFactory() {
// get configs on application.properties/yml
Map<String, Object> properties = kafkaProperties.buildProducerProperties();
return new DefaultKafkaProducerFactory<>(properties);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public NewTopic topic() {
return TopicBuilder
.name("t.food.order")
.partitions(1)
.replicas(1)
.build();
}
}
Класс модели FoodOrder
:
@Data
@Value
public class FoodOrder {
String item;
Double amount;
}
FoodOrderController
отвечает за получение заказа FoodOrder
и передачу его на уровень сервиса.
@Slf4j
@RestController
@RequestMapping("/order")
public class FoodOrderController {
private final FoodOrderService foodOrderService;
@Autowired
public FoodOrderController(FoodOrderService foodOrderService) {
this.foodOrderService = foodOrderService;
}
@PostMapping
public String createFoodOrder(@RequestBody FoodOrder foodOrder) throws JsonProcessingException {
log.info("create food order request received");
return foodOrderService.createFoodOrder(foodOrder);
}
}
FoodOrderService
— получение заказа FoodOrder
и передачу его Producer.
@Slf4j
@Service
public class FoodOrderService {
private final Producer producer;
@Autowired
public FoodOrderService(Producer producer) {
this.producer = producer;
}
public String createFoodOrder(FoodOrder foodOrder) throws JsonProcessingException {
return producer.sendMessage(foodOrder);
}
}
Producer
получает заказ FoodOrder
и публикует его в Kafka в виде сообщения.
В строке 18 мы конвертируем объект FoodOrder
в JSON-строку для его передачи в виде строки в Consumer-микросервис.
В строке 19 фактически отправляем сообщение, передавая топик для публикации (переменная окружения в строке 6) и заказ в виде сообщения.
@Slf4j
@Component
public class Producer {
@Value("${topic.name}")
private String orderTopic;
private final ObjectMapper objectMapper;
private final KafkaTemplate<String, String> kafkaTemplate;
@Autowired
public Producer(KafkaTemplate<String, String> kafkaTemplate, ObjectMapper objectMapper) {
this.kafkaTemplate = kafkaTemplate;
this.objectMapper = objectMapper;
}
public String sendMessage(FoodOrder foodOrder) throws JsonProcessingException {
String orderAsMessage = objectMapper.writeValueAsString(foodOrder);
kafkaTemplate.send(orderTopic, orderAsMessage);
log.info("food order produced {}", orderAsMessage);
return "message sent";
}
}
При запуске приложения мы должны увидеть топик, созданный в Kafdrop. А при отправке заказа FoodOrder — информацию в логе, что сообщение отправлено.
Теперь в Kafdrop в разделе Topics
можем посмотреть созданный топик t.food.order и увидеть наше сообщение.
8. Consumer-микросервис
Архитектура:
Этапы создания Consumer-микросервиса:
конфигурируем group-id и бины;
настраиваем доступ к базе данных;
создаем Consumer и FoodOrderService;
создаем репозиторий FoodOrderRepository.
Начнем с настройки порта для запуска нашего API, топика, который будем слушать, group-id для Consumer-микросервиса и конфигурации базы данных.
server:
port: 8081
topic:
name: t.food.order
spring:
kafka:
consumer:
group-id: "default"
h2:
console:
enabled: true
path: /h2-console
datasource:
url: jdbc:h2:mem:testdb
username: sa
password: password
Config
отвечает за настройку бина ModelMapper
— библиотеки для маппинга одних объектов на другие. Например, для DTO, используемого далее.
@Configuration
public class Config {
@Bean
public ModelMapper modelMapper() {
return new ModelMapper();
}
}
Классы модели:
@Data
@Value
public class FoodOrderDto {
String item;
Double amount;
}
@Data
@Entity
@NoArgsConstructor
@AllArgsConstructor
public class FoodOrder {
@Id @GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
private String item;
private Double amount;
}
Consumer
отвечает за прослушивание топика с заказами и получение сообщений. Полученные сообщения мы преобразуем в FoodOrderDto
, не содержащего ничего, связанного с персистентностью, например, ID.
@Slf4j
@Component
public class Consumer {
private static final String orderTopic = "${topic.name}";
private final ObjectMapper objectMapper;
private final FoodOrderService foodOrderService;
@Autowired
public Consumer(ObjectMapper objectMapper, FoodOrderService foodOrderService) {
this.objectMapper = objectMapper;
this.foodOrderService = foodOrderService;
}
@KafkaListener(topics = orderTopic)
public void consumeMessage(String message) throws JsonProcessingException {
log.info("message consumed {}", message);
FoodOrderDto foodOrderDto = objectMapper.readValue(message, FoodOrderDto.class);
foodOrderService.persistFoodOrder(foodOrderDto);
}
}
FoodOrderService
— преобразование полученного DTO в объект FoodOrder
и сохранение его в БД.
@Slf4j
@Service
public class FoodOrderService {
private final FoodOrderRepository foodOrderRepository;
private final ModelMapper modelMapper;
@Autowired
public FoodOrderService(FoodOrderRepository foodOrderRepository, ModelMapper modelMapper) {
this.foodOrderRepository = foodOrderRepository;
this.modelMapper = modelMapper;
}
public void persistFoodOrder(FoodOrderDto foodOrderDto) {
FoodOrder foodOrder = modelMapper.map(foodOrderDto, FoodOrder.class);
FoodOrder persistedFoodOrder = foodOrderRepository.save(foodOrder);
log.info("food order persisted {}", persistedFoodOrder);
}
}
Код FoodOrderRepository
:
@Repository
public interface FoodOrderRepository extends JpaRepository<FoodOrder, Long> {
}
Теперь при запуске Consumer-микросервиса отправленные ранее сообщения будут прочитаны из соответствующего топика.
Здесь отмечу одну важную деталь: если мы перейдем в Kafdrop и проверим сообщение, которое только что получили, оно будет доступно. Но, например, в RabbitMQ мы бы его не увидели.
9. Дополнительный функционал
Мы можем отправлять периодические сообщения, включив функционал запуска задач по расписанию.
Для этого добавляем аннотацию @EnableScheduling
к классу конфигурации Producer-микросервиса.
@EnableScheduling
@Configuration
public class Config {
...
}
Будем отправлять сообщения с фиксированным интервалом в 1000 миллисекунд.
@Slf4j
@Component
public class Scheduler {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
private Integer count = 0;
@Scheduled(fixedRate = 1000)
public void sendMessage() {
count++;
kafkaTemplate.send("t.scheduled", "message " + count);
log.info("sent message count {}", count);
}
}
Топик будет создан автоматически, но можно определить бин также, как делали раньше.
Получим следующий результат:
10. Заключение
Основная идея статьи была познакомить вас с использованием Kafka совместно с Java и Spring для реализации на ее основе более сложных решений.
Исходный код из статьи доступен на GitHub здесь.
Ссылки
Kafka The Definitive Guide, O’Reilly
Apache Kafka, Matthias J. Sax
Приглашаем всех желающих на открытое занятие «Разработка консольных приложений на Spring и Picocli». На данном занятии мы покажем, как строить Command Line Interface и утилиты командной строки на Picocli, как альтернативу Spring Shell. Также будут рассмотрены некоторые возможности Java для создания таких консольных утилит. Регистрация — по ссылке.