Pull to refresh
3529.73
RUVDS.com
VDS/VPS-хостинг. Скидка 15% по коду HABR15

Часть 2: Как я реализовал взаимодействие микросервисов — Kafka и gRpc

Level of difficultyMedium
Reading time21 min
Views10K

Привет! Меня зовут Бромбин Андрей, и сегодня я продолжаю цикл статей о создании микросервисного приложения с нуля. В этом выпуске мы сосредоточимся на взаимодействии между микросервисами, используя два подхода: асинхронный с помощью Kafka и синхронный через gRPC.

Независимо от вашего опыта, этот туториал предоставит вам готовые решения и ценные знания для создания Java Spring Microservices. Начинающие разработчики получат чёткое пошаговое руководство, а опытные специалисты — новые идеи, практические примеры и возможности для обмена опытом с коллегами. Разрабатывайте микросервисы эффективно и достигайте отличных результатов!

О чём эта статья?


В этой статье мы вместе реализуем микросервис для обработки изображений, обсудим архитектурные решения, необходимость выделения его в отдельный сервис, а также выберем подходящие технологии для синхронного и асинхронного взаимодействия. В конце — ссылки на репозитории и вспомогательные материалы.


«Как передавать сообщения гномам, живущим в глубинах Громовых Пещер?» — вопрос, стоящий перед каждым архитектором распределённых систем.

Мы рассмотрим:

  • Микросервис изображений: функциональность и архитектура.
  • Выбор и настройка объектного хранилища (MinIO).
  • Асинхронное взаимодействие с помощью Kafka: гарантии доставки, продюсер, консьюмер, топики, DLT.
  • Синхронное взаимодействие через gRpc: protobuf, генерация кода, типы RPC-вызовов и реализация клиента и сервера на примере сохранения изображения.

▍ Итоги предыдущей части


Часть 1: Как я создал идеальный REST API — микросервис инцидентов на Java и Spring

В предыдущей итерации цикла мы рассмотрели создание идеального REST API на примере микросервиса Инцидентов. Узнали об основных HTTP-методах, разобрались с пагинацией, улучшили читаемость, осознанно используя Lombok-аннотации, инкапсулировали данные с помощью DTO, научились правильно писать эндпоинты и их url'ы, использовали Глобальный обработчик ошибок для методов контроллера, а также при решении всех поставленных задач использовали уместные паттерны проектирования.

Микросервис изображений и предметная область


Бизнес требует возможность прикреплять изображения к инциденту, и перед нами встаёт вопрос, как лучше реализовать эту функциональность. Конечно, можно было бы просто расширить существующий сервис инцидентов, но это не всегда рационально.

Архитектура разрабатываемой системы

Во-первых, инциденты без изображений — довольно частый кейс, и усложнение основного сервиса приведёт к ненужным зависимостям и увеличению нагрузки. Во-вторых, хранение и обработка изображений — это отдельная задача, которая требует специализированных решений, таких как объектное хранилище.


Использование микросервисной архитектуры позволит нам отделить эту логику в независимый сервис, который можно развивать и масштабировать независимо от остальной системы.

Сущность Image



Класс Image представляет собой JPA-сущность, помеченную как @Entity, отвечающую за хранение информации об изображениях:id инцидента (auto-increment), url в хранилище, метаинформация: тип, размер, имя файла.


Рекомендуется явно использовать @Getter, @Setter, @NoArgsConstructor и @AllArgsConstructor, а не @Data, чтобы избежать бесконечной рекурсии в связях и ненужного кода.




Выбор хранилища изображений


Объектное хранилище — это система хранения данных, в которой файлы хранятся в виде объектов внутри контейнеров (bucket), без традиционной файловой иерархии. Каждый объект содержит данные, уникальный идентификатор и метаданные, что позволяет гибко управлять доступом и масштабируемостью. Такой подход абстрагирует хранение данных от файловой системы и обеспечивает гибкость в управлении.


Наиболее распространённый стандарт — S3-совместимые хранилища (Amazon S3, MinIO, Ceph и др.). Для локального развёртывания и тестирования удобнее всего использовать MinIO, так как он полностью совместим с S3 API и прост в настройке. Для больших проектов в продакшен среде используют облачные хранилища от AWS, Google Cloud, Azure и другие, так как поддерживают автоматическое масштабирование, репликацию, доступность и политику безопасности в соответствии со стандартами.


Для знакомства с S3-хранилищем используем MinIO. Это позволит разобраться с принципами работы объектного хранилища в локальной среде. Для этого необходимо:


Добавить зависимость в pom.xml


<dependency>
<groupId>io.minio</groupId>
<artifactId>minio</artifactId>
<version>${minio.version}</version>
</dependency>

Развернём MinIO в Docker. Напишем docker-compose.yml, где укажем:

  1. Образ: minio/minio:latest
  2. Имя контейнера: minio_container
  3. Credentials для root-пользователя: MINIO_ROOT_USER, MINIO_ROOT_PASSWORD
  4. Порты:
    • Основной порт для сервиса: 9000:9000
    • Порт веб-интерфейса: 9003:9003
  5. Команда для запуска (каталог хранения — /data, порт веб-консоли — :9003)
  6. Том: minio_data:/data монтируется в контейнер по пути /data
Файл конфигурации: docker-compose.yml
version: '3.8'
services:
  minio:
    image: minio/minio:latest
    container_name: minio_container
    environment:
      MINIO_ROOT_USER: <ваш_юзер>
      MINIO_ROOT_PASSWORD: <ваш_пароль>
    ports:
      - "9000:9000"
      - "9003:9003"
    command: server /data --console-address ":9003"
    volumes:
      - minio_data:/data

volumes:
  minio_data:


Автоматически подтягиваем переменные окружения из application.properties:

Класс MinioProperties
@Data
@Component
@ConfigurationProperties(prefix = "minio")
@FieldDefaults(level = AccessLevel.PRIVATE)
public class MinioProperties {
    String endpoint;
    String accessKey;
    String secretKey;
    String bucketName;
}

Теперь осталось написать MinIO Config для работы внутри нашего Spring приложения:

Класс MinioConfig
@Configuration
@RequiredArgsConstructor
@FieldDefaults(level= AccessLevel.PRIVATE, makeFinal = true)
public class MinioConfig {

    MinioProperties minioProperties;

    @Bean
    public MinioClient minioClient() {
        return MinioClient.builder()
                .endpoint(minioProperties.getEndpoint())
                .credentials(minioProperties.getAccessKey(), 
                             minioProperties.getSecretKey())
                .build();
    }

    @Bean
    public String bucketName() {
        return minioProperties.getBucketName();
    }
}

И всё что остаётся — прописать логику работы с бином minioClient и, непосредственно, сохранение файла в хранилище. Для этого сгенерировали уникальное имя файла:


String uniqueFileName = "image_" + imageToSave.getFileName() + UUID.randomUUID();

Напишем реализацию интерфейса — MinioServiceImpl. Метод uploadFile вызывает на MinioClient метод putObject, которому передаётся PutObjectArgs.builder() с параметрами:


  • bucket(bucketName): имя бакета, в который загружается файл (берётся из конфигурации);
  • object(fileName): имя объекта в хранилище;
  • stream(file.getInputStream(), file.getSize(), -1): поток файла, его размер и -1 как неизвестное значение части потока;
  • contentType(file.getContentType()): MIME-тип файла.

После успешной загрузки вызывается getPresignedObjectUrl, который создаёт подписанный URL для скачивания файла, передавая в GetPresignedObjectUrlArgs.builder() имя бакета и объекта, а также метод Method.GET. Для удаления аналогичным образом используется метод minioClient.removeObject().

Класс MinioServiceImpl
@Override   
public String uploadFile(MultipartFile file, String fileName) {
    try {
        minioClient.putObject(
                PutObjectArgs.builder()
                        .bucket(bucketName)
                        .object(fileName)
                        .stream(file.getInputStream(), file.getSize(), -1)
                        .contentType(file.getContentType())
                        .build()
        );
        log.info("File {} uploaded successfully to MinIO", fileName);
        return minioClient.getPresignedObjectUrl(
                GetPresignedObjectUrlArgs.builder()
                        .bucket(bucketName)
                        .object(fileName)
                        .method(Method.GET)
                        .build()
        );
    } catch (Exception e) {
        throw new MinioFileException("Failed to upload file " + fileName, e);
    }
}

@Override
public void deleteFile(String fileName) {
    try {
        minioClient.removeObject(
                RemoveObjectArgs.builder()
                        .bucket(bucketName)
                        .object(fileName)
                       .build()
       );
       log.info("File {} deleted successfully from MinIO", fileName);
    } catch (Exception e) {
        throw new MinioFileException("Failed to delete file " + fileName, e);
    }
}

Втягиваемся в Apache Kafka



Явилось свету чудо инженерной мысли. Эльф постиг Apache Kafka

Tеоретическая база


Kafka — это высокопроизводительный брокер сообщений, в основе которого лежат три ключевых элемента: продюсеры, консьюмеры и брокеры. Продюсеры записывают события в лог, консьюмеры их читают, а сам кластер Kafka состоит из множества брокеров, объединённых в единую сеть. Эти брокеры работают синхронно, обеспечивая надёжность и масштабируемость системы. Когда мы говорим, что продюсеры отправляют события в Kafka-кластер, на самом деле они взаимодействуют с его брокерами.


Независимо от версии Apache Kafka, важно понимать роль Zookeeper. Так как продюсеры и консьюмеры не взаимодействуют напрямую, поддержание их согласованного состояния ложится на Zookeeper. Он управляет внутренними процессами Kafka, конфигурацией топиков и пользователей, а также задаёт политики доступа. Кроме того, Zookeeper обеспечивает отказоустойчивость системы, помогая кластеру справляться со сбоями.



Вот они слева направо — ключевые составляющие

▍ Топики


Топик — логическое разделение логов на некие тематики/категории сообщений по группам. Например, сообщения рассылок и уведомлений будут выделены в одно группу, а передача изображений — в другую.


Топики делятся на партиции, а партиции на сегменты. Партиции топиков могут находиться в разных брокерах, тем самым обеспечивая масштабируемость. Сегмент — это физическое представление на дисках.


Рекомендую статью к прочтению, если хочется разобраться подробнее.


▍ Режимы работы и гарантии доставки

  1. At most once (не более одного раза)
    • Сообщение может быть потеряно.
    • Повторная доставка не производится.
    • Применяется крайне редко (в случаях, где потеря данных допустима).
  2. At least once (как минимум один раз)
    • Kafka гарантирует, что сообщение будет доставлено.
    • Возможны дубликаты.
    • Требует идемпотентной обработки на стороне consumer.
    • Используется чаще всего.
  3. Exactly once (ровно один раз)
    • Исключает и потерю, и дублирование сообщений.
    • Реализуется через транзакции Kafka.
    • Требует специальной настройки как producer'а, так и consumer'а.
    • Наиболее надёжный, но и самый сложный по реализации.

Kafka даёт гибкость в управлении надёжностью доставки сообщений, позволяя выбирать подходящий уровень гарантии в зависимости от задачи. Режим at least once

считается «золотой серединой»: он надёжен и относительно прост в реализации — достаточно настроить повторные попытки и включить идемпотентность у продюсера. Однако для критичных операций, где даже дубликат недопустим (например, при переводе денег), применяют exactly once, несмотря на его сложность.

В нашем случае реализуем at least once, правильно указав параметры в properties-файле.


Spring Framework && Kafka


⚙️ Конфигурация и зависимости


Разделим взаимодействие по типу обработки:

  • Синхронные запросы (сохранение, обновление, получение) → gRPC (HTTP 2.0).
  • Асинхронные операции (удаление) → Kafka.

Начнём с конфигурации и зависимостей для работы Spring сервиса. У нас есть выбор из двух библиотек:

Файл конфигурации проекта: pom.xml

<dependencies>
    <!-- Классический Spring Kafka (наш выбор) --> 
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>

    <!-- Реактивная Kafka (Reactor Kafka) -->
    <!-- (например если используется WebFlux или Quarkus framework)-->
    <dependency>
        <groupId>io.projectreactor.kafka</groupId>
        <artifactId>reactor-kafka</artifactId>
    </dependency>
</dependencies>



Spring Kafka реализация


В Spring Kafka отправка сообщений возможна разными способами. Отправка через KafkaTemplate (высокий уровень абстракции):


private void sendKafkaMessage(DataDto dataDto) {
    kafkaTemplate.send(TOPIC_NAME, dataDto);
}

Отправка через ProducerRecord, где можно указать key, partition, timestamp:

private void sendWithPartition(DataDto dataDto, int partition) {
    var producerRecord = new ProducerRecord<>(
        TOPIC_NAME,         // Топик
        partition,          // Номер партиции
        dataDto.getId(),    // Ключ (опционально)
        dataDto             // Сообщение
    );
    kafkaTemplate.send(producerRecord);
}

Отправка через MessageBuilder (низкий уровень абстракции), где можно указать полный пакет настроек сообщения (заголовки, партиции, ключи):


private void sendKafkaMessage(DataDto dataDto) {
    var message = MessageBuilder.withPayload(dataDto)
          .setHeader(KafkaHeaders.TOPIC, TOPIC_NAME)
          .setHeader(KafkaHeaders.PARTITION, 0)
          .setHeader(KafkaHeaders.KEY, "my-key")
          .build();
    kafkaTemplate.send(message);
}

Kafka Producer


Начать необходимо с описания параметров нашего продюсера, с его настроек. Для этого впишем в application.properties переменные и подтянем их через @ConfigurationProperties(prefix = «spring.kafka»). На самом деле Spring может делать это автоматически, в случае самых стандартных параметров, но я бы делал свой компонент свойств:


Класс KafkaCustomProperties
@Data
@Component
@FieldDefaults(level = AccessLevel.PRIVATE)
@ConfigurationProperties(prefix = "spring.kafka")
public class KafkaCustomProperties {
    String bootstrapServers;
    Producer producer;

    @Data
    @FieldDefaults(level = AccessLevel.PRIVATE)
    public static class Producer {
        Integer retries;
        String acks;
        Integer deliveryTimeout;
        Integer retryBackoff;
        Integer lingerMs;
        Boolean enableIdempotence;
    }
}


Сами переменные прописываем в application.properties


spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.producer.retries=3
spring.kafka.producer.properties.enable.idempotence=true
spring.kafka.producer.acks=all
spring.kafka.producer.delivery-timeout=120000
spring.kafka.producer.retry-backoff=1000
spring.kafka.producer.linger-ms=50
kafka.delete-image.topic=delete-image-topic
// Эти параметры подбираются исходя из опытной нагрузки и требований к надёжности, и, соответственно, могут меняться по ситуации.

Здесь указали основные параметры продюсера — это:

  • Адреса Kafka-брокеров, например: "localhost:9092" или "kafka-1:9092,kafka-2:9092".
  • Количество повторных попыток (retries).
  • Подтверждение получения сообщений (acks): «0» — не ждать, «1» — ждать от одного брокера, «all» — дождаться подтверждения от всех реплик.
  • Таймаут доставки в миллисекундах (delivery.timeout.ms).
  • Задержка перед отправкой (linger.ms), что позволяет накапливать сообщения перед отправкой (для увеличения пропускной способности).
  • Повторная отправка при сбоях (retry.backoff.ms).
  • Идемпотентность (enable.idempotence): true — гарантия доставки в нашем случае at least once, и, желательно, оставить true, тем самым не будет дублирования в рамках одной партиции.
Напишем config файл:
KafkaConfig
@Configuration
@RequiredArgsConstructor
@FieldDefaults(level= AccessLevel.PRIVATE, makeFinal = true)
public class KafkaConfig {

    KafkaCustomProperties kafkaProperties;

    @Bean
    public KafkaTemplate<String, DeleteImageRequest> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    private ProducerFactory<String, DeleteImageRequest> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs(), new StringSerializer(), new JsonSerializer<>());
    }

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        props.put(ProducerConfig.RETRIES_CONFIG, kafkaProperties.getProducer().getRetries());
        props.put(ProducerConfig.ACKS_CONFIG, kafkaProperties.getProducer().getAcks());
        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, kafkaProperties.getProducer().getDeliveryTimeout());
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, kafkaProperties.getProducer().getRetryBackoff());
        props.put(ProducerConfig.LINGER_MS_CONFIG, kafkaProperties.getProducer().getLingerMs());
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, kafkaProperties.getProducer().getEnableIdempotence());

        return props;
    }
}


Здесь дополнительно указывали сериализацию ключей и значений с помощью StringSerializer.class, JsonSerializer.class.

Напишем реализацию сервиса отправки запросов на удаление в микросервис Изображений.
KafkaImageServiceImpl
@Service
@Slf4j
@RequiredArgsConstructor
@FieldDefaults(level= AccessLevel.PRIVATE, makeFinal = true)
public class KafkaImageServiceImpl implements KafkaImageService {

    KafkaTemplate<String, DeleteImageRequest> kafkaTemplate;

    @NonFinal
    @Value("${kafka.delete-image.topic}")
    String deleteImageTopic;

    @Override
    public void deleteImage(DeleteImageRequest deleteImageRequest) {
        Message<DeleteImageRequest> message = MessageBuilder
                .withPayload(deleteImageRequest)
                .setHeader(KafkaHeaders.TOPIC, deleteImageTopic)
                .setHeader(KafkaHeaders.KEY, "delete-image-key")
                .build();

        kafkaTemplate.send(message)
                .whenComplete((result, ex) -> {
                    if (ex == null) {
                        log.info("Delete image request sent successfully with offset: {}",
                                result.getRecordMetadata().offset());
                    } else {
                        log.error("Failed to send delete image request: {}", ex.getMessage());
                    }
                });
    }
}

Собираем Message через MessageBuilder, указывая полезную нагрузку, имя топика, ключ для партиционирования и отправляем через KafkaTemplate, логируя успех или ошибку.


public record DeleteImageRequest (
        Long incidentId
){}

В запросе только id инцидента, связанные с которым изображения необходимо удалить.


Kafka Consumer

Свойство Описание Значение по умолчанию Выбранное значение
bootstrapServers Адреса Kafka-брокеров null localhost:9092
groupId Группа consumer’ов, к которой он
принадлежит
null image-service-group
autoOffsetReset Политика обработки офсетов
(earliest, latest, none)
latest earliest (читаем сначала)
keyDeserializer Класс для десериализации ключа сообщения String Serializer String Serializer
valueDeserializer Класс для десериализации значения сообщения String Serializer Json Serializer
enableAutoCommit Включить или отключить автофиксацию
офсетов
true false (вручную фиксируем офсет, реализуя режим at least once)
maxPollRecords Максимальное количество
сообщений за один poll()
500 по умолчанию
concurrency Определяет количество потоков в
ConcurrentKafkaListenerContainer-
Factory
1 3 (Spring Kafka сам создаст потоковые обработчики)
Offset — это уникальный номер сообщения в партиции Kafka. Каждое сообщение в топике имеет смещение (offset), и консьюмер использует его, чтобы понимать, какие сообщения уже обработаны.

В Кафка есть множество других настроек, но в 99% случаев вышеуказанных достаточно.

class KafkaProperties
@Data
@Component
@FieldDefaults(level = AccessLevel.PRIVATE)
@ConfigurationProperties(prefix = "spring.kafka")
public class KafkaProperties {
    String bootstrapServers;
    Consumer consumer;

    @Data
    public static class Consumer {
        String groupId;
        String autoOffsetReset;
        String keyDeserializer;
        String valueDeserializer;
        Integer maxPollRecords;
        Integer concurrency;
    }
}


В config классе следует передать свойства и создать бин фабрики с конфигурацией, контейнеры для Kafka слушателей.

class KafkaConfig
@Configuration
@RequiredArgsConstructor
@FieldDefaults(level = AccessLevel.PRIVATE, makeFinal = true)
public class KafkaConfig {

    KafkaProperties kafkaProperties;
    DefaultErrorHandler kafkaErrorHandler;

    @Bean
    public ConsumerFactory<String, DeleteImageRequest> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaCustomProperties.getBootstrapServers());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaCustomProperties.getGroupId());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaCustomProperties.getAutoOffsetReset());

        // Указываем ErrorHandlingDeserializer как основной десериализатор
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);

        // Указываем делегаты для ErrorHandlingDeserializer
        props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
        props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);

        // Настройки JsonDeserializer
        props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, DeleteImageRequest.class.getName());

        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaCustomProperties.getMaxPollRecords());

        // Убираем ручное создание JsonDeserializer
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, DeleteImageRequest> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, DeleteImageRequest> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(kafkaCustomProperties.getConcurrency());
        factory.setCommonErrorHandler(kafkaErrorHandler);
        return factory;
    }
}


Также мы указали сценарий на случай ошибок — kafkaErrorHandler. Если консьюмер не может обработать сообщение, оно попадает в этот обработчик для логирования и отправки в DLT (Dead Letter Topic) — специальный топик для вызывающих ошибки сообщений. Упрощённо handler выглядит так:


@Configuration
public class KafkaErrorHandlerConfig {

    @Bean
    public DefaultErrorHandler kafkaErrorHandler(KafkaTemplate<String, DeleteImageRequest> kafkaTemplate) {
        return new DefaultErrorHandler((record, exception) -> {
            String key = record.key() != null ? (String) record.key() : "unknown-key";
            DeleteImageRequest value = (DeleteImageRequest) record.value();

            kafkaTemplate.send("delete-image-topic-dlt", key, value);
        });
    }
}

Осталось рассмотреть создание читателя, то есть консьюмера. Напомню, что имплементируем «интерфейс для реализации» по паттерну, определяя в нём, какие методы необходимо реализовать. Упрощённо этот сервис выглядит так:

KafkaImageConsumerImpl
@Component
@RequiredArgsConstructor
@FieldDefaults(level= AccessLevel.PRIVATE, makeFinal=true)
public class KafkaImageConsumerImpl implements KafkaImageConsumer {
    ImageFacade imageFacade;

    @Override
    @KafkaListener(
            topics = "${spring.kafka.consumer.image.topic}",
            groupId = "${spring.kafka.consumer.group-id}",
            containerFactory = "kafkaListenerContainerFactory"
    )
    public void consumeDeleteImageRequest(ConsumerRecord<String, DeleteImageRequest> record, Acknowledgment ack) {
        jwtAuthenticationService.authenticateJwt(extractJwtToken(record));
        DeleteImageRequest deleteImageRequest = record.value();
        processAndDeleteImage(deleteImageRequest);

        // в случае ошибки, обработку перехватит DefaultErrorHandler и офсет не будет зафиксирован
        // в случае успешной обработки, KafkaContainer автоматически сделать commitSync()
    }
  
    private void processAndDeleteImage(DeleteImageRequest deleteImageRequest) {
        try {
            imageFacade.deleteImagesFromKafkaRequest(deleteImageRequest);
        } catch (Exception e) {
            throw new FileStorageException("Some error message: " + 
                       deleteImageRequest.incidentId(), e);
        }
    }
}


Здесь обратим внимание на аннотацию @KafkaListener, где указываем прослушиваемый топик, группу и фабрику контейнеров, сконфигурированную ранее. Принимаем мы consumerRecord, извлекаем структуру, которая и есть наш запрос на удаление DeleteImageRequest.


Рекомендую использовать Big Data Tools Plugin в IDE и Kafka-ui, позволяющие мониторить отправляемые данные и метаданные сообщения 👍

Синхронное взаимодействие с gRpc


Радикальное решение радикальных гномов — радикально закатопультить послание

Теоретическая база


gRpc — это современная высокопроизводительная система удалённого вызова процедур (RPC), разработанная в Google. Он предназначен для построения распределённых систем, позволяя клиентам и серверам, взаимодействовать друг с другом посредством определённых методов, как если бы они были локальными.


Это надо знать!

  • Использование HTTP/2 — обеспечивает высокую скорость, мультиплексирование запросов и эффективное управление соединениями.
  • Протокол сериализации Protocol Buffers (protobuf) — компактный и быстрый бинарный формат передачи данных. Вместо тех же тяжёлых JSON'ов, которые можно получать, обращаясь к REST API. Меньше размер -> быстрее передача данных (в 7–10 раз)
  • Автогенерация кода под все популярные языки программирования (Java, Python, C++, Go, etc.)
  • Различные типы вызовов:
    • Unary Rpc — клиент отправляет один запрос и получает один ответ,
    • Server Streaming Rpc — клиент отправляет один запрос, сервер отвечает потоком данных,
    • Client Streaming Rpc — клиент отправляет поток данных, сервер отвечает одним ответом,
    • Bidirectional Streaming Rpc — двусторонний поток сообщений между клиентом и сервером.
  • Безопасность — встроенная TLS.

gRPC идеально подходит для микросервисной архитектуры, высоконагруженных систем и облачных сервисов, обеспечивая быструю и надёжную передачу данных между сервисами.


Рассмотрим механизм RPC-взаимодействия


Remote Procedure Call (RPC) — это механизм, при котором клиент вызывает удалённую процедуру так, как если бы она выполнялась локально, передавая параметры. Однако фактическое выполнение происходит на стороне сервера, а клиент получает результат в упакованном виде, используя предопределённый формат передачи данных.


Разберёмся на примере:

Архитектура вызова удалённой процедуры сохранения изображения

Рассмотрим алгоритм сохранения изображения:

  1. Клиент (Incident Service) – отправляет запрос на загрузку изображения, передавая его параметры (файл, метаданные и т. д.).
  2. Client RPC Stub – это сгенерированный gRPC-клиент, который принимает вызов метода у клиента (Incident Service), сериализует данные в protobuf и отправляет gRPC-запрос на сервер (Image Service) через HTTP/2.
  3. gRPC Server (Image Service) – принимает gRPC-запрос, десериализует данные и вызывает соответствующий обработчик (метод в сервисе Image Service), который выполняет сохранение изображения.
  4. Обработчик на сервере (Image Service) – выполняет основную бизнес-логику: сохранение, генерацию url и id, формирование ответа.
  5. Server RPC Stub – сериализует результат (Id и Url) в protobuf и отправляет ответ клиенту.
  6. Client RPC Stub (на стороне Incident Service) – принимает ответ, десериализует его и передаёт приложению (Incident Service), которое может использовать полученные Id и Url изображения.

Image_service.proto — общее описание для обоих микросервисов и содержит gRPC-сервис для работы с изображениями. Он определяет контракты взаимодействия между клиентом (Incident Service) и сервером (Image Service). В этом файле указываются сообщения (protobuf) и методы, доступные для вызова через gRPC.

pom.xml

<dependencies>
    <!-- gRPC Spring Boot Starter (Для сервера) -->
    <dependency>
        <groupId>net.devh</groupId>
        <artifactId>grpc-spring-boot-starter</artifactId>
        <version>${net.devh.version}</version>
    </dependency>

    <!-- gRPC Spring Boot Client Starter (Для клиента) -->
    <dependency>
        <groupId>net.devh</groupId>
        <artifactId>grpc-client-spring-boot-starter</artifactId>
        <version>${net.devh.version}</version>
    </dependency>

    <!-- gRPC Core Dependencies -->
    <dependency>
        <groupId>io.grpc</groupId>
        <artifactId>grpc-netty</artifactId>
        <version>${grpc.version}</version>
    </dependency>

    <dependency>
        <groupId>io.grpc</groupId>
        <artifactId>grpc-protobuf</artifactId>
        <version>${grpc.version}</version>
    </dependency>

    <dependency>
        <groupId>io.grpc</groupId>
        <artifactId>grpc-stub</artifactId>
        <version>${grpc.version}</version>
    </dependency>

    <!-- Protobuf Compiler Plugin -->
    <dependency>
        <groupId>com.google.protobuf</groupId>
        <artifactId>protobuf-java</artifactId>
        <version>${protobuf.version}</version>
    </dependency>

</dependencies>

<build>
    <extensions>
        <extension>
            <groupId>kr.motd.maven</groupId>
            <artifactId>os-maven-plugin</artifactId>
        </extension>
    </extensions>

    <plugins>
        <plugin>
            <groupId>org.xolstice.maven.plugins</groupId>
            <artifactId>protobuf-maven-plugin</artifactId>
            <configuration>
                <protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}</protocArtifact>
                <pluginId>grpc-java</pluginId>
                <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
            </configuration>
            <executions>
                <execution>
                    <id>compile</id>
                    <goals>
                        <goal>compile</goal>
                        <goal>compile-custom</goal>
                    </goals>
                </execution>
                <execution>
                    <id>test-compile</id>
                    <goals>
                        <goal>test-compile</goal>
                        <goal>test-compile-custom</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>


За генерацию кода, исходя из этого описания, отвечает компилятор protoc. Например, сущность запроса на сохранение и ответ выглядят так:


syntax = "proto3";

package image;

message SaveImageRequest {
  int64 incident_id = 1;
  bytes file_data = 2;
  int64 size = 3;
  string type = 4;
}

message SaveImageResponse {
  int64 image_id = 1;
  string url = 2;
}

Взаимодействия с этими сущностями строится посредством описания в proto-файле структуры, а Java-кодом — сама реализация. Например:


service ImageService {
  rpc SaveImage (SaveImageRequest) returns (SaveImageResponse);
}

Сам метод сервиса описывается один раз в нашем случае на микросервисе Изображений (Клиент), который принимает файлы на сохранение и упрощённо выглядит так:

Класс GrpcImageServiceImpl
@Service
@RequiredArgsConstructor
@FieldDefaults(level = AccessLevel.PRIVATE, makeFinal = true)
public class GrpcImageServiceImpl extends ImageServiceImplBase {
    ImageFacade imageFacade;
    ImageMapper imageMapper;

    @Override
    public void saveImage(SaveImageRequest request, 
                      StreamObserver<SaveImageResponse> responseObserver) {
        try {
          ImageDto imageDto = imageMapper.toImageDto(request);
        
          Image savedImage = imageFacade.saveImage(imageDto, file);
        
          SaveImageResponse response = imageMapper.toSaveImageResponse(savedImage);
        
          responseObserver.onNext(response);
          responseObserver.onCompleted();
        } catch (Exception e) {
            responseObserver.onError(
              new RuntimeException("Ошибка при сохранении изображения: " + 
                               + e.getMessage(), e));
        }
    }  
}


Сервис наследуется от сгенерированного ImageServiceImplBase и здесь мы переопределяем (@Override) gRpc-метод. Он принимает request сериализованный protobuf’ом, маппит его в ImageDto, сохраняет и формирует ответ.


StreamObserver — интерфейс для передачи данных в ответ на запрос. onNext() — отправка, onCompleted() — завершает поток, onError() — сообщает об ошибке.


Осталось посмотреть на сам вызов удалённой процедуры и упрощённо выглядит так:

Класс GrpcClientServiceImpl
@Service
@RequiredArgsConstructor
@FieldDefaults(level = PRIVATE, makeFinal = true)
public class GrpcClientServiceImpl implements GrpcClientService{

    ImageServiceBlockingStub imageServiceStub;
    UserServiceBlockingStub userServiceStub;
    ImageRequestMapper imageRequestMapper;

    @Override
    public SaveImageResponse uploadImages(List<ImageDto> imageDtoList, Incident incident) {
      for (ImageDto imageDto : imageDtoList) {
          try {
            SaveImageRequest request = imageRequestMapper.toSaveImageRequest(imageDto);

            SaveImageResponse response = imageServiceStub.saveImage(request);

            return response;
          } catch (StatusRuntimeException e) {
              throw new ImageProcessingException("Some error message: " 
                      + incident.getId(), e);
          } catch (Exception e) {
              throw new ImageProcessingException("Some error message: "
                      + incident.getId() + " with unexpected error: ", e);
          }
        }
    }
}


Сервис наследуется от GrpcClientService (паттерн Interface for Implementation) и реализует метод загрузки изображений на сервер. Здесь мы маппим Dto в Request, вызываем блокирующий метод сохранения, так как нам важно зафиксировать успех выполнения операции, и возвращаем ответ.


Результаты


Запрос с обязательными параметрами в postman:



POST-запрос на создание инцидента

По запросу на Incident api создаётся запись в БД об инциденте, а изображение передаётся по gRpc на Image-service, который пишет в minio и в ui мы видим такую картину:



В результате в minio — 1 объект

Выполнив запрос на удаление с указанием id инцидента:


Запрос на удаление инцидента

Incident-service удаляет информацию об инциденте и асинхронно запрашивает удаление у Image-service, записывая в топик Kafka сообщение. Как и предполагалось в delete-image-topic сообщение с id инцидента, связанные с которым изображения необходимо удалить, что можно увидеть в kafka-ui:



Сообщение с id инцидента

Полный код с учётом всех текущих и будущих функциональностей, документаций и инструкцией к локальному развёртыванию доступен на моём GitHub. Буду рад, если вам понравится проект, и не стесняйтесь ставить звёздочки!

Заключение


В этой статье мы реализовали микросервисное взаимодействие с помощью gRpc и Kafka. Рассмотрели создание микросервиса изображений, его функциональность и архитектуру, научились работать с S3 хранилищами, на примере Minio, реализовали синхронное взаимодействие по gRpc и асинхронное с помощью Kafka.

Мир Java-разработки — это бесконечное поле для экспериментов, творчества и обучения. Надеюсь, вы нашли эту статью полезной и вдохновляющей. Если у вас есть вопросы, замечания или идеи — делитесь ими в комментариях, буду искренне рад конструктивной критике, поскольку вместе мы сделаем этот путь ещё более интересным и продуктивным.


Присоединяйтесь к моему Telegram-каналу. До встречи в следующей статье! 🚀


© 2025 ООО «МТ ФИНАНС»

Telegram-канал со скидками, розыгрышами призов и новостями IT 💻
Tags:
Hubs:
+80
Comments7

Articles

Information

Website
ruvds.com
Registered
Founded
Employees
11–30 employees
Location
Россия
Representative
ruvds