
Привет! Меня зовут Бромбин Андрей, и сегодня я продолжаю цикл статей о создании микросервисного приложения с нуля. В этом выпуске мы сосредоточимся на взаимодействии между микросервисами, используя два подхода: асинхронный с помощью Kafka и синхронный через gRPC.
Независимо от вашего опыта, этот туториал предоставит вам готовые решения и ценные знания для создания Java Spring Microservices. Начинающие разработчики получат чёткое пошаговое руководство, а опытные специалисты — новые идеи, практические примеры и возможности для обмена опытом с коллегами. Разрабатывайте микросервисы эффективно и достигайте отличных результатов!
О чём эта статья?
В этой статье мы вместе реализуем микросервис для обработки изображений, обсудим архитектурные решения, необходимость выделения его в отдельный сервис, а также выберем подходящие технологии для синхронного и асинхронного взаимодействия. В конце — ссылки на репозитории и вспомогательные материалы.
«Как передавать сообщения гномам, живущим в глубинах Громовых Пещер?» — вопрос, стоящий перед каждым архитектором распределённых систем.Мы рассмотрим:
- Микросервис изображений: функциональность и архитектура.
- Выбор и настройка объектного хранилища (MinIO).
- Асинхронное взаимодействие с помощью Kafka: гарантии доставки, продюсер, консьюмер, топики, DLT.
- Синхронное взаимодействие через gRpc: protobuf, генерация кода, типы RPC-вызовов и реализация клиента и сервера на примере сохранения изображения.
▍ Итоги предыдущей части
Часть 1: Как я создал идеальный REST API — микросервис инцидентов на Java и Spring
В предыдущей итерации цикла мы рассмотрели создание идеального REST API на примере микросервиса Инцидентов. Узнали об основных HTTP-методах, разобрались с пагинацией, улучшили читаемость, осознанно используя Lombok-аннотации, инкапсулировали данные с помощью DTO, научились правильно писать эндпоинты и их url'ы, использовали Глобальный обработчик ошибок для методов контроллера, а также при решении всех поставленных задач использовали уместные паттерны проектирования.
Микросервис изображений и предметная область
Бизнес требует возможность прикреплять изображения к инциденту, и перед нами встаёт вопрос, как лучше реализовать эту функциональность. Конечно, можно было бы просто расширить существующий сервис инцидентов, но это не всегда рационально.
Архитектура разрабатываемой системыВо-первых, инциденты без изображений — довольно частый кейс, и усложнение основного сервиса приведёт к ненужным зависимостям и увеличению нагрузки. Во-вторых, хранение и обработка изображений — это отдельная задача, которая требует специализированных решений, таких как объектное хранилище.
Использование микросервисной архитектуры позволит нам отделить эту логику в независимый сервис, который можно развивать и масштабировать независимо от остальной системы.
Сущность Image

Класс Image представляет собой JPA-сущность, помеченную как @Entity, отвечающую за хранение информации об изображениях:id инцидента (auto-increment), url в хранилище, метаинформация: тип, размер, имя файла.
Рекомендуется явно использовать @Getter, @Setter, @NoArgsConstructor и @AllArgsConstructor, а не @Data, чтобы избежать бесконечной рекурсии в связях и ненужного кода.
Выбор хранилища изображений
Объектное хранилище — это система хранения данных, в которой файлы хранятся в виде объектов внутри контейнеров (bucket), без традиционной файловой иерархии. Каждый объект содержит данные, уникальный идентификатор и метаданные, что позволяет гибко управлять доступом и масштабируемостью. Такой подход абстрагирует хранение данных от файловой системы и обеспечивает гибкость в управлении.
Наиболее распространённый стандарт — S3-совместимые хранилища (Amazon S3, MinIO, Ceph и др.). Для локального развёртывания и тестирования удобнее всего использовать MinIO, так как он полностью совместим с S3 API и прост в настройке. Для больших проектов в продакшен среде используют облачные хранилища от AWS, Google Cloud, Azure и другие, так как поддерживают автоматическое масштабирование, репликацию, доступность и политику безопасности в соответствии со стандартами.
Для знакомства с S3-хранилищем используем MinIO. Это позволит разобраться с принципами работы объектного хранилища в локальной среде. Для этого необходимо:
Добавить зависимость в pom.xml
<dependency> <groupId>io.minio</groupId> <artifactId>minio</artifactId> <version>${minio.version}</version> </dependency>
Развернём MinIO в Docker. Напишем docker-compose.yml, где укажем:
- Образ:
minio/minio:latest - Имя контейнера:
minio_container - Credentials для root-пользователя:
MINIO_ROOT_USER,MINIO_ROOT_PASSWORD - Порты:
- Основной порт для сервиса:
9000:9000 - Порт веб-интерфейса:
9003:9003
- Основной порт для сервиса:
- Команда для запуска (каталог хранения —
/data, порт веб-консоли —:9003) - Том:
minio_data:/dataмонтируется в контейнер по пути/data
version: '3.8' services: minio: image: minio/minio:latest container_name: minio_container environment: MINIO_ROOT_USER: <ваш_юзер> MINIO_ROOT_PASSWORD: <ваш_пароль> ports: - "9000:9000" - "9003:9003" command: server /data --console-address ":9003" volumes: - minio_data:/data volumes: minio_data:
Автоматически подтягиваем переменные окружения из application.properties:
@Data @Component @ConfigurationProperties(prefix = "minio") @FieldDefaults(level = AccessLevel.PRIVATE) public class MinioProperties { String endpoint; String accessKey; String secretKey; String bucketName; }
Теперь осталось написать MinIO Config для работы внутри нашего Spring приложения:
@Configuration @RequiredArgsConstructor @FieldDefaults(level= AccessLevel.PRIVATE, makeFinal = true) public class MinioConfig { MinioProperties minioProperties; @Bean public MinioClient minioClient() { return MinioClient.builder() .endpoint(minioProperties.getEndpoint()) .credentials(minioProperties.getAccessKey(), minioProperties.getSecretKey()) .build(); } @Bean public String bucketName() { return minioProperties.getBucketName(); } }
И всё что остаётся — прописать логику работы с бином minioClient и, непосредственно, сохранение файла в хранилище. Для этого сгенерировали уникальное имя файла:
String uniqueFileName = "image_" + imageToSave.getFileName() + UUID.randomUUID();
Напишем реализацию интерфейса — MinioServiceImpl. Метод uploadFile вызывает на MinioClient метод putObject, которому передаётся PutObjectArgs.builder() с параметрами:
bucket(bucketName): имя бакета, в который загружается файл (берётся из конфигурации);object(fileName): имя объекта в хранилище;stream(file.getInputStream(), file.getSize(), -1): поток файла, его размер и-1как неизвестное значение части потока;contentType(file.getContentType()): MIME-тип файла.
После успешной загрузки вызывается getPresignedObjectUrl, который создаёт подписанный URL для скачивания файла, передавая в GetPresignedObjectUrlArgs.builder() имя бакета и объекта, а также метод Method.GET. Для удаления аналогичным образом используется метод minioClient.removeObject().
@Override public String uploadFile(MultipartFile file, String fileName) { try { minioClient.putObject( PutObjectArgs.builder() .bucket(bucketName) .object(fileName) .stream(file.getInputStream(), file.getSize(), -1) .contentType(file.getContentType()) .build() ); log.info("File {} uploaded successfully to MinIO", fileName); return minioClient.getPresignedObjectUrl( GetPresignedObjectUrlArgs.builder() .bucket(bucketName) .object(fileName) .method(Method.GET) .build() ); } catch (Exception e) { throw new MinioFileException("Failed to upload file " + fileName, e); } } @Override public void deleteFile(String fileName) { try { minioClient.removeObject( RemoveObjectArgs.builder() .bucket(bucketName) .object(fileName) .build() ); log.info("File {} deleted successfully from MinIO", fileName); } catch (Exception e) { throw new MinioFileException("Failed to delete file " + fileName, e); } }
Втягиваемся в Apache Kafka

Явилось свету чудо инженерной мысли. Эльф постиг Apache Kafka
Tеоретическая база
Kafka — это высокопроизводительный брокер сообщений, в основе которого лежат три ключевых элемента: продюсеры, консьюмеры и брокеры. Продюсеры записывают события в лог, консьюмеры их читают, а сам кластер Kafka состоит из множества брокеров, объединённых в единую сеть. Эти брокеры работают синхронно, обеспечивая надёжность и масштабируемость системы. Когда мы говорим, что продюсеры отправляют события в Kafka-кластер, на самом деле они взаимодействуют с его брокерами.
Независимо от версии Apache Kafka, важно понимать роль Zookeeper. Так как продюсеры и консьюмеры не взаимодействуют напрямую, поддержание их согласованного состояния ложится на Zookeeper. Он управляет внутренними процессами Kafka, конфигурацией топиков и пользователей, а также задаёт политики доступа. Кроме того, Zookeeper обеспечивает отказоустойчивость системы, помогая кластеру справляться со сбоями.

Вот они слева направо — ключевые составляющие
▍ Топики
Топик — логическое разделение логов на некие тематики/категории сообщений по группам. Например, сообщения рассылок и уведомлений будут выделены в одно группу, а передача изображений — в другую.
Топики делятся на партиции, а партиции на сегменты. Партиции топиков могут находиться в разных брокерах, тем самым обеспечивая масштабируемость. Сегмент — это физическое представление на дисках.
Рекомендую статью к прочтению, если хочется разобраться подробнее.
▍ Режимы работы и гарантии доставки
- At most once (не более одного раза)
- Сообщение может быть потеряно.
- Повторная доставка не производится.
- Применяется крайне редко (в случаях, где потеря данных допустима).
- At least once (как минимум один раз)
- Kafka гарантирует, что сообщение будет доставлено.
- Возможны дубликаты.
- Требует идемпотентной обработки на стороне consumer.
- Используется чаще всего.
- Exactly once (ровно один раз)
- Исключает и потерю, и дублирование сообщений.
- Реализуется через транзакции Kafka.
- Требует специальной настройки как producer'а, так и consumer'а.
- Наиболее надёжный, но и самый сложный по реализации.
Kafka даёт гибкость в управлении надёжностью доставки сообщений, позволяя выбирать подходящий уровень гарантии в зависимости от задачи. Режим at least once
считается «золотой серединой»: он надёжен и относительно прост в реализации — достаточно настроить повторные попытки и включить идемпотентность у продюсера. Однако для критичных операций, где даже дубликат недопустим (например, при переводе денег), применяют exactly once, несмотря на его сложность.В нашем случае реализуем at least once, правильно указав параметры в properties-файле.
Spring Framework && Kafka
⚙️ Конфигурация и зависимости
Разделим взаимодействие по типу обработки:
- Синхронные запросы (сохранение, обновление, получение) → gRPC (HTTP 2.0).
- Асинхронные операции (удаление) → Kafka.
Начнём с конфигурации и зависимостей для работы Spring сервиса. У нас есть выбор из двух библиотек:
<dependencies> <!-- Классический Spring Kafka (наш выбор) --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <!-- Реактивная Kafka (Reactor Kafka) --> <!-- (например если используется WebFlux или Quarkus framework)--> <dependency> <groupId>io.projectreactor.kafka</groupId> <artifactId>reactor-kafka</artifactId> </dependency> </dependencies>
Spring Kafka реализация
В Spring Kafka отправка сообщений возможна разными способами. Отправка через KafkaTemplate (высокий уровень абстракции):
private void sendKafkaMessage(DataDto dataDto) { kafkaTemplate.send(TOPIC_NAME, dataDto); }
Отправка через ProducerRecord, где можно указать key, partition, timestamp:
private void sendWithPartition(DataDto dataDto, int partition) { var producerRecord = new ProducerRecord<>( TOPIC_NAME, // Топик partition, // Номер партиции dataDto.getId(), // Ключ (опционально) dataDto // Сообщение ); kafkaTemplate.send(producerRecord); }
Отправка через MessageBuilder (низкий уровень абстракции), где можно указать полный пакет настроек сообщения (заголовки, партиции, ключи):
private void sendKafkaMessage(DataDto dataDto) { var message = MessageBuilder.withPayload(dataDto) .setHeader(KafkaHeaders.TOPIC, TOPIC_NAME) .setHeader(KafkaHeaders.PARTITION, 0) .setHeader(KafkaHeaders.KEY, "my-key") .build(); kafkaTemplate.send(message); }
Kafka Producer
Начать необходимо с описания параметров нашего продюсера, с его настроек. Для этого впишем в application.properties переменные и подтянем их через @ConfigurationProperties(prefix = «spring.kafka»). На самом деле Spring может делать это автоматически, в случае самых стандартных параметров, но я бы делал свой компонент свойств:
@Data @Component @FieldDefaults(level = AccessLevel.PRIVATE) @ConfigurationProperties(prefix = "spring.kafka") public class KafkaCustomProperties { String bootstrapServers; Producer producer; @Data @FieldDefaults(level = AccessLevel.PRIVATE) public static class Producer { Integer retries; String acks; Integer deliveryTimeout; Integer retryBackoff; Integer lingerMs; Boolean enableIdempotence; } }
Сами переменные прописываем в application.properties
spring.kafka.bootstrap-servers=localhost:9092 spring.kafka.producer.retries=3 spring.kafka.producer.properties.enable.idempotence=true spring.kafka.producer.acks=all spring.kafka.producer.delivery-timeout=120000 spring.kafka.producer.retry-backoff=1000 spring.kafka.producer.linger-ms=50 kafka.delete-image.topic=delete-image-topic // Эти параметры подбираются исходя из опытной нагрузки и требований к надёжности, и, соответственно, могут меняться по ситуации.
Здесь указали основные параметры продюсера — это:
- Адреса Kafka-брокеров, например:
"localhost:9092"или"kafka-1:9092,kafka-2:9092". - Количество повторных попыток (
retries). - Подтверждение получения сообщений (
acks): «0» — не ждать, «1» — ждать от одного брокера, «all» — дождаться подтверждения от всех реплик. - Таймаут доставки в миллисекундах (
delivery.timeout.ms). - Задержка перед отправкой (
linger.ms), что позволяет накапливать сообщения перед отправкой (для увеличения пропускной способности). - Повторная отправка при сбоях (
retry.backoff.ms). - Идемпотентность (
enable.idempotence): true — гарантия доставки в нашем случае at least once, и, желательно, оставить true, тем самым не будет дублирования в рамках одной партиции.
@Configuration @RequiredArgsConstructor @FieldDefaults(level= AccessLevel.PRIVATE, makeFinal = true) public class KafkaConfig { KafkaCustomProperties kafkaProperties; @Bean public KafkaTemplate<String, DeleteImageRequest> kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } private ProducerFactory<String, DeleteImageRequest> producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs(), new StringSerializer(), new JsonSerializer<>()); } @Bean public Map<String, Object> producerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers()); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); props.put(ProducerConfig.RETRIES_CONFIG, kafkaProperties.getProducer().getRetries()); props.put(ProducerConfig.ACKS_CONFIG, kafkaProperties.getProducer().getAcks()); props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, kafkaProperties.getProducer().getDeliveryTimeout()); props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, kafkaProperties.getProducer().getRetryBackoff()); props.put(ProducerConfig.LINGER_MS_CONFIG, kafkaProperties.getProducer().getLingerMs()); props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, kafkaProperties.getProducer().getEnableIdempotence()); return props; } }
Здесь дополнительно указывали сериализацию ключей и значений с помощью StringSerializer.class, JsonSerializer.class.
@Service @Slf4j @RequiredArgsConstructor @FieldDefaults(level= AccessLevel.PRIVATE, makeFinal = true) public class KafkaImageServiceImpl implements KafkaImageService { KafkaTemplate<String, DeleteImageRequest> kafkaTemplate; @NonFinal @Value("${kafka.delete-image.topic}") String deleteImageTopic; @Override public void deleteImage(DeleteImageRequest deleteImageRequest) { Message<DeleteImageRequest> message = MessageBuilder .withPayload(deleteImageRequest) .setHeader(KafkaHeaders.TOPIC, deleteImageTopic) .setHeader(KafkaHeaders.KEY, "delete-image-key") .build(); kafkaTemplate.send(message) .whenComplete((result, ex) -> { if (ex == null) { log.info("Delete image request sent successfully with offset: {}", result.getRecordMetadata().offset()); } else { log.error("Failed to send delete image request: {}", ex.getMessage()); } }); } }
Собираем Message через MessageBuilder, указывая полезную нагрузку, имя топика, ключ для партиционирования и отправляем через KafkaTemplate, логируя успех или ошибку.
public record DeleteImageRequest ( Long incidentId ){}
В запросе только id инцидента, связанные с которым изображения необходимо удалить.
Kafka Consumer
| Свойство | Описание | Значение по умолчанию | Выбранное значение |
|---|---|---|---|
bootstrapServers |
Адреса Kafka-брокеров | null | localhost:9092 |
groupId |
Группа consumer’ов, к которой он принадлежит |
null | image-service-group |
autoOffsetReset |
Политика обработки офсетов (earliest, latest, none) |
latest | earliest (читаем сначала) |
keyDeserializer |
Класс для десериализации ключа сообщения | String Serializer | String Serializer |
valueDeserializer |
Класс для десериализации значения сообщения | String Serializer | Json Serializer |
enableAutoCommit |
Включить или отключить автофиксацию офсетов |
true | false (вручную фиксируем офсет, реализуя режим at least once) |
maxPollRecords |
Максимальное количество сообщений за один poll() |
500 | по умолчанию |
concurrency |
Определяет количество потоков в ConcurrentKafkaListenerContainer- Factory |
1 | 3 (Spring Kafka сам создаст потоковые обработчики) |
Offset — это уникальный номер сообщения в партиции Kafka. Каждое сообщение в топике имеет смещение (offset), и консьюмер использует его, чтобы понимать, какие сообщения уже обработаны.
В Кафка есть множество других настроек, но в 99% случаев вышеуказанных достаточно.
@Data @Component @FieldDefaults(level = AccessLevel.PRIVATE) @ConfigurationProperties(prefix = "spring.kafka") public class KafkaProperties { String bootstrapServers; Consumer consumer; @Data public static class Consumer { String groupId; String autoOffsetReset; String keyDeserializer; String valueDeserializer; Integer maxPollRecords; Integer concurrency; } }
В config классе следует передать свойства и создать бин фабрики с конфигурацией, контейнеры для Kafka слушателей.
@Configuration @RequiredArgsConstructor @FieldDefaults(level = AccessLevel.PRIVATE, makeFinal = true) public class KafkaConfig { KafkaProperties kafkaProperties; DefaultErrorHandler kafkaErrorHandler; @Bean public ConsumerFactory<String, DeleteImageRequest> consumerFactory() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaCustomProperties.getBootstrapServers()); props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaCustomProperties.getGroupId()); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaCustomProperties.getAutoOffsetReset()); // Указываем ErrorHandlingDeserializer как основной десериализатор props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class); // Указываем делегаты для ErrorHandlingDeserializer props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class); props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class); // Настройки JsonDeserializer props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, DeleteImageRequest.class.getName()); props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaCustomProperties.getMaxPollRecords()); // Убираем ручное создание JsonDeserializer return new DefaultKafkaConsumerFactory<>(props); } @Bean public ConcurrentKafkaListenerContainerFactory<String, DeleteImageRequest> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, DeleteImageRequest> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(kafkaCustomProperties.getConcurrency()); factory.setCommonErrorHandler(kafkaErrorHandler); return factory; } }
Также мы указали сценарий на случай ошибок — kafkaErrorHandler. Если консьюмер не может обработать сообщение, оно попадает в этот обработчик для логирования и отправки в DLT (Dead Letter Topic) — специальный топик для вызывающих ошибки сообщений. Упрощённо handler выглядит так:
@Configuration public class KafkaErrorHandlerConfig { @Bean public DefaultErrorHandler kafkaErrorHandler(KafkaTemplate<String, DeleteImageRequest> kafkaTemplate) { return new DefaultErrorHandler((record, exception) -> { String key = record.key() != null ? (String) record.key() : "unknown-key"; DeleteImageRequest value = (DeleteImageRequest) record.value(); kafkaTemplate.send("delete-image-topic-dlt", key, value); }); } }
Осталось рассмотреть создание читателя, то есть консьюмера. Напомню, что имплементируем «интерфейс для реализации» по паттерну, определяя в нём, какие методы необходимо реализовать. Упрощённо этот сервис выглядит так:
@Component @RequiredArgsConstructor @FieldDefaults(level= AccessLevel.PRIVATE, makeFinal=true) public class KafkaImageConsumerImpl implements KafkaImageConsumer { ImageFacade imageFacade; @Override @KafkaListener( topics = "${spring.kafka.consumer.image.topic}", groupId = "${spring.kafka.consumer.group-id}", containerFactory = "kafkaListenerContainerFactory" ) public void consumeDeleteImageRequest(ConsumerRecord<String, DeleteImageRequest> record, Acknowledgment ack) { jwtAuthenticationService.authenticateJwt(extractJwtToken(record)); DeleteImageRequest deleteImageRequest = record.value(); processAndDeleteImage(deleteImageRequest); // в случае ошибки, обработку перехватит DefaultErrorHandler и офсет не будет зафиксирован // в случае успешной обработки, KafkaContainer автоматически сделать commitSync() } private void processAndDeleteImage(DeleteImageRequest deleteImageRequest) { try { imageFacade.deleteImagesFromKafkaRequest(deleteImageRequest); } catch (Exception e) { throw new FileStorageException("Some error message: " + deleteImageRequest.incidentId(), e); } } }
Здесь обратим внимание на аннотацию @KafkaListener, где указываем прослушиваемый топик, группу и фабрику контейнеров, сконфигурированную ранее. Принимаем мы consumerRecord, извлекаем структуру, которая и есть наш запрос на удаление DeleteImageRequest.
Рекоменд��ю использовать Big Data Tools Plugin в IDE и Kafka-ui, позволяющие мониторить отправляемые данные и метаданные сообщения 👍
Синхронное взаимодействие с gRpc
Радикальное решение радикальных гномов — радикально закатопультить посланиеТеоретическая база
gRpc — это современная высокопроизводительная система удалённого вызова процедур (RPC), разработанная в Google. Он предназначен для построения распределённых систем, позволяя клиентам и серверам, взаимодействовать друг с другом посредством определённых методов, как если бы они были локальными.
Это надо знать!
- Использование HTTP/2 — обеспечивает высокую скорость, мультиплексирование запросов и эффективное управление соединениями.
- Протокол сериализации Protocol Buffers (protobuf) — компактный и быстрый бинарный формат передачи данных. Вместо тех же тяжёлых JSON'ов, которые можно получать, обращаясь к REST API. Меньше размер -> быстрее передача данных (в 7–10 раз)
- Автогенерация кода под все популярные языки программирования (Java, Python, C++, Go, etc.)
- Различные типы вызовов:
- Unary Rpc — клиент отправляет один запрос и получает один ответ,
- Server Streaming Rpc — клиент отправляет один запрос, сервер отвечает потоком данных,
- Client Streaming Rpc — клиент отправляет поток данных, сервер отвечает одним ответом,
- Bidirectional Streaming Rpc — двусторонний поток сообщений между клиентом и сервером.
- Безопасность — встроенная TLS.
gRPC идеально подходит для микросервисной архитектуры, высоконагруженных систем и облачных сервисов, обеспечивая быструю и надёжную передачу данных между сервисами.
Рассмотрим механизм RPC-взаимодействия
Remote Procedure Call (RPC) — это механизм, при котором клиент вызывает удалённую процедуру так, как если бы она выполнялась локально, передавая параметры. Однако фактическое выполнение происходит на стороне сервера, а клиент получает результат в упакованном виде, используя предопределённый формат передачи данных.
Разберёмся на примере:
Архитектура вызова удалённой процедуры сохранения изображенияРассмотрим алгоритм сохранения изображения:
- Клиент (Incident Service) – отправляет запрос на загрузку изображения, передавая его параметры (файл, метаданные и т. д.).
- Client RPC Stub – это сгенерированный gRPC-клиент, который принимает вызов метода у клиента (Incident Service), сериализует данные в protobuf и отправляет gRPC-запрос на сервер (Image Service) через HTTP/2.
- gRPC Server (Image Service) – принимает gRPC-запрос, десериализует данные и вызывает соответствующий обработчик (метод в сервисе Image Service), который выполняет сохранение изображения.
- Обработчик на сервере (Image Service) – выполняет основную бизнес-логику: сохранение, генерацию url и id, формирование ответа.
- Server RPC Stub – сериализует результат (Id и Url) в protobuf и отправляет ответ клиенту.
- Client RPC Stub (на стороне Incident Service) – принимает ответ, десериализует его и передаёт приложению (Incident Service), которое может использовать полученные Id и Url изображения.
Image_service.proto — общее описание для обоих микросервисов и содержит gRPC-сервис для работы с изображениями. Он определяет контракты взаимодействия между клиентом (Incident Service) и сервером (Image Service). В этом файле указываются сообщения (protobuf) и методы, доступные для вызова через gRPC.
<dependencies> <!-- gRPC Spring Boot Starter (Для сервера) --> <dependency> <groupId>net.devh</groupId> <artifactId>grpc-spring-boot-starter</artifactId> <version>${net.devh.version}</version> </dependency> <!-- gRPC Spring Boot Client Starter (Для клиента) --> <dependency> <groupId>net.devh</groupId> <artifactId>grpc-client-spring-boot-starter</artifactId> <version>${net.devh.version}</version> </dependency> <!-- gRPC Core Dependencies --> <dependency> <groupId>io.grpc</groupId> <artifactId>grpc-netty</artifactId> <version>${grpc.version}</version> </dependency> <dependency> <groupId>io.grpc</groupId> <artifactId>grpc-protobuf</artifactId> <version>${grpc.version}</version> </dependency> <dependency> <groupId>io.grpc</groupId> <artifactId>grpc-stub</artifactId> <version>${grpc.version}</version> </dependency> <!-- Protobuf Compiler Plugin --> <dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> <version>${protobuf.version}</version> </dependency> </dependencies> <build> <extensions> <extension> <groupId>kr.motd.maven</groupId> <artifactId>os-maven-plugin</artifactId> </extension> </extensions> <plugins> <plugin> <groupId>org.xolstice.maven.plugins</groupId> <artifactId>protobuf-maven-plugin</artifactId> <configuration> <protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}</protocArtifact> <pluginId>grpc-java</pluginId> <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact> </configuration> <executions> <execution> <id>compile</id> <goals> <goal>compile</goal> <goal>compile-custom</goal> </goals> </execution> <execution> <id>test-compile</id> <goals> <goal>test-compile</goal> <goal>test-compile-custom</goal> </goals> </execution> </executions> </plugin> </plugins> </build>
За генерацию кода, исходя из этого описания, отвечает компилятор protoc. Например, сущность запроса на сохранение и ответ выглядят так:
syntax = "proto3"; package image; message SaveImageRequest { int64 incident_id = 1; bytes file_data = 2; int64 size = 3; string type = 4; } message SaveImageResponse { int64 image_id = 1; string url = 2; }
Взаимодействия с этими сущностями строится посредством описания в proto-файле структуры, а Java-кодом — сама реализация. Например:
service ImageService { rpc SaveImage (SaveImageRequest) returns (SaveImageResponse); }
Сам метод сервиса описывается один раз в нашем случае на микросервисе Изображений (Клиент), который принимает файлы на сохранение и упрощённо выглядит так:
@Service @RequiredArgsConstructor @FieldDefaults(level = AccessLevel.PRIVATE, makeFinal = true) public class GrpcImageServiceImpl extends ImageServiceImplBase { ImageFacade imageFacade; ImageMapper imageMapper; @Override public void saveImage(SaveImageRequest request, StreamObserver<SaveImageResponse> responseObserver) { try { ImageDto imageDto = imageMapper.toImageDto(request); Image savedImage = imageFacade.saveImage(imageDto, file); SaveImageResponse response = imageMapper.toSaveImageResponse(savedImage); responseObserver.onNext(response); responseObserver.onCompleted(); } catch (Exception e) { responseObserver.onError( new RuntimeException("Ошибка при сохранении изображения: " + + e.getMessage(), e)); } } }
Сервис наследуется от сгенерированного ImageServiceImplBase и здесь мы переопределяем (@Override) gRpc-метод. Он принимает request сериализованный protobuf’ом, маппит его в ImageDto, сохраняет и формирует ответ.
StreamObserver — интерфейс для передачи данных в ответ на запрос. onNext() — отправка, onCompleted() — завершает поток, onError() — сообщает об ошибке.
Осталось посмотреть на сам вызов удалённой процедуры и упрощённо выглядит так:
@Service @RequiredArgsConstructor @FieldDefaults(level = PRIVATE, makeFinal = true) public class GrpcClientServiceImpl implements GrpcClientService{ ImageServiceBlockingStub imageServiceStub; UserServiceBlockingStub userServiceStub; ImageRequestMapper imageRequestMapper; @Override public SaveImageResponse uploadImages(List<ImageDto> imageDtoList, Incident incident) { for (ImageDto imageDto : imageDtoList) { try { SaveImageRequest request = imageRequestMapper.toSaveImageRequest(imageDto); SaveImageResponse response = imageServiceStub.saveImage(request); return response; } catch (StatusRuntimeException e) { throw new ImageProcessingException("Some error message: " + incident.getId(), e); } catch (Exception e) { throw new ImageProcessingException("Some error message: " + incident.getId() + " with unexpected error: ", e); } } } }
Сервис наследуется от GrpcClientService (паттерн Interface for Implementation) и реализует метод загрузки изображений на сервер. Здесь мы маппим Dto в Request, вызываем блокирующий метод сохранения, так как нам важно зафиксировать успех выполнения операции, и возвращаем ответ.
Результаты
Запрос с обязательными параметрами в postman:

POST-запрос на создание инцидента
По запросу на Incident api создаётся запись в БД об инциденте, а изображение передаётся по gRpc на Image-service, который пишет в minio и в ui мы видим такую картину:

В результате в minio — 1 объект
Выполнив запрос на удаление с указанием id инцидента:

Запрос на удаление инцидента
Incident-service удаляет информацию об инциденте и асинхронно запрашивает удаление у Image-service, записывая в топик Kafka сообщение. Как и предполагалось в delete-image-topic сообщение с id инцидента, связанные с которым изображения необходимо удалить, что можно увидеть в kafka-ui:

Сообщение с id инцидента
Полный код с учётом всех текущих и будущих функциональностей, документаций и инструкцией к локальному развёртыванию доступен на моём GitHub. Буду рад, если вам понравится проект, и не стесняйтесь ставить звёздочки!
Заключение
В этой статье мы реализовали микросервисное взаимодействие с помощью gRpc и Kafka. Рассмотрели создание микросервиса изображений, его функциональность и архитектуру, научились работать с S3 хранилищами, на примере Minio, реализовали синхронное взаимодействие по gRpc и асинхронное с помощью Kafka.
Мир Java-разработки — это бесконечное поле для экспериментов, творчества и обучения. Надеюсь, вы нашли эту статью полезной и вдохновляющей. Если у вас есть вопросы, замечания или идеи — делитесь ими в комментариях, буду искренне рад конструктивной критике, поскольку вместе мы сделаем этот путь ещё более интересным и продуктивным.
Присоединяйтесь к моему Telegram-каналу. До встречи в следующей статье! 🚀
© 2025 ООО «МТ ФИНАНС»
Telegram-канал со скидками, розыгрышами призов и новостями IT 💻

