
Привет! Меня зовут Бромбин Андрей, и сегодня я продолжаю цикл статей о создании микросервисного приложения с нуля. В этом выпуске мы сосредоточимся на взаимодействии между микросервисами, используя два подхода: асинхронный с помощью Kafka и синхронный через gRPC.
Независимо от вашего опыта, этот туториал предоставит вам готовые решения и ценные знания для создания Java Spring Microservices. Начинающие разработчики получат чёткое пошаговое руководство, а опытные специалисты — новые идеи, практические примеры и возможности для обмена опытом с коллегами. Разрабатывайте микросервисы эффективно и достигайте отличных результатов!
О чём эта статья?
В этой статье мы вместе реализуем микросервис для обработки изображений, обсудим архитектурные решения, необходимость выделения его в отдельный сервис, а также выберем подходящие технологии для синхронного и асинхронного взаимодействия. В конце — ссылки на репозитории и вспомогательные материалы.

Мы рассмотрим:
- Микросервис изображений: функциональность и архитектура.
- Выбор и настройка объектного хранилища (MinIO).
- Асинхронное взаимодействие с помощью Kafka: гарантии доставки, продюсер, консьюмер, топики, DLT.
- Синхронное взаимодействие через gRpc: protobuf, генерация кода, типы RPC-вызовов и реализация клиента и сервера на примере сохранения изображения.
▍ Итоги предыдущей части
Часть 1: Как я создал идеальный REST API — микросервис инцидентов на Java и Spring
В предыдущей итерации цикла мы рассмотрели создание идеального REST API на примере микросервиса Инцидентов. Узнали об основных HTTP-методах, разобрались с пагинацией, улучшили читаемость, осознанно используя Lombok-аннотации, инкапсулировали данные с помощью DTO, научились правильно писать эндпоинты и их url'ы, использовали Глобальный обработчик ошибок для методов контроллера, а также при решении всех поставленных задач использовали уместные паттерны проектирования.
Микросервис изображений и предметная область
Бизнес требует возможность прикреплять изображения к инциденту, и перед нами встаёт вопрос, как лучше реализовать эту функциональность. Конечно, можно было бы просто расширить существующий сервис инцидентов, но это не всегда рационально.

Во-первых, инциденты без изображений — довольно частый кейс, и усложнение основного сервиса приведёт к ненужным зависимостям и увеличению нагрузки. Во-вторых, хранение и обработка изображений — это отдельная задача, которая требует специализированных решений, таких как объектное хранилище.
Использование микросервисной архитектуры позволит нам отделить эту логику в независимый сервис, который можно развивать и масштабировать независимо от остальной системы.
Сущность Image

Класс Image представляет собой JPA-сущность, помеченную как @Entity
, отвечающую за хранение информации об изображениях:id инцидента (auto-increment), url в хранилище, метаинформация: тип, размер, имя файла.
Рекомендуется явно использовать @Getter
, @Setter
, @NoArgsConstructor
и @AllArgsConstructor
, а не @Data
, чтобы избежать бесконечной рекурсии в связях и ненужного кода.
Выбор хранилища изображений
Объектное хранилище — это система хранения данных, в которой файлы хранятся в виде объектов внутри контейнеров (bucket), без традиционной файловой иерархии. Каждый объект содержит данные, уникальный идентификатор и метаданные, что позволяет гибко управлять доступом и масштабируемостью. Такой подход абстрагирует хранение данных от файловой системы и обеспечивает гибкость в управлении.
Наиболее распространённый стандарт — S3-совместимые хранилища (Amazon S3, MinIO, Ceph и др.). Для локального развёртывания и тестирования удобнее всего использовать MinIO, так как он полностью совместим с S3 API и прост в настройке. Для больших проектов в продакшен среде используют облачные хранилища от AWS, Google Cloud, Azure и другие, так как поддерживают автоматическое масштабирование, репликацию, доступность и политику безопасности в соответствии со стандартами.
Для знакомства с S3-хранилищем используем MinIO. Это позволит разобраться с принципами работы объектного хранилища в локальной среде. Для этого необходимо:
Добавить зависимость в pom.xml
<dependency>
<groupId>io.minio</groupId>
<artifactId>minio</artifactId>
<version>${minio.version}</version>
</dependency>
Развернём MinIO в Docker. Напишем docker-compose.yml, где укажем:
- Образ:
minio/minio:latest
- Имя контейнера:
minio_container
- Credentials для root-пользователя:
MINIO_ROOT_USER
,MINIO_ROOT_PASSWORD
- Порты:
- Основной порт для сервиса:
9000:9000
- Порт веб-интерфейса:
9003:9003
- Основной порт для сервиса:
- Команда для запуска (каталог хранения —
/data
, порт веб-консоли —:9003
) - Том:
minio_data:/data
монтируется в контейнер по пути/data
version: '3.8'
services:
minio:
image: minio/minio:latest
container_name: minio_container
environment:
MINIO_ROOT_USER: <ваш_юзер>
MINIO_ROOT_PASSWORD: <ваш_пароль>
ports:
- "9000:9000"
- "9003:9003"
command: server /data --console-address ":9003"
volumes:
- minio_data:/data
volumes:
minio_data:
Автоматически подтягиваем переменные окружения из application.properties:
@Data
@Component
@ConfigurationProperties(prefix = "minio")
@FieldDefaults(level = AccessLevel.PRIVATE)
public class MinioProperties {
String endpoint;
String accessKey;
String secretKey;
String bucketName;
}
Теперь осталось написать MinIO Config для работы внутри нашего Spring приложения:
@Configuration
@RequiredArgsConstructor
@FieldDefaults(level= AccessLevel.PRIVATE, makeFinal = true)
public class MinioConfig {
MinioProperties minioProperties;
@Bean
public MinioClient minioClient() {
return MinioClient.builder()
.endpoint(minioProperties.getEndpoint())
.credentials(minioProperties.getAccessKey(),
minioProperties.getSecretKey())
.build();
}
@Bean
public String bucketName() {
return minioProperties.getBucketName();
}
}
И всё что остаётся — прописать логику работы с бином minioClient и, непосредственно, сохранение файла в хранилище. Для этого сгенерировали уникальное имя файла:
String uniqueFileName = "image_" + imageToSave.getFileName() + UUID.randomUUID();
Напишем реализацию интерфейса — MinioServiceImpl
. Метод uploadFile
вызывает на MinioClient
метод putObject
, которому передаётся PutObjectArgs.builder()
с параметрами:
bucket(bucketName)
: имя бакета, в который загружается файл (берётся из конфигурации);object(fileName)
: имя объекта в хранилище;stream(file.getInputStream(), file.getSize(), -1)
: поток файла, его размер и-1
как неизвестное значение части потока;contentType(file.getContentType())
: MIME-тип файла.
После успешной загрузки вызывается getPresignedObjectUrl
, который создаёт подписанный URL для скачивания файла, передавая в GetPresignedObjectUrlArgs.builder()
имя бакета и объекта, а также метод Method.GET
. Для удаления аналогичным образом используется метод minioClient.removeObject()
.
@Override
public String uploadFile(MultipartFile file, String fileName) {
try {
minioClient.putObject(
PutObjectArgs.builder()
.bucket(bucketName)
.object(fileName)
.stream(file.getInputStream(), file.getSize(), -1)
.contentType(file.getContentType())
.build()
);
log.info("File {} uploaded successfully to MinIO", fileName);
return minioClient.getPresignedObjectUrl(
GetPresignedObjectUrlArgs.builder()
.bucket(bucketName)
.object(fileName)
.method(Method.GET)
.build()
);
} catch (Exception e) {
throw new MinioFileException("Failed to upload file " + fileName, e);
}
}
@Override
public void deleteFile(String fileName) {
try {
minioClient.removeObject(
RemoveObjectArgs.builder()
.bucket(bucketName)
.object(fileName)
.build()
);
log.info("File {} deleted successfully from MinIO", fileName);
} catch (Exception e) {
throw new MinioFileException("Failed to delete file " + fileName, e);
}
}
Втягиваемся в Apache Kafka

Явилось свету чудо инженерной мысли. Эльф постиг Apache Kafka
Tеоретическая база
Kafka — это высокопроизводительный брокер сообщений, в основе которого лежат три ключевых элемента: продюсеры, консьюмеры и брокеры. Продюсеры записывают события в лог, консьюмеры их читают, а сам кластер Kafka состоит из множества брокеров, объединённых в единую сеть. Эти брокеры работают синхронно, обеспечивая надёжность и масштабируемость системы. Когда мы говорим, что продюсеры отправляют события в Kafka-кластер, на самом деле они взаимодействуют с его брокерами.
Независимо от версии Apache Kafka, важно понимать роль Zookeeper. Так как продюсеры и консьюмеры не взаимодействуют напрямую, поддержание их согласованного состояния ложится на Zookeeper. Он управляет внутренними процессами Kafka, конфигурацией топиков и пользователей, а также задаёт политики доступа. Кроме того, Zookeeper обеспечивает отказоустойчивость системы, помогая кластеру справляться со сбоями.

Вот они слева направо — ключевые составляющие
▍ Топики
Топик — логическое разделение логов на некие тематики/категории сообщений по группам. Например, сообщения рассылок и уведомлений будут выделены в одно группу, а передача изображений — в другую.
Топики делятся на партиции, а партиции на сегменты. Партиции топиков могут находиться в разных брокерах, тем самым обеспечивая масштабируемость. Сегмент — это физическое представление на дисках.
Рекомендую статью к прочтению, если хочется разобраться подробнее.
▍ Режимы работы и гарантии доставки
- At most once (не более одного раза)
- Сообщение может быть потеряно.
- Повторная доставка не производится.
- Применяется крайне редко (в случаях, где потеря данных допустима).
- At least once (как минимум один раз)
- Kafka гарантирует, что сообщение будет доставлено.
- Возможны дубликаты.
- Требует идемпотентной обработки на стороне consumer.
- Используется чаще всего.
- Exactly once (ровно один раз)
- Исключает и потерю, и дублирование сообщений.
- Реализуется через транзакции Kafka.
- Требует специальной настройки как producer'а, так и consumer'а.
- Наиболее надёжный, но и самый сложный по реализации.
Kafka даёт гибкость в управлении надёжностью доставки сообщений, позволяя выбирать подходящий уровень гарантии в зависимости от задачи. Режим at least once
считается «золотой серединой»: он надёжен и относительно прост в реализации — достаточно настроить повторные попытки и включить идемпотентность у продюсера. Однако для критичных операций, где даже дубликат недопустим (например, при переводе денег), применяют exactly once, несмотря на его сложность.В нашем случае реализуем at least once, правильно указав параметры в properties-файле.
Spring Framework && Kafka
⚙️ Конфигурация и зависимости
Разделим взаимодействие по типу обработки:
- Синхронные запросы (сохранение, обновление, получение) → gRPC (HTTP 2.0).
- Асинхронные операции (удаление) → Kafka.
Начнём с конфигурации и зависимостей для работы Spring сервиса. У нас есть выбор из двух библиотек:
<dependencies>
<!-- Классический Spring Kafka (наш выбор) -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!-- Реактивная Kafka (Reactor Kafka) -->
<!-- (например если используется WebFlux или Quarkus framework)-->
<dependency>
<groupId>io.projectreactor.kafka</groupId>
<artifactId>reactor-kafka</artifactId>
</dependency>
</dependencies>
Spring Kafka реализация
В Spring Kafka отправка сообщений возможна разными способами. Отправка через KafkaTemplate (высокий уровень абстракции):
private void sendKafkaMessage(DataDto dataDto) {
kafkaTemplate.send(TOPIC_NAME, dataDto);
}
Отправка через ProducerRecord
, где можно указать key, partition, timestamp:
private void sendWithPartition(DataDto dataDto, int partition) {
var producerRecord = new ProducerRecord<>(
TOPIC_NAME, // Топик
partition, // Номер партиции
dataDto.getId(), // Ключ (опционально)
dataDto // Сообщение
);
kafkaTemplate.send(producerRecord);
}
Отправка через MessageBuilder (низкий уровень абстракции), где можно указать полный пакет настроек сообщения (заголовки, партиции, ключи):
private void sendKafkaMessage(DataDto dataDto) {
var message = MessageBuilder.withPayload(dataDto)
.setHeader(KafkaHeaders.TOPIC, TOPIC_NAME)
.setHeader(KafkaHeaders.PARTITION, 0)
.setHeader(KafkaHeaders.KEY, "my-key")
.build();
kafkaTemplate.send(message);
}
Kafka Producer
Начать необходимо с описания параметров нашего продюсера, с его настроек. Для этого впишем в application.properties переменные и подтянем их через @ConfigurationProperties(prefix = «spring.kafka»). На самом деле Spring может делать это автоматически, в случае самых стандартных параметров, но я бы делал свой компонент свойств:
@Data
@Component
@FieldDefaults(level = AccessLevel.PRIVATE)
@ConfigurationProperties(prefix = "spring.kafka")
public class KafkaCustomProperties {
String bootstrapServers;
Producer producer;
@Data
@FieldDefaults(level = AccessLevel.PRIVATE)
public static class Producer {
Integer retries;
String acks;
Integer deliveryTimeout;
Integer retryBackoff;
Integer lingerMs;
Boolean enableIdempotence;
}
}
Сами переменные прописываем в application.properties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.producer.retries=3
spring.kafka.producer.properties.enable.idempotence=true
spring.kafka.producer.acks=all
spring.kafka.producer.delivery-timeout=120000
spring.kafka.producer.retry-backoff=1000
spring.kafka.producer.linger-ms=50
kafka.delete-image.topic=delete-image-topic
// Эти параметры подбираются исходя из опытной нагрузки и требований к надёжности, и, соответственно, могут меняться по ситуации.
Здесь указали основные параметры продюсера — это:
- Адреса Kafka-брокеров, например:
"localhost:9092"
или"kafka-1:9092,kafka-2:9092"
. - Количество повторных попыток (
retries
). - Подтверждение получения сообщений (
acks
): «0» — не ждать, «1» — ждать от одного брокера, «all» — дождаться подтверждения от всех реплик. - Таймаут доставки в миллисекундах (
delivery.timeout.ms
). - Задержка перед отправкой (
linger.ms
), что позволяет накапливать сообщения перед отправкой (для увеличения пропускной способности). - Повторная отправка при сбоях (
retry.backoff.ms
). - Идемпотентность (
enable.idempotence
): true — гарантия доставки в нашем случае at least once, и, желательно, оставить true, тем самым не будет дублирования в рамках одной партиции.
@Configuration
@RequiredArgsConstructor
@FieldDefaults(level= AccessLevel.PRIVATE, makeFinal = true)
public class KafkaConfig {
KafkaCustomProperties kafkaProperties;
@Bean
public KafkaTemplate<String, DeleteImageRequest> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
private ProducerFactory<String, DeleteImageRequest> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs(), new StringSerializer(), new JsonSerializer<>());
}
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
props.put(ProducerConfig.RETRIES_CONFIG, kafkaProperties.getProducer().getRetries());
props.put(ProducerConfig.ACKS_CONFIG, kafkaProperties.getProducer().getAcks());
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, kafkaProperties.getProducer().getDeliveryTimeout());
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, kafkaProperties.getProducer().getRetryBackoff());
props.put(ProducerConfig.LINGER_MS_CONFIG, kafkaProperties.getProducer().getLingerMs());
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, kafkaProperties.getProducer().getEnableIdempotence());
return props;
}
}
Здесь дополнительно указывали сериализацию ключей и значений с помощью StringSerializer.class
, JsonSerializer.class
.
@Service
@Slf4j
@RequiredArgsConstructor
@FieldDefaults(level= AccessLevel.PRIVATE, makeFinal = true)
public class KafkaImageServiceImpl implements KafkaImageService {
KafkaTemplate<String, DeleteImageRequest> kafkaTemplate;
@NonFinal
@Value("${kafka.delete-image.topic}")
String deleteImageTopic;
@Override
public void deleteImage(DeleteImageRequest deleteImageRequest) {
Message<DeleteImageRequest> message = MessageBuilder
.withPayload(deleteImageRequest)
.setHeader(KafkaHeaders.TOPIC, deleteImageTopic)
.setHeader(KafkaHeaders.KEY, "delete-image-key")
.build();
kafkaTemplate.send(message)
.whenComplete((result, ex) -> {
if (ex == null) {
log.info("Delete image request sent successfully with offset: {}",
result.getRecordMetadata().offset());
} else {
log.error("Failed to send delete image request: {}", ex.getMessage());
}
});
}
}
Собираем Message через MessageBuilder, указывая полезную нагрузку, имя топика, ключ для партиционирования и отправляем через KafkaTemplate, логируя успех или ошибку.
public record DeleteImageRequest (
Long incidentId
){}
В запросе только id инцидента, связанные с которым изображения необходимо удалить.
Kafka Consumer
Свойство | Описание | Значение по умолчанию | Выбранное значение |
---|---|---|---|
bootstrapServers |
Адреса Kafka-брокеров | null | localhost:9092 |
groupId |
Группа consumer’ов, к которой он принадлежит |
null | image-service-group |
autoOffsetReset |
Политика обработки офсетов (earliest, latest, none) |
latest | earliest (читаем сначала) |
keyDeserializer |
Класс для десериализации ключа сообщения | String Serializer | String Serializer |
valueDeserializer |
Класс для десериализации значения сообщения | String Serializer | Json Serializer |
enableAutoCommit |
Включить или отключить автофиксацию офсетов |
true | false (вручную фиксируем офсет, реализуя режим at least once) |
maxPollRecords |
Максимальное количество сообщений за один poll() |
500 | по умолчанию |
concurrency |
Определяет количество потоков в ConcurrentKafkaListenerContainer- Factory |
1 | 3 (Spring Kafka сам создаст потоковые обработчики) |
Offset — это уникальный номер сообщения в партиции Kafka. Каждое сообщение в топике имеет смещение (offset), и консьюмер использует его, чтобы понимать, какие сообщения уже обработаны.
В Кафка есть множество других настроек, но в 99% случаев вышеуказанных достаточно.
@Data
@Component
@FieldDefaults(level = AccessLevel.PRIVATE)
@ConfigurationProperties(prefix = "spring.kafka")
public class KafkaProperties {
String bootstrapServers;
Consumer consumer;
@Data
public static class Consumer {
String groupId;
String autoOffsetReset;
String keyDeserializer;
String valueDeserializer;
Integer maxPollRecords;
Integer concurrency;
}
}
В config классе следует передать свойства и создать бин фабрики с конфигурацией, контейнеры для Kafka слушателей.
@Configuration
@RequiredArgsConstructor
@FieldDefaults(level = AccessLevel.PRIVATE, makeFinal = true)
public class KafkaConfig {
KafkaProperties kafkaProperties;
DefaultErrorHandler kafkaErrorHandler;
@Bean
public ConsumerFactory<String, DeleteImageRequest> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaCustomProperties.getBootstrapServers());
props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaCustomProperties.getGroupId());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaCustomProperties.getAutoOffsetReset());
// Указываем ErrorHandlingDeserializer как основной десериализатор
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
// Указываем делегаты для ErrorHandlingDeserializer
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
// Настройки JsonDeserializer
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, DeleteImageRequest.class.getName());
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaCustomProperties.getMaxPollRecords());
// Убираем ручное создание JsonDeserializer
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, DeleteImageRequest> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, DeleteImageRequest> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(kafkaCustomProperties.getConcurrency());
factory.setCommonErrorHandler(kafkaErrorHandler);
return factory;
}
}
Также мы указали сценарий на случай ошибок — kafkaErrorHandler. Если консьюмер не может обработать сообщение, оно попадает в этот обработчик для логирования и отправки в DLT (Dead Letter Topic) — специальный топик для вызывающих ошибки сообщений. Упрощённо handler выглядит так:
@Configuration
public class KafkaErrorHandlerConfig {
@Bean
public DefaultErrorHandler kafkaErrorHandler(KafkaTemplate<String, DeleteImageRequest> kafkaTemplate) {
return new DefaultErrorHandler((record, exception) -> {
String key = record.key() != null ? (String) record.key() : "unknown-key";
DeleteImageRequest value = (DeleteImageRequest) record.value();
kafkaTemplate.send("delete-image-topic-dlt", key, value);
});
}
}
Осталось рассмотреть создание читателя, то есть консьюмера. Напомню, что имплементируем «интерфейс для реализации» по паттерну, определяя в нём, какие методы необходимо реализовать. Упрощённо этот сервис выглядит так:
@Component
@RequiredArgsConstructor
@FieldDefaults(level= AccessLevel.PRIVATE, makeFinal=true)
public class KafkaImageConsumerImpl implements KafkaImageConsumer {
ImageFacade imageFacade;
@Override
@KafkaListener(
topics = "${spring.kafka.consumer.image.topic}",
groupId = "${spring.kafka.consumer.group-id}",
containerFactory = "kafkaListenerContainerFactory"
)
public void consumeDeleteImageRequest(ConsumerRecord<String, DeleteImageRequest> record, Acknowledgment ack) {
jwtAuthenticationService.authenticateJwt(extractJwtToken(record));
DeleteImageRequest deleteImageRequest = record.value();
processAndDeleteImage(deleteImageRequest);
// в случае ошибки, обработку перехватит DefaultErrorHandler и офсет не будет зафиксирован
// в случае успешной обработки, KafkaContainer автоматически сделать commitSync()
}
private void processAndDeleteImage(DeleteImageRequest deleteImageRequest) {
try {
imageFacade.deleteImagesFromKafkaRequest(deleteImageRequest);
} catch (Exception e) {
throw new FileStorageException("Some error message: " +
deleteImageRequest.incidentId(), e);
}
}
}
Здесь обратим внимание на аннотацию @KafkaListener
, где указываем прослушиваемый топик, группу и фабрику контейнеров, сконфигурированную ранее. Принимаем мы consumerRecord, извлекаем структуру, которая и есть наш запрос на удаление DeleteImageRequest.
Рекомендую использовать Big Data Tools Plugin в IDE и Kafka-ui, позволяющие мониторить отправляемые данные и метаданные сообщения 👍
Синхронное взаимодействие с gRpc

Теоретическая база
gRpc — это современная высокопроизводительная система удалённого вызова процедур (RPC), разработанная в Google. Он предназначен для построения распределённых систем, позволяя клиентам и серверам, взаимодействовать друг с другом посредством определённых методов, как если бы они были локальными.
Это надо знать!
- Использование HTTP/2 — обеспечивает высокую скорость, мультиплексирование запросов и эффективное управление соединениями.
- Протокол сериализации Protocol Buffers (protobuf) — компактный и быстрый бинарный формат передачи данных. Вместо тех же тяжёлых JSON'ов, которые можно получать, обращаясь к REST API. Меньше размер -> быстрее передача данных (в 7–10 раз)
- Автогенерация кода под все популярные языки программирования (Java, Python, C++, Go, etc.)
- Различные типы вызовов:
- Unary Rpc — клиент отправляет один запрос и получает один ответ,
- Server Streaming Rpc — клиент отправляет один запрос, сервер отвечает потоком данных,
- Client Streaming Rpc — клиент отправляет поток данных, сервер отвечает одним ответом,
- Bidirectional Streaming Rpc — двусторонний поток сообщений между клиентом и сервером.
- Безопасность — встроенная TLS.
gRPC идеально подходит для микросервисной архитектуры, высоконагруженных систем и облачных сервисов, обеспечивая быструю и надёжную передачу данных между сервисами.
Рассмотрим механизм RPC-взаимодействия
Remote Procedure Call (RPC) — это механизм, при котором клиент вызывает удалённую процедуру так, как если бы она выполнялась локально, передавая параметры. Однако фактическое выполнение происходит на стороне сервера, а клиент получает результат в упакованном виде, используя предопределённый формат передачи данных.
Разберёмся на примере:

Рассмотрим алгоритм сохранения изображения:
- Клиент (Incident Service) – отправляет запрос на загрузку изображения, передавая его параметры (файл, метаданные и т. д.).
- Client RPC Stub – это сгенерированный gRPC-клиент, который принимает вызов метода у клиента (Incident Service), сериализует данные в protobuf и отправляет gRPC-запрос на сервер (Image Service) через HTTP/2.
- gRPC Server (Image Service) – принимает gRPC-запрос, десериализует данные и вызывает соответствующий обработчик (метод в сервисе Image Service), который выполняет сохранение изображения.
- Обработчик на сервере (Image Service) – выполняет основную бизнес-логику: сохранение, генерацию url и id, формирование ответа.
- Server RPC Stub – сериализует результат (Id и Url) в protobuf и отправляет ответ клиенту.
- Client RPC Stub (на стороне Incident Service) – принимает ответ, десериализует его и передаёт приложению (Incident Service), которое может использовать полученные Id и Url изображения.
Image_service.proto
— общее описание для обоих микросервисов и содержит gRPC-сервис для работы с изображениями. Он определяет контракты взаимодействия между клиентом (Incident Service) и сервером (Image Service). В этом файле указываются сообщения (protobuf) и методы, доступные для вызова через gRPC.
<dependencies>
<!-- gRPC Spring Boot Starter (Для сервера) -->
<dependency>
<groupId>net.devh</groupId>
<artifactId>grpc-spring-boot-starter</artifactId>
<version>${net.devh.version}</version>
</dependency>
<!-- gRPC Spring Boot Client Starter (Для клиента) -->
<dependency>
<groupId>net.devh</groupId>
<artifactId>grpc-client-spring-boot-starter</artifactId>
<version>${net.devh.version}</version>
</dependency>
<!-- gRPC Core Dependencies -->
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>${grpc.version}</version>
</dependency>
<!-- Protobuf Compiler Plugin -->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>${protobuf.version}</version>
</dependency>
</dependencies>
<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<configuration>
<protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
</configuration>
<executions>
<execution>
<id>compile</id>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
<execution>
<id>test-compile</id>
<goals>
<goal>test-compile</goal>
<goal>test-compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
За генерацию кода, исходя из этого описания, отвечает компилятор protoc. Например, сущность запроса на сохранение и ответ выглядят так:
syntax = "proto3";
package image;
message SaveImageRequest {
int64 incident_id = 1;
bytes file_data = 2;
int64 size = 3;
string type = 4;
}
message SaveImageResponse {
int64 image_id = 1;
string url = 2;
}
Взаимодействия с этими сущностями строится посредством описания в proto-файле структуры, а Java-кодом — сама реализация. Например:
service ImageService {
rpc SaveImage (SaveImageRequest) returns (SaveImageResponse);
}
Сам метод сервиса описывается один раз в нашем случае на микросервисе Изображений (Клиент), который принимает файлы на сохранение и упрощённо выглядит так:
@Service
@RequiredArgsConstructor
@FieldDefaults(level = AccessLevel.PRIVATE, makeFinal = true)
public class GrpcImageServiceImpl extends ImageServiceImplBase {
ImageFacade imageFacade;
ImageMapper imageMapper;
@Override
public void saveImage(SaveImageRequest request,
StreamObserver<SaveImageResponse> responseObserver) {
try {
ImageDto imageDto = imageMapper.toImageDto(request);
Image savedImage = imageFacade.saveImage(imageDto, file);
SaveImageResponse response = imageMapper.toSaveImageResponse(savedImage);
responseObserver.onNext(response);
responseObserver.onCompleted();
} catch (Exception e) {
responseObserver.onError(
new RuntimeException("Ошибка при сохранении изображения: " +
+ e.getMessage(), e));
}
}
}
Сервис наследуется от сгенерированного ImageServiceImplBase
и здесь мы переопределяем (@Override
) gRpc-метод. Он принимает request сериализованный protobuf’ом, маппит его в ImageDto
, сохраняет и формирует ответ.
StreamObserver — интерфейс для передачи данных в ответ на запрос. onNext()
— отправка, onCompleted()
— завершает поток, onError()
— сообщает об ошибке.
Осталось посмотреть на сам вызов удалённой процедуры и упрощённо выглядит так:
@Service
@RequiredArgsConstructor
@FieldDefaults(level = PRIVATE, makeFinal = true)
public class GrpcClientServiceImpl implements GrpcClientService{
ImageServiceBlockingStub imageServiceStub;
UserServiceBlockingStub userServiceStub;
ImageRequestMapper imageRequestMapper;
@Override
public SaveImageResponse uploadImages(List<ImageDto> imageDtoList, Incident incident) {
for (ImageDto imageDto : imageDtoList) {
try {
SaveImageRequest request = imageRequestMapper.toSaveImageRequest(imageDto);
SaveImageResponse response = imageServiceStub.saveImage(request);
return response;
} catch (StatusRuntimeException e) {
throw new ImageProcessingException("Some error message: "
+ incident.getId(), e);
} catch (Exception e) {
throw new ImageProcessingException("Some error message: "
+ incident.getId() + " with unexpected error: ", e);
}
}
}
}
Сервис наследуется от GrpcClientService (паттерн Interface for Implementation) и реализует метод загрузки изображений на сервер. Здесь мы маппим Dto в Request, вызываем блокирующий метод сохранения, так как нам важно зафиксировать успех выполнения операции, и возвращаем ответ.
Результаты
Запрос с обязательными параметрами в postman:

POST-запрос на создание инцидента
По запросу на Incident api создаётся запись в БД об инциденте, а изображение передаётся по gRpc на Image-service, который пишет в minio и в ui мы видим такую картину:

В результате в minio — 1 объект
Выполнив запрос на удаление с указанием id инцидента:

Запрос на удаление инцидента
Incident-service удаляет информацию об инциденте и асинхронно запрашивает удаление у Image-service, записывая в топик Kafka сообщение. Как и предполагалось в delete-image-topic сообщение с id инцидента, связанные с которым изображения необходимо удалить, что можно увидеть в kafka-ui:

Сообщение с id инцидента
Полный код с учётом всех текущих и будущих функциональностей, документаций и инструкцией к локальному развёртыванию доступен на моём GitHub. Буду рад, если вам понравится проект, и не стесняйтесь ставить звёздочки!
Заключение
В этой статье мы реализовали микросервисное взаимодействие с помощью gRpc и Kafka. Рассмотрели создание микросервиса изображений, его функциональность и архитектуру, научились работать с S3 хранилищами, на примере Minio, реализовали синхронное взаимодействие по gRpc и асинхронное с помощью Kafka.
Мир Java-разработки — это бесконечное поле для экспериментов, творчества и обучения. Надеюсь, вы нашли эту статью полезной и вдохновляющей. Если у вас есть вопросы, замечания или идеи — делитесь ими в комментариях, буду искренне рад конструктивной критике, поскольку вместе мы сделаем этот путь ещё более интересным и продуктивным.
Присоединяйтесь к моему Telegram-каналу. До встречи в следующей статье! 🚀
© 2025 ООО «МТ ФИНАНС»
Telegram-канал со скидками, розыгрышами призов и новостями IT 💻
