Введение
Рад всех приветствовать!
В данной статье мы поработаем с технологиями Apache Avro и Schema Registry на практике. Будем использовать нашу платформу, которая эволюционирует с каждой практической статьёй. Меняли мы эту платформу крайний раз в данной статье.
Приятного чтения!
Модифицируем ямлик
Для начала давайте добавим в наш docker-compose.yaml новый сервис — Schema Registry.
Допишем в него следующее:
schema-registry: image: confluentinc/cp-schema-registry:8.1.0 container_name: schema-registry hostname: schema-registry depends_on: - kafka-0 - kafka-1 - kafka-2 ports: - "8087:8081" environment: # Адреса нод кластера SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka-0:9091,kafka-1:9091,kafka-2:9091 # Имя хоста (об этой переменной окружения мы подробнее поговорим ниже) SCHEMA_REGISTRY_HOST_NAME: schema-registry # Слушаем на 8081 порту на всех сетевых интерфейсах SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
Также нужно дописать некоторую информацию в Kafka-UI:
kafka-ui: image: provectuslabs/kafka-ui:v0.7.2 container_name: kafka-ui hostname: kafka-ui depends_on: - kafka-0 - kafka-1 - kafka-2 - schema-registry # Теперь есть зависимость от реестра ports: - "8086:8080" environment: # Название кластера в веб-интерфейсе KAFKA_CLUSTERS_0_NAME: local # Указываем адреса наших брокеров в кластере # так как Kafka-UI и брокеры в одной Docker-сети, то используем порты 9091 KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka-0:9091,kafka-1:9091,kafka-2:9091 # Настройка для динамической конфигурации брокеров через UI DYNAMIC_CONFIG_ENABLED: true # Указываем адрес ресстра для того, чтобы Kafka-UI его подхватила # Это нужно для красивого отображения сообщений, ведь для того, чтобы # Получить схему, нам необходимо перейти в реестр KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schema-registry:8081
Как Schema Registry связана с кластером Kafka?
Сейчас обсудим то, как Schema Registry связана с Kafka-кластером. Также обсудим, что такое кластер сервисов Schema Registry. Это всё будет тесно связано с переменной окружения, которую я обещал рассмотреть подробнее.
Schema Registry хранит схемы не внутри себя, а в специальном топике _schemas. То есть кластер Kafka используется как база данных.
Также важно сказать о том, что часто поднимают не один реестр, а несколько (кластер).
Работает этот кластер по модели взаимодействия Master-Slave. Это значит, что в конкретный момент времени есть одна главная нода, а все остальные — служебные.
Про модель взаимодействия Master-Slave в контексте Schema Registry
Запись происходит всегда только через главную ноду для того, чтобы избежать проблемы, известной как Race Condition. Сами посудите:
Клиент A кидает схему на одну ноду
Клиент Б кидает схему на другую ноду
Если это происходит параллельно, то:
Первая нода видит, что уже есть версия 1 и сохраняет версию 2
Вторая нода также видит, что есть версия 1 и сохраняет версию 2
Следовательно, у нас лежат две версии 2 в топике. Так быть не должно.
Можно, конечно, предположить сделать синхронизацию, но в данном контексте её делать крайне сложно, да и работа будет медленной.
Чтение же схем может происходить со Slave-нод для того, чтобы снизить нагрузку на главную ноду.
Про переменную окружения
Давайте вспомним следующую переменную окружения:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
Казалось бы, зачем её указывать, если в Docker Compose мы уже указывали hostname?
На самом деле мы указываем эту переменную для процесса выбора лидера (Master-ноды).
Выбор лидера происходит по тому же механизму, что и в самом Kafka-кластере. То есть все ноды реестра имеют один group.id. Процесс выбора лидера мы обсуждали в самой первой статье цикла, здесь повторяться я не вижу смысла.
Скажу только лишь то, что выбранная нода делает семантически примерно следующую запись в топик _schemas: Я — Мастер. Мой адрес: http://schema-registry:8081
schema-registry в данном URL — это как раз hostname, который возьмётся из переменной окружения.
Следовательно, если запрос на запись попадёт на Slave-ноду кластера, то она перенаправит запрос по указанному URL к Master-ноде.
Начинаем работу с Avro-схемами (order-service)
В прошлой статье мы достаточно подробно рассмотрели Avro-схемы. Давайте теперь поработаем с ними на практике.
Предлагаю открыть нашу с вами платформу.
Пишем Avro-схему
Начнём с нашего order-service. У нас на данный момент в сервисе всё завязано на JSON. Давайте начнём переводить его на Avro.
Для начала создадим папку avro по пути /src/main/avro внутри сервиса.
Далее создадим файл OrderPlacedEvent.avsc со следующим содержанием:
{ "type": "record", "name": "OrderPlacedEvent", "namespace": "io.mitochondria.order.event", "fields": [ { "name": "orderId", "type": "string" }, { "name": "email", "type": "string" }, { "name": "productName", "type": "string" }, { "name": "quantity", "type": "int" } ] }
Это по сути представление нашего OrderRequest из пакета dto в виде Avro-схемы.
Также отмечу, что далее мы будем генерировать класс из данной Avro-схемы. При генерации работает правило namespace = java package. То есть сгенерированный класс будет размещён относительно корня проекта по пути, указанном в поле namespace.
В нашей схеме я указал пакет, в котором уже есть класс с таким же именем. Так что удалите его заранее, чтобы не возникло проблем при генерации.
Да, в нашем dto поле quantity имело тип Integer, то есть оно могло быть null. Однако по бизнес-логике заказ без количества — это глупость. Поэтому в Avro-схеме делаем тип не как union (int + null), а как int.
Специальный сериализатор
Теперь давайте настроим специальный сериализатор, о котором мы говорили в теоретической статье. Чтобы его получить, нам нужна специальная зависимость.
Для этого сначала добавьте в pom.xml оффициальный репозиторий Confluent. Это делается вставкой следующего фрагмента:
<repositories> <repository> <id>confluent</id> <name>Confluent Maven Repository</name> <url>https://packages.confluent.io/maven/</url> </repository> </repositories>
Делаем мы это по причине того, что нужная нам зависимость находится именно в этом репозитории.
Далее добавим саму зависимость:
<dependency> <groupId>io.confluent</groupId> <artifactId>kafka-avro-serializer</artifactId> <version>8.2.0</version> <scope>compile</scope> </dependency>
Зависимость добавлена. Теперь поменяем сериализатор в application.properties файле:
spring.kafka.producer.value-serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
Ключ мы по-прежнему будем сериализовать как строку, а значение уже по протоколу Confluent Avro.
Знакомство Avro-сериализатора со Schema Registry
Также нужно познакомить наш Avro-сериализатор со Shema Registry. Для этого добавим следующее свойство:
spring.kafka.producer.properties.schema.registry.url=http://localhost:8087
Генерация классов из Avro-схем
Давайте сгенерируем специальный класс из Avro-схемы. Для этого сначала добавим специальный плагин:
<plugin> <groupId>org.apache.avro</groupId> <artifactId>avro-maven-plugin</artifactId> <executions> <execution> <id>schemas</id> <phase>generate-sources</phase> <goals> <goal>schema</goal> </goals> <configuration> <sourceDirectory>${project.basedir}/src/main/avro</sourceDirectory> <outputDirectory>${project.basedir}/src/main/java</outputDirectory> </configuration> </execution> </executions> </plugin>
Теперь в терминале перейдём в корневую папку order-service и запустим команду mvn clean compile.
Небольшой рефакторинг
Класс сгенерирован. Теперь немного поменяем код сервиса. Сделаем это из-за того, что ранее мы использовали старый класс OrderPlacedEvent, который заменили сгенерированным. От нас требуется просто поменять на 27 строчке вызов метода send на следующее:
kafkaTemplate.send("order-placed", orderPlacedEvent.getOrderId().toString(), orderPlacedEvent);
В целом это всё. Наш order-service готов.
Модифицируем inventory-service
Теперь поработаем с inventory-service.
Для начала создадим директорию avro по тому же пути и скопируем Avro-схему из order-service, потому что для данного сервиса также необходим OrderPlacedEvent.
Ещё добавим 2 Avro-схемы, описывающие InventoryRejectedEvent и InventoryReservedEvent:
{ "type": "record", "name": "InventoryRejectedEvent", "namespace": "io.mitochondria.inventory.event", "fields": [ { "name": "orderId", "type": "string" }, { "name": "email", "type": "string" } ] }
{ "type": "record", "name": "InventoryReservedEvent", "namespace": "io.mitochondria.inventory.event", "fields": [ { "name": "orderId", "type": "string" }, { "name": "email", "type": "string" } ] }
Далее аналогично поставим зависимость для того, чтобы получить сериализатор и десериализатор. Повторяться не буду, ибо выше показывал, как это делать.
Поскольку наш inventory-service не только продюсер, а ещё и консьюмер, то свойств в application.properties файле будет добавлено и изменено чуть больше.
Для начала поменяем сериализатор и десериализатор значения сообщения:
spring.kafka.producer.value-serializer=io.confluent.kafka.serializers.KafkaAvroSerializer spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=io.confluent.kafka.serializers.KafkaAvroDeserializer
Теперь так же, как и в order-service укажем URL для Schema Registry. При этом нужно указать как для нативного продюсера, так и для нативного консьюмера:
spring.kafka.producer.properties.schema.registry.url=http://localhost:8087 spring.kafka.consumer.properties.schema.registry.url=http://localhost:8087
Также нам будет необходимо указать следующее свойство:
spring.kafka.consumer.properties.specific.avro.reader=true
Эта настройка позволяет использовать сгенерированные классы для десериализации. Если не переопределить на true, то десериализатор будет выдавать GenericRecord вместо конкретного типа. Это некоторый общий класс, с которым не так удобно работать, как с конкретным.
Далее удалим наши event-классы и сгенерируем новые. Не забудьте добавить специальный плагин.
Также поправьте в коде всё таким образом, чтобы не было ошибок компиляции из-за использования новых классов.
Также, поскольку мы отказываемся от JSON, удалим следующее свойство:
spring.kafka.consumer.properties.spring.json.trusted.packages=io.mitochondria.order.event
Далее нужно внести некоторые изменения в наш сервисный класс, где мы работаем с outbox таблицей. Сперва нам понадобится создать 2 DTO в соответствующем пакете:
public record InventoryRejectedDto(String orderId, String email) {}
public record InventoryReservedDto(String orderId, String email) {}
Теперь поменяем код нашего основного метода сервисного слоя для того, чтобы не смешивать логику и не маппить сгенерированный Avro-класс в JSON. Для этого у нас будут специальные DTO, которые мы как раз будем маппить в JSON и отправлять в базу данных:
private void processOrderInTransaction(OrderPlacedEvent orderPlacedEvent) { String orderId = orderPlacedEvent.getOrderId().toString(); String email = orderPlacedEvent.getEmail().toString(); String productName = orderPlacedEvent.getProductName().toString(); int quantity = orderPlacedEvent.getQuantity(); try { processedOrderIdRepository.save(new ProcessedOrderId( orderId )); } catch (DataIntegrityViolationException e) { logger.info("Order {} already processed", orderId); return; } int count = inventoryRepository.deductStock(productName, quantity); String topic = (count > 0) ? "inventory-reserved" : "inventory-rejected"; Object dto = (count > 0) ? new InventoryReservedDto(orderId, email) : new InventoryRejectedDto(orderId, email); String json; try { json = objectMapper.writeValueAsString(dto); } catch (Exception e) { throw new RuntimeException("Serialization failed for order: " + orderId, e); } OutboxEvent outboxEvent = new OutboxEvent( orderId, topic, json ); outboxEventRepository.save(outboxEvent); }
Такс. Далее изменим наш класс из пакета scheduler следующим образом:
@Scheduled(fixedDelay = 5000) public void relayOutboxEvents() { List<OutboxEvent> outboxEvents = outboxEventRepository.findBySentFalseOrderByCreatedAtAsc(PageRequest.of(0, 100)); for (OutboxEvent outboxEvent : outboxEvents) { try { String topic = outboxEvent.getTopic(); String payloadJson = outboxEvent.getPayload(); Object payload = null; switch (topic) { case "inventory-reserved" -> { InventoryReservedDto inventoryReservedDto = objectMapper.readValue(payloadJson, InventoryReservedDto.class); payload = new InventoryReservedEvent(inventoryReservedDto.orderId(), inventoryReservedDto.email()); } case "inventory-rejected" -> { InventoryRejectedDto inventoryRejectedDto = objectMapper.readValue(payloadJson, InventoryRejectedDto.class); payload = new InventoryRejectedEvent(inventoryRejectedDto.orderId(), inventoryRejectedDto.email()); } } kafkaTemplate.send(outboxEvent.getTopic(), outboxEvent.getKey(), payload).get(); outboxEvent.setSent(true); outboxEventRepository.save(outboxEvent); } catch (Exception e) { logger.error("Failed to relay outbox event: {}", outboxEvent.getKey(), e); } } }
Всё. inventory-service готов. Поздравляю!
Остальные сервисы
Предлагаю вам попрактиковаться и перевести остальные сервисы на Avro самостоятельно. Всё необходимое для этого вы уже знаете, это не особо сложно, да и даст вам немного набить руку. Если что, репозиторий с кодом, как всегда разместил в конце статьи. Так что сможете свериться / подсмотреть, если вдруг что.
Немного углубляемся
GenericRecord vs SpecificRecord
Помните следующее свойство?
spring.kafka.consumer.properties.specific.avro.reader=true
Я говорил, что оно нужно для того, чтобы десериализовывать сразу в конкретные классы. Так вот, SpecificRecord — это и есть сгенерированные классы.
Если не указать это свойство или явно указать false, то при десериализции будет получен GenericRecord. Это, как я говорил ранее, общий тип.
GenericRecord не очень удобен, так как отсутствует типобезопасность и есть возможность получить ошибки в runtime.
Например, в GenericRecord достать поле orderId нужно следующим образом:
String orderId = record.get("orderId").toString();
Как вы видите, мы вручную указываем имя поля. Если ошибиться на одну букву / в регистре, то ошибку в runtime не миновать.
Кастомные сериализаторы / десериализаторы
Об этом меня попросили рассказать в комментариях под одной из прошлых статей.
Вообще, писать кастомные сериализаторы и десериализаторы не нужно в 95% случаев. Но для общего развития, в целом, почитать полезно.
Это редко имеет смысл из-за того, что приходится самим заботиться о формате сообщений, эволюции и совместимости схем. Да и Confluent сделали всё достаточно добротно.
Кастомный сериализатор, например, может выглядеть следующим образом:
public class AvroSerializer implements Serializer<SpecificRecordBase> { @Override public byte[] serialize(String topic, SpecificRecordBase data) { if (data == null) return null; try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { var writer = new SpecificDatumWriter<SpecificRecordBase>(data.getSchema()); var encoder = EncoderFactory.get().binaryEncoder(out, null); writer.write(data, encoder); encoder.flush(); return out.toByteArray(); } catch (IOException e) { throw new RuntimeException(e); } } }
Давайте поясню по шагам, что здесь происходит:
Имплементируемся от
Serializer<SpecificRecordBase>, гдеSpecificRecordBase— это базовый класс, от которого наследуются всеSpecificRecord.Открываем буфер в памяти, куда будем складывать бинарные данные.
Создаём специальный объект, которому в конструктор необходимо передать схему, которая есть у каждого
SpecificRecordпрямо внутри него. Этот объект знает, как записывать бинарные данные (читает схему, определяет порядок значений, типы).Создаём объект, который будет записывать бинарные данные в наш бинарный поток.
Записываем бинарные данные в поток
Флашим всё, что осталось в буфере нашего encoder для того, чтобы всё гарантированно сбросилось в поток.
Возвращаем последовательность байтов.
При желании вы можете расширить функциональность сериализатора так, как вам угодно. Но, опять же, кастомные сериализаторы / десериализаторы делают довольно редко.
Заключение
Поздравляю вас! Теперь вы знаете Confluent Avro не только в теории, но и на практике. Уверен, эти знания будут для вас полезными.
Код получившейся платформы можете найти у меня на GitHub. Оставляю ссылку.
Настоятельно вам рекомендую поднять всю инфраструктуру и поиграться с ней: отправить запрос к нашей платформе, зайти в Kafka-UI и полазить в интерфейсе.
В следующий раз рассмотрим два способа координации Kafka-кластера: Zookeeper и KRaft.
Всего наилучшего!
