Как стать автором
Обновить

Комментарии 8

А не рассматривали вынести логику из консюмера и продюсера в сервисы не завязанные на Kafka в принципе?
Чтобы тестировать логику в отрыве от транспорта:

  • Метод в консюмере просто транслирует полученный объект в сервис - однострочник консюмера можно не тестировать вовсе, а тестируемый сервис не зависит от кафки и на вход просто принимает объекты

  • От множества продюсеров можно отказаться вовсе, заменив их универсальным, который на вход принимает отправляемый объект и возможно имя очереди, а там где раньше шла прямая отправка инжектить такой сервис , который в тестах будет мокаться.

В результате тестируется именно бизнес-логика, а не завязки на транспорт, поднимать ничего не нужно, выключать ничего не нужно.
Сплошные плюсы.

Пример кода

То есть вместо ```java @Slf4j @Service @RequiredArgsConstructor public class UserConsumer {

private final UserService userService;
private final NotUserService notUserService;
private final UserHandler userHandler;
private final UserMapper userMapper;
private final List<String> userCodes;

@KafkaListener(groupId = "userConsumerGroupId",
        clientIdPrefix = "UserConsumer",
        topics = {"user-topic"},
        containerFactory = "kafkaListenerContainerFactory")
public void consume(UserInfoRecord userInfoRecord) {
    log.info("Тут разнообразная логика обработки");
    log.info("Cохранение в бд?");
    log.info("Работа с внешними сервисами?");
    log.info("Вызов мапперов?");
    log.info("Отправка информации в Kafka?");
}

}

@Slf4j
@Service
@RequiredArgsConstructor
public class UserProducer {

private final UserService userService;
/** Если у нас какой-то персистинг сообщений перед отправкой или обработка ошибок отправки */
private final KafkaMessageService kafkaMessageService;
/** Если мы просто отправляем - используем KafkaProducer, например, у нас по бОльшей части используетс AVRO */
private final KafkaProducer<String, SpecificRecordBase> kafkaProducer;
/** Но есть места, где используется JSON вместо AVRO */
private final KafkaProducer<String, String> kafkaJsonProducer;
private final ApplicationProperties applicationProperties;

public void sendUserInfoMessage(User user, Set<Integer> managerIds) {
  log.info("Тут логика отправки");
  log.info("Обычно сначала идет сборка модели для отправки.")
  log.info("Но может быть и какая-то более сложная логика.");
  var record = UserInfoRecord.newBuilder().build();
  log.info("Тут может быть как непосредственно отправка в Kafka");
  log.info("Так и реализация outbox-паттерна.");
  kafkaMessageService.persistMessageToSend(applicationProperties.userTopic(), record);
}

}

сделать так:
```java
@Slf4j
@Service
@RequiredArgsConstructor
public class UserConsumer {

    private final UserConsumerService service;

    @KafkaListener(groupId = "userConsumerGroupId",
            clientIdPrefix = "UserConsumer",
            topics = {"user-topic"},
            containerFactory = "kafkaListenerContainerFactory")
    public void consume(UserInfoRecord userInfoRecord) {
	    service.consume(userInfoRecord);
    }
}

@Slf4j
@Service
@RequiredArgsConstructor
public class UserConsumerService {
    private final UserService userService;
    private final NotUserService notUserService;
    private final UserHandler userHandler;
    private final UserMapper userMapper;
    private final List<String> userCodes;

    public void consume(UserInfoRecord userInfoRecord) {
        log.info("Тут разнообразная логика обработки");
        log.info("Cохранение в бд?");
        log.info("Работа с внешними сервисами?");
        log.info("Вызов мапперов?");
        log.info("Отправка информации в Kafka?");
    }
}

@Slf4j
@Service
@RequiredArgsConstructor
public class UserProducerService {

    private final KafkaSender sender;

    public void sendUserInfoMessage(User user, Set<Integer> managerIds) {
      log.info("Обычно сначала идет сборка модели для отправки.")
      log.info("Но может быть и какая-то более сложная логика.");
      var record = UserInfoRecord.newBuilder().build();
      sender.sendUserInfoMessage(record);
    }
}

@Slf4j
@Service
@RequiredArgsConstructor
public class KafkaSender {

    private final UserService userService;
    /** Если у нас какой-то персистинг сообщений перед отправкой или обработка ошибок отправки */
    private final KafkaMessageService kafkaMessageService;
    /** Если мы просто отправляем - используем KafkaProducer, например, у нас по бОльшей части используетс AVRO */
    private final KafkaProducer<String, SpecificRecordBase> kafkaProducer;
    /** Но есть места, где используется JSON вместо AVRO */
    private final KafkaProducer<String, String> kafkaJsonProducer;
    private final ApplicationProperties applicationProperties;

    public void sendUserInfoMessage(UserInfoRecord record) {
      log.info("Тут логика отправки");
      log.info("Тут может быть как непосредственно отправка в Kafka");
      log.info("Так и реализация outbox-паттерна.");
      kafkaMessageService.persistMessageToSend(applicationProperties.userTopic(), record);
    }
}

Я вам больше скажу - то что вы предлагаете, называется разделение кода на чистые функции и IO. Что широко практикуется в функциональном программировании.

И оно имеет в том числе те очевидные плюсы, что вы только что описали - некий метод (функция) создания объекта, или его обработки, не зависит ни от какой кафки и ни от какой базы - и поэтому нормально тестируется. А те методы (функции), которые зависят от кафки или базы - не содержат никакой логики.

Я бы не сказал что это из функционального программирования. То о чем пишет valery1707 это банальное соблюдение SOLID, разделение на слои (Транспортный, бизнес сервис, хранение данных и т.д.). Отделение бизнес логики от различных технических штук. В первоначальном варианте консюмер сочетает две роли 1) Транспортную - точка входа, где мы получаем объект из кафки. 2) декоратор - содержащий цепочку вызову всего бизнес процесса. Нарушен принцип единственной ответственности.

В ФП выделяется побочный эффект в самом общем виде, и к нему относятся и кафка, и база, и любое другое взаимодействие. Но если вам привычнее или удобнее формулировки типа SRP - ну можно и так назвать. Как по мне - это одни и те же идеи, по большому счету. Какие-то более абстрактные, какие-то для более частного случая.

Вы знаете. То, что вы пишите - это принципы SOLID. Первая буква. Их даже рекомендуется применять.

А ТС хочет в костыли и велосипеды.?

Не претендую на последнюю инстанцию, выскажу свое мнение - вы смешиваете транспортный уровень и бизнес логику. И от этого ваши мучения. Консюмер транспортный уровень его задача - получить дто в нужном формате, конвертнуть в модель и скормить сервису (бизнес сервису или круд-сервису - если нам нужно просто сохранить). И уже внутри этого сервиса если нужно вы вызываете публикацию в другой какой то топик. Профит - консюмер становится простой до безобразия. Некоторые горячие головы даже не тестят а другие накидывают очень простой тест на моках (который в состоянии написать любой джун - т.к. по сути просто проверить вызовы через verify() ). Логика у вас изолирована от транспорта, и, если в какой то момент у вас появляется взаимодействие по ресту, например, вы легко и не принужденно делаете контроллер и вызываете всю ту же цепочку от конвертера из дто в модель до публикации в новый топик.

как выше уже заметили, вам просто нужно отделить кафку от бизнес логики, и тогда вы обойдетесь тривиальными настройками. Более того, эта настройка становится единообразной для любых зависимостей, будь то рест, БД, кафка или что-то еще.

Обычно предлог для использования тестового контейнера или иного сложного мока состоит в том, что он содержит какую-то логику, которую хотелось бы сохранить и поменять которую нет возможности. Ну, типа, послал сообщение - в ответ приехало сообщение и т.п. Но даже в этом случае легче написать простой фейковый кафка провайдер, который будет отражать логику, которую вы ожидаете от реальной зависимости.

Зарегистрируйтесь на Хабре, чтобы оставить комментарий