Комментарии 8
А не рассматривали вынести логику из консюмера и продюсера в сервисы не завязанные на Kafka в принципе?
Чтобы тестировать логику в отрыве от транспорта:
Метод в консюмере просто транслирует полученный объект в сервис - однострочник консюмера можно не тестировать вовсе, а тестируемый сервис не зависит от кафки и на вход просто принимает объекты
От множества продюсеров можно отказаться вовсе, заменив их универсальным, который на вход принимает отправляемый объект и возможно имя очереди, а там где раньше шла прямая отправка инжектить такой сервис , который в тестах будет мокаться.
В результате тестируется именно бизнес-логика, а не завязки на транспорт, поднимать ничего не нужно, выключать ничего не нужно.
Сплошные плюсы.
Пример кода
То есть вместо ```java @Slf4j @Service @RequiredArgsConstructor public class UserConsumer {
private final UserService userService;
private final NotUserService notUserService;
private final UserHandler userHandler;
private final UserMapper userMapper;
private final List<String> userCodes;
@KafkaListener(groupId = "userConsumerGroupId",
clientIdPrefix = "UserConsumer",
topics = {"user-topic"},
containerFactory = "kafkaListenerContainerFactory")
public void consume(UserInfoRecord userInfoRecord) {
log.info("Тут разнообразная логика обработки");
log.info("Cохранение в бд?");
log.info("Работа с внешними сервисами?");
log.info("Вызов мапперов?");
log.info("Отправка информации в Kafka?");
}
}
@Slf4j
@Service
@RequiredArgsConstructor
public class UserProducer {
private final UserService userService;
/** Если у нас какой-то персистинг сообщений перед отправкой или обработка ошибок отправки */
private final KafkaMessageService kafkaMessageService;
/** Если мы просто отправляем - используем KafkaProducer, например, у нас по бОльшей части используетс AVRO */
private final KafkaProducer<String, SpecificRecordBase> kafkaProducer;
/** Но есть места, где используется JSON вместо AVRO */
private final KafkaProducer<String, String> kafkaJsonProducer;
private final ApplicationProperties applicationProperties;
public void sendUserInfoMessage(User user, Set<Integer> managerIds) {
log.info("Тут логика отправки");
log.info("Обычно сначала идет сборка модели для отправки.")
log.info("Но может быть и какая-то более сложная логика.");
var record = UserInfoRecord.newBuilder().build();
log.info("Тут может быть как непосредственно отправка в Kafka");
log.info("Так и реализация outbox-паттерна.");
kafkaMessageService.persistMessageToSend(applicationProperties.userTopic(), record);
}
}
сделать так:
```java
@Slf4j
@Service
@RequiredArgsConstructor
public class UserConsumer {
private final UserConsumerService service;
@KafkaListener(groupId = "userConsumerGroupId",
clientIdPrefix = "UserConsumer",
topics = {"user-topic"},
containerFactory = "kafkaListenerContainerFactory")
public void consume(UserInfoRecord userInfoRecord) {
service.consume(userInfoRecord);
}
}
@Slf4j
@Service
@RequiredArgsConstructor
public class UserConsumerService {
private final UserService userService;
private final NotUserService notUserService;
private final UserHandler userHandler;
private final UserMapper userMapper;
private final List<String> userCodes;
public void consume(UserInfoRecord userInfoRecord) {
log.info("Тут разнообразная логика обработки");
log.info("Cохранение в бд?");
log.info("Работа с внешними сервисами?");
log.info("Вызов мапперов?");
log.info("Отправка информации в Kafka?");
}
}
@Slf4j
@Service
@RequiredArgsConstructor
public class UserProducerService {
private final KafkaSender sender;
public void sendUserInfoMessage(User user, Set<Integer> managerIds) {
log.info("Обычно сначала идет сборка модели для отправки.")
log.info("Но может быть и какая-то более сложная логика.");
var record = UserInfoRecord.newBuilder().build();
sender.sendUserInfoMessage(record);
}
}
@Slf4j
@Service
@RequiredArgsConstructor
public class KafkaSender {
private final UserService userService;
/** Если у нас какой-то персистинг сообщений перед отправкой или обработка ошибок отправки */
private final KafkaMessageService kafkaMessageService;
/** Если мы просто отправляем - используем KafkaProducer, например, у нас по бОльшей части используетс AVRO */
private final KafkaProducer<String, SpecificRecordBase> kafkaProducer;
/** Но есть места, где используется JSON вместо AVRO */
private final KafkaProducer<String, String> kafkaJsonProducer;
private final ApplicationProperties applicationProperties;
public void sendUserInfoMessage(UserInfoRecord record) {
log.info("Тут логика отправки");
log.info("Тут может быть как непосредственно отправка в Kafka");
log.info("Так и реализация outbox-паттерна.");
kafkaMessageService.persistMessageToSend(applicationProperties.userTopic(), record);
}
}
удалён
Я вам больше скажу - то что вы предлагаете, называется разделение кода на чистые функции и IO. Что широко практикуется в функциональном программировании.
И оно имеет в том числе те очевидные плюсы, что вы только что описали - некий метод (функция) создания объекта, или его обработки, не зависит ни от какой кафки и ни от какой базы - и поэтому нормально тестируется. А те методы (функции), которые зависят от кафки или базы - не содержат никакой логики.
Я бы не сказал что это из функционального программирования. То о чем пишет valery1707 это банальное соблюдение SOLID, разделение на слои (Транспортный, бизнес сервис, хранение данных и т.д.). Отделение бизнес логики от различных технических штук. В первоначальном варианте консюмер сочетает две роли 1) Транспортную - точка входа, где мы получаем объект из кафки. 2) декоратор - содержащий цепочку вызову всего бизнес процесса. Нарушен принцип единственной ответственности.
В ФП выделяется побочный эффект в самом общем виде, и к нему относятся и кафка, и база, и любое другое взаимодействие. Но если вам привычнее или удобнее формулировки типа SRP - ну можно и так назвать. Как по мне - это одни и те же идеи, по большому счету. Какие-то более абстрактные, какие-то для более частного случая.
Вы знаете. То, что вы пишите - это принципы SOLID. Первая буква. Их даже рекомендуется применять.
А ТС хочет в костыли и велосипеды.?
Не претендую на последнюю инстанцию, выскажу свое мнение - вы смешиваете транспортный уровень и бизнес логику. И от этого ваши мучения. Консюмер транспортный уровень его задача - получить дто в нужном формате, конвертнуть в модель и скормить сервису (бизнес сервису или круд-сервису - если нам нужно просто сохранить). И уже внутри этого сервиса если нужно вы вызываете публикацию в другой какой то топик. Профит - консюмер становится простой до безобразия. Некоторые горячие головы даже не тестят а другие накидывают очень простой тест на моках (который в состоянии написать любой джун - т.к. по сути просто проверить вызовы через verify() ). Логика у вас изолирована от транспорта, и, если в какой то момент у вас появляется взаимодействие по ресту, например, вы легко и не принужденно делаете контроллер и вызываете всю ту же цепочку от конвертера из дто в модель до публикации в новый топик.
как выше уже заметили, вам просто нужно отделить кафку от бизнес логики, и тогда вы обойдетесь тривиальными настройками. Более того, эта настройка становится единообразной для любых зависимостей, будь то рест, БД, кафка или что-то еще.
Обычно предлог для использования тестового контейнера или иного сложного мока состоит в том, что он содержит какую-то логику, которую хотелось бы сохранить и поменять которую нет возможности. Ну, типа, послал сообщение - в ответ приехало сообщение и т.п. Но даже в этом случае легче написать простой фейковый кафка провайдер, который будет отражать логику, которую вы ожидаете от реальной зависимости.
Как протестировать логику консьюмеров и продюсеров и не сгореть? Spring Boot 3, Spring Kafka