Всем привет.
В данной статье будет описано, как создать простой kafka producer и kafka consumer, а затем протестировать их.
Данная статья будет полезна начинающим разработчикам, которые еще не работали с технологией Apache Kafka.
Немного теории.
Вначале надо разобраться, что такое Apache Kafka и для чего она используется. И тут сразу могут возникнуть первые вопросы, так как первое, что приходит в голову, если идет речь о kafka, то это - распределенная система обмена сообщениями между серверными приложениями в режиме реального времени. Но если "копнуть глубже" и посмотреть на определение kafka на официальном сайте https://kafka.apache.org/ мы увидим.
Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.
Исходя из этого определения Apache Kafka — это больше, чем просто система обмена сообщениями, это распределенная платформа потоковой передачи событий, а также потоковой аналитики и интеграции данных.
То есть kafka может использоваться и как база данных, и как распределенное хранилище логов, и как очередь, и как платформа для потоковой обработки данных и т.д.
В данной статье будет рассмотрен пример, как с помощью kafka организовать обмен сообщениями между двумя микросервисами.
Kafka, как и почти все сервисы обработки очередей, условно состоит из трех основных частей:
1) сервер или еще его называют брокер;
2) producer - отправляют сообщения брокеру;
3) consumer - считывают сообщения с брокера, использую модель pull, то есть консьюмеры сами отправляют запросы к брокеру для получения новых сообщений.
Главной отличительной чертой kafka от других систем обработки очередей (например RabbitMQ), является то, что сообщения в kafka могут храниться на брокере днями, неделями или даже годами. Благодаря этому одно и тоже сообщение может быть обработано разными консьюмерами по-разному.
Рассмотрим какая структура сообщения в kafka. Оно состоит из ключа (key), значения (value), таймстампа (timestamp) и набора метаданных (headers).
Сообщения хранятся в топиках (topics). Топики состоят из партиций (partitions). Партиции или их еще называют разделы - это копии очередей наших сообщений. Чтобы повысить надежность и доступность данных в кластере-Kafka, разделы могут иметь копии, число которых задается коэффициентом репликации (replication factor), который показывает, на сколько брокеров-последователей (follower) будут скопированы данные с ведущего-лидера (leader). Таким образом, гарантируется наличие нескольких копий сообщения на разных брокерах. Партиции, в свою очередь, распределены между брокерами внутри одного кластера. Такая сложная, на первый взгляд, система хранения сообщений необходима для отказоустойчивости, масштабирования и повышения производительности работы, так как она позволяет продюсерам писать в несколько брокеров одновременно, а консьюмерам - читать, также из нескольких брокеров.

У каждой партиции есть свой "лидер" (leader) - это тот брокер, который работает с продюсером и на него приходит сообщение, а также у каждой партиции имеются несколько "фолловеров" (followers) - это брокеры, которые хранят копии партиций. Перед отправкой сообщения консьюмер обращается к брокеру и запрашивает данные, кто является лидером партиции.
Таким образом, общая схема сохранения сообщения в kafka выглядит следующим образом. Имеется какой-то топик, в который записываются сообщения, и есть несколько партиций (копий очередей наших сообщений), распределенных по брокерам в кластере. Продюсер вначале обращается к брокеру с вопросом, кто является лидером партиции в данном брокере, и после получения данной информации отправляет туда свое сообщение, на втором этапе, фолловеры данной партиции копируют себе отправленное сообщение на свой брокер. Так происходит с каждой партицией.
Время хранения сообщения в kafka регулируется с помощью специальных настроек.
Рассмотрим сейчас как выглядит работа консьюмера в kafka.
Каждый консьюмер должен быть частью какой-нибудь консьюмер группы. Данная группа должна иметь уникальное название и должна быть зарегистрирована в кластере. Как правило, если у нас есть несколько консьюмеров, в одной группе, то они получают сообщения из разных партиций. Желательно, чтобы количество консьюмеров было равно количеству партиций, и каждый консьюмер читал сообщения из своей партиции, таким образом, распределяется нагрузка и повышается производительность работы.
Есть еще один важный вопрос. Если мы захотим добавить консьюмера к топику не сразу, а позже или, например, произойдет сбой консьюмера, а позже он восстановится и вопрос, откуда он будет знать с какого сообщения продолжить работу? Для этого имеется специальный механизм консьюмер-офсетов (offset). Перед началом работы консьюмер делает специальный запрос к брокеру с указанием группы, топика, партиции и офсета, который должен быть помечен как обработанный. Брокер сохраняет эту информацию у себя. При сбое в работе, консьюмер запрашивает у брокера последний закомиченный офсет и продолжает читать с данной позиции сообщения.
Это упрощенное описание работы kafka-продюсера и kafka-консьюмера.
Также при описании kafka нельзя не вспомнить про один важный компонент - zookeeper.
ZooKeeper - это хранилище метаданных kafka, именно он знает в каком состоянии находятся брокеры, какая партиция играет роль лидера, сколько партиций и где они находятся, сколько у каждой партиции реплик и так далее.
Разобравшись немного с теорией приступим к нашему примеру.
Весь код примера будет доступен по ссылке.
Пример будет очень простой. Допустим у нас будет три микросервиса. Один - это продюсер - он будет производить и отправлять сообщения в kafka, в нашем случае это будет Заказ.
@Data @AllArgsConstructor @NoArgsConstructor public class Order { private String productName; private String barCode; private int quantity; private BigDecimal price; }
Второй микросервис - консьюмер, который будет читать наше сообщение и записывать его в базу данных.
И третий микросервис - также будет читать наше сообщение и просто выводить его в консоль.
Таким образом, я хочу показать, что можно настроить несколько консьюмеров, которые будут подписаны на один топик и будут получать из него сообщения, но поступать с ними по-разному.
Весь код приводить не буду, буду останавливаться только на главных моментах.
Kafka, zookeeper, kafka-ui (для просмотра сообщений в kafka), database (postgres) и pgadmin (для просмотра данных в базе) поднимем с помощью docker.
Для этого напишем следующий docker-compose.yml файл.
services: zookeeper: image: confluentinc/cp-zookeeper:6.2.4 healthcheck: test: [ "CMD", "nc", "-vz", "localhost", "2181" ] interval: 10s timeout: 3s retries: 3 environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 ports: - 22181:2181 kafka: image: confluentinc/cp-kafka:6.2.4 depends_on: zookeeper: condition: service_healthy ports: - 29092:29092 healthcheck: test: [ "CMD", "nc", "-vz", "localhost", "9092" ] interval: 10s timeout: 3s retries: 3 environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_LISTENERS: OUTSIDE://:29092,INTERNAL://:9092 KAFKA_ADVERTISED_LISTENERS: OUTSIDE://localhost:29092,INTERNAL://kafka:9092 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,OUTSIDE:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 kafka-ui: image: provectuslabs/kafka-ui container_name: kafka-ui ports: - "8080:8080" restart: always depends_on: kafka: condition: service_healthy environment: KAFKA_CLUSTERS_0_NAME: local KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092 service-db: image: postgres:14.7-alpine environment: POSTGRES_USER: username POSTGRES_PASSWORD: password healthcheck: test: ["CMD-SHELL", "pg_isready", "-d", "clients_database"] interval: 10s timeout: 3s retries: 3 ports: - "15432:5432" volumes: - ./infrastructure/db/create_db.sql:/docker-entrypoint-initdb.d/create_db.sql restart: unless-stopped pgadmin: container_name: pgadmin4_container image: dpage/pgadmin4:7 restart: always environment: PGADMIN_DEFAULT_EMAIL: admin@admin.com PGADMIN_DEFAULT_PASSWORD: root ports: - "5050:80" kafka-topics-generator: image: confluentinc/cp-kafka:6.2.4 depends_on: kafka: condition: service_healthy entrypoint: [ '/bin/sh', '-c' ] command: | " # blocks until kafka is reachable kafka-topics --bootstrap-server kafka:9092 --list echo -e 'Creating kafka topics' kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic send-order-event --replication-factor 1 --partitions 2 echo -e 'Successfully created the following topics:' kafka-topics --bootstrap-server kafka:9092 --list "
Базу данных orders_database, создадим на этапе поднятия контейнера с postgres.
Топик (send-order-event) создадим с помощью команды в отдельном контейнере, здесь же создадим две партиции, так как у нас будет два консьюмера и желательно, чтобы каждый консьюмер читал из своей патриции.
Топики можно также создавать и с помощью кода.
Пройдемся по этапам создания продюсера.
Вначале необходимо сделать некоторые настройки продюсера. Это можно делать с помощью кода или прописывать в application файле. Мы это сделаем с помощью application.yml файла.
server: port: 8081 spring: kafka: bootstrap-servers: localhost:29092 producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.springframework.kafka.support.serializer.JsonSerializer properties: acks: 1 spring: json: add: type: headers: false topic: send-order: send-order-event
Здесь указываем порт, на котором будет работать kafka (должен совпадать с внешним портом, который мы открыли в docker для kafka), также необходимо указать как мы будем сериализовать ключ и значение (значение - это и будет наш заказ, поэтому здесь надо указать JsonSerializer). Также прописываем название нашего топика send-order-event, название должно совпадать с тем, что мы указали при создании топика в docker. Данное название мы потом с помощью аннотации @Value будем сетать в переменную.
Далее создадим сам сервис по отправке сообщений.
@Service @RequiredArgsConstructor public class KafkaMessagingService { @Value("${topic.send-order}") private String sendClientTopic; private final KafkaTemplate<String , Object> kafkaTemplate; public void sendOrder(OrderSendEvent orderSendEvent) { kafkaTemplate.send(sendClientTopic, orderSendEvent.getBarCode(), orderSendEvent); } }
Внедряем бин private final KafkaTemplate<String , Object> kafkaTemplate в данный класс с помощью аннотации @RequiredArgsConstructor. Также как было сказано раньше сетаем в переменную sendClientTopic название нашего топика с application.yml файла. Далее пишем сам метод по отправке сообщения, который на вход будет принимать OrderSendEvent - то есть наш заказ. Вызываем у kafkaTemplate метод send куда передаем название топика, ключ (в качестве ключа будет выступать код продукта). Ключ нужен для того чтобы сообщения с одинаковыми ключами всегда записываются в одну и ту же партицию. Последним передаем сам заказ.
@Data @AllArgsConstructor @NoArgsConstructor public class OrderSendEvent { private String productName; private String barCode; private int quantity; private BigDecimal price; }
Создадим еще класс Producer.
@Slf4j @Component @RequiredArgsConstructor public class Producer { private final KafkaMessagingService kafkaMessagingService; private final ModelMapper modelMapper; public Order sendOrderEvent(Order order) { kafkaMessagingService.sendOrder(modelMapper.map(order, OrderSendEvent.class)); log.info("Send order from producer {}", order); return order; } }
Он нужен просто для того чтобы отделить логику отправки от маппинга сущностей.
Отправку сообщения будем производить с помощью postman, поэтому создадим еще контроллер OrderController.
@Slf4j @Validated @RestController @RequiredArgsConstructor @RequestMapping("/api/v1/orders") public class OrderController { private final Producer producer; @PostMapping @ResponseStatus(HttpStatus.OK) public Order sendOrder(@RequestBody Order order) { log.info("Send order to kafka"); producer.sendOrderEvent(order); return order; } }
Рассмотрим теперь первый консьюмер.
Вначале также создадим application.yml файл, в котором настроим наш консьюмер.
server: port: 8082 spring: kafka: bootstrap-servers: localhost:29092 consumer: group-id: "order-1" auto-offset-reset: earliest key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer properties: spring: json: trusted: packages: '*' datasource: url: jdbc:postgresql://${DB_HOST:localhost}:${DB_PORT:15432}/orders_database username: username password: password liquibase: enabled: true drop-first: false change-log: classpath:db/changelog/db.changelog-master.xml default-schema: public jpa: show-sql: false open-in-view: false hibernate: ddl-auto: none properties: hibernate: dialect: org.hibernate.dialect.PostgreSQLDialect topic: send-order: send-order-event
Здесь как и в продюсере указываем порт, на котором работает kafka.
Прописываем group-id: "order-1" - так как консьюмеры должны быть объединены в группы.
Указываем настройку auto-offset-reset: earliest - она нужна для того, если мы добавим новую партицию, когда в топик пишут сообщения продюсеры, без данной настройки, мы можем потерять или не обработать кусок данных, записавшихся в новую партицию до того, как консьюмеры обновили метаданные по топику и начали читать данные из этой партиции.
Как и в продюсере указываем как мы будем уже только десериализовать наши ключ и значение. Также прописываем настройку для того чтобы JsonDeserializer доверял десериализовать только классы в доверенном пакете. То есть тут можно указать конкретный пакет или с помощью "*" - указать, что нужно доверять всем классам во всех пакетах.
Также прописываем название нашего топика send-order-event.
В данном файле также прописываем настройки по подключению к базе данных, накатыванию таблиц с помощью liquibase и чтобы выводились sql запросы к базе данных.
Далее создадим класс OrderEvent. По структуре он должен совпадать с тем классом (OrderSendEvent), который мы отправляем через продюсер.
@Data @AllArgsConstructor @NoArgsConstructor public class OrderEvent { private String productName; private String barCode; private int quantity; private BigDecimal price; }
И сам сервис по приемке сообщения.
@Slf4j @Service @AllArgsConstructor public class KafkaMessagingService { private static final String topicCreateOrder = "${topic.send-order}"; private static final String kafkaConsumerGroupId = "${spring.kafka.consumer.group-id}"; private final OrderService orderService; private final ModelMapper modelMapper; @Transactional @KafkaListener(topics = topicCreateOrder, groupId = kafkaConsumerGroupId, properties = {"spring.json.value.default.type=com.example.consumer.service.messaging.event.OrderEvent"}) public OrderEvent createOrder(OrderEvent orderEvent) { log.info("Message consumed {}", orderEvent); orderService.save(modelMapper.map(orderEvent, OrderDto.class)); return orderEvent; } }
Здесь сетаем переменным topicCreateOrder и kafkaConsumerGroupId с application.yml файла значения названия топика и группы.
Создаем сам метод по обработке сообщений. Вешаем на него аннотацию @KafkaListener куда передаем название топика, который надо слушать, название группы, а также передаем еще настройку по дефолтному типу данных, который мы принимаем. Данную настройку, можно прописать и в application.yml файле, но я хотел показать как можно передавать настройки каждому слушателю, или, например, у вас в группе есть слушатель, который принимает другую сущность.
Далее с полученным сообщением, то есть OrderEvent, можно выполнять различную логику, зависящую от бизнес-требований. В нашем случае мы будем сохранять наш заказ в базу данных.
Рассмотрим еще один консьюмер, он создан в другом микросервисе и его настройки идентичны первому, поэтому только покажу сам метод по приемке сообщений - он будет выводить наш заказ в консоль. Здесь я хочу показать, что на один топик могут быть подписаны несколько консьюмеров и по разному трактовать, что делать с тем сообщением, которое будет появляться в топике.
@Slf4j @Service @AllArgsConstructor public class KafkaMessagingService { private static final String topicCreateOrder = "${topic.send-order}"; private static final String kafkaConsumerGroupId = "${spring.kafka.consumer.group-id}"; @Transactional @KafkaListener(topics = topicCreateOrder, groupId = kafkaConsumerGroupId, properties = {"spring.json.value.default.type=com.example.service.OrderEvent"}) public OrderEvent printOrder(OrderEvent orderEvent) { log.info("The product: {} was ordered in quantity: {} and at a price: {}", orderEvent.getProductName(), orderEvent.getQuantity(), orderEvent.getPrice()); log.info("To pay: {}", new BigDecimal(orderEvent.getQuantity()).multiply(orderEvent.getPrice())); return orderEvent; } }
Давайте сейчас посмотрим как все это работает.
Вначале запустим наш docker-compose.yml командой docker-compose up -d в консоли.

Далее необходимо подождать, пока docker стянет необходимые образы с docker hub и на их основе запустит контейнеры.
Идем в docker desktop и мы должны увидеть следующее.

Kafka, zookeeper, kafka-ui, postgres и pgadmin должны быть запущены и работать. Зайдем в kafka-topics-generator и убедимся, что топик создался.

Далее запускаем все наши три микросервиса.
Идем в postman и отправляем json с заказом на адрес http://localhost:8081/api/v1/orders, так как мы запустили наш продюсер на порту 8081.

В логах продюсера мы должны увидеть, что сообщение отправилось.

Теперь зайдем на http://localhost:8080/ здесь мы должны увидеть в Topics наш топик.

Также в Messages мы должны увидеть наше отправленное сообщение.


И в Consumers мы можем увидеть, что у нас есть два консьюмера.

Также проверим сохранился ли наш заказ, это должен был сделать наш первый консьюмер.

В логах мы видим, что сообщение обработано.
Идем на http://localhost:5050 заходим используя креды указанные в docker-compose.yml.
Далее настраиваем подключение.


Делаем select * from orders и должны увидеть сохраненный заказ.

Теперь еще проверим как сработал наш второй консьюмер. Смотрим логи и видим, что наш второй консьюмер также отработал и вывел в консоль наш заказ.

Еще посмотрим как можно протестировать продюсер и консьюмер.
Вначале обратимся к продюсеру. Его мы протестируем с помощью EmbeddedKafka, он будет работать быстрее, чем использовать KafkaContainer, но для тестов консьюмера мы попробуем использовать KafkaContainer.
@SpringBootTest @DirtiesContext @EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" }) public class KafkaMessageProducerServiceIT { public static final String TOPIC_NAME_SEND_CLIENT = "send-order-event"; @Autowired private KafkaMessagingService kafkaMessagingService; @Test public void it_should_send_order_event() { OrderSendEvent order = FakeOrder.getOrderSendEvent(); kafkaMessagingService.sendOrder(order); Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); properties.put(JsonDeserializer.TRUSTED_PACKAGES, "*"); properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group-java-test"); properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); properties.put(JsonDeserializer.VALUE_DEFAULT_TYPE, OrderSendEvent.class); KafkaConsumer<String, OrderSendEvent> consumer = new KafkaConsumer<>(properties); consumer.subscribe(Arrays.asList(TOPIC_NAME_SEND_CLIENT)); ConsumerRecords<String, OrderSendEvent> records = consumer.poll(Duration.ofMillis(10000L)); consumer.close(); //then assertEquals(1, records.count()); assertEquals(order.getProductName(), records.iterator().next().value().getProductName()); assertEquals(order.getBarCode(), records.iterator().next().value().getBarCode()); assertEquals(order.getQuantity(), records.iterator().next().value().getQuantity()); assertEquals(order.getPrice(), records.iterator().next().value().getPrice()); } }
Суть данного теста проста, мы внедряем наш реальный сервис по отправке сообщений KafkaMessagingService и вызываем метод sendOrder(), куда передаем тестовое сообщение. После создаем консьюмера, подключаемся к нашему топику, читаем оттуда сообщение и проверяем совпадает ли оно с отправленным.

Как видим тест прошел успешно.
Протестируем наш консьюмер, который сохраняет заказ в базу данных.
@Testcontainers @SpringBootTest class KafkaMessagingServiceIT { public static final Long ORDER_ID = 1L; public static final String TOPIC_NAME_SEND_ORDER= "send-order-event"; @Container static PostgreSQLContainer<?> postgreSQLContainer = new PostgreSQLContainer<>("postgres:12") .withUsername("username") .withPassword("password") .withExposedPorts(5432) .withReuse(true); @Container static final KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.4")) .withEmbeddedZookeeper() .withEnv("KAFKA_LISTENERS", "PLAINTEXT://0.0.0.0:9093 ,BROKER://0.0.0.0:9092") .withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT") .withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER") .withEnv("KAFKA_BROKER_ID", "1") .withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1") .withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", "1") .withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1") .withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1") .withEnv("KAFKA_LOG_FLUSH_INTERVAL_MESSAGES", Long.MAX_VALUE + "") .withEnv("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0"); static { Startables.deepStart(Stream.of(postgreSQLContainer, kafkaContainer)).join(); } @DynamicPropertySource static void overrideProperties(DynamicPropertyRegistry registry) { registry.add("spring.kafka.bootstrap-servers", kafkaContainer::getBootstrapServers); registry.add("spring.datasource.url", postgreSQLContainer::getJdbcUrl); registry.add("spring.datasource.username", postgreSQLContainer::getUsername); registry.add("spring.datasource.password", postgreSQLContainer::getPassword); registry.add("spring.datasource.driver-class-name", postgreSQLContainer::getDriverClassName); } @Autowired private OrdersRepository ordersRepository; @Test void save_order() throws InterruptedException { //given String bootstrapServers = kafkaContainer.getBootstrapServers(); OrderEvent orderEvent = FakeOrder.getOrderEvent(); Order order = FakeOrder.getOrder(); Map<String, Object> configProps = new HashMap<>(); configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); ProducerFactory<String, OrderEvent> producerFactory = new DefaultKafkaProducerFactory<>(configProps); KafkaTemplate<String, OrderEvent> kafkaTemplate = new KafkaTemplate<>(producerFactory); //when SECONDS.sleep(5); kafkaTemplate.send(TOPIC_NAME_SEND_ORDER, orderEvent.getBarCode(), orderEvent); SECONDS.sleep(5); //then Order orderFromDB = ordersRepository.findById(ORDER_ID).get(); assertEquals(orderFromDB.getId(), ORDER_ID); assertEquals(orderFromDB.getProductName(), order.getProductName()); assertEquals(orderFromDB.getBarCode(), order.getBarCode()); assertEquals(orderFromDB.getQuantity(), order.getQuantity()); assertEquals(orderFromDB.getPrice(), order.getPrice().setScale(2, RoundingMode.HALF_DOWN)); assertEquals(orderFromDB.getAmount(), order.getAmount().setScale(2)); assertEquals(orderFromDB.getOrderDate().getYear(), order.getOrderDate().getYear()); assertEquals(orderFromDB.getStatus(), order.getStatus()); } }
Так как это интеграционный тест, то мы будем использовать KafkaContainer и PostgreSQLContainer, и проверим, что наше сообщение прочиталось и сохранилось в базу данных.
То есть вначале настраиваем контейнеры с kafka и postgreSQL.
Далее внедряем OrdersRepository, чтобы потом получить оттуда данные.
И сам тест тоже довольно прост. Вначале мы создаем продюсера и отправляем в наш топик сообщение с заказом. Далее с помощью ordersRepository обращаемся к базе данных, оттуда получаем наш сохраненный заказ, который должен был сам сохраниться и проверяем правильный ли он.
Данный тест будет выполняться довольно долго, так как надо еще поднять контейнеры с kafka и postgreSQL.


Как видим наш тест прошел успешно.
На этом все.
Спасибо. Всем кто дочитал до конца.
Всем пока.
