Введение

Рад всех приветствовать!

В данной статье мы поработаем с технологиями Apache Avro и Schema Registry на практике. Будем использовать нашу платформу, которая эволюционирует с каждой практической статьёй. Меняли мы эту платформу крайний раз в данной статье.

Приятного чтения!

Модифицируем ямлик

Для начала давайте добавим в наш docker-compose.yaml новый сервис — Schema Registry.

Допишем в него следующее:

  schema-registry:
    image: confluentinc/cp-schema-registry:8.1.0
    container_name: schema-registry
    hostname: schema-registry
    depends_on:
      - kafka-0
      - kafka-1
      - kafka-2
    ports:
      - "8087:8081"
    environment:
      # Адреса нод кластера
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka-0:9091,kafka-1:9091,kafka-2:9091
      # Имя хоста (об этой переменной окружения мы подробнее поговорим ниже)
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      # Слушаем на 8081 порту на всех сетевых интерфейсах
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

Также нужно дописать некоторую информацию в Kafka-UI:

  kafka-ui:
    image: provectuslabs/kafka-ui:v0.7.2
    container_name: kafka-ui
    hostname: kafka-ui
    depends_on:
      - kafka-0
      - kafka-1
      - kafka-2
      - schema-registry # Теперь есть зависимость от реестра
    ports:
      - "8086:8080"
    environment:
      # Название кластера в веб-интерфейсе
      KAFKA_CLUSTERS_0_NAME: local
      # Указываем адреса наших брокеров в кластере
      # так как Kafka-UI и брокеры в одной Docker-сети, то используем порты 9091
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka-0:9091,kafka-1:9091,kafka-2:9091
      # Настройка для динамической конфигурации брокеров через UI
      DYNAMIC_CONFIG_ENABLED: true
      # Указываем адрес ресстра для того, чтобы Kafka-UI его подхватила
      # Это нужно для красивого отображения сообщений, ведь для того, чтобы
      # Получить схему, нам необходимо перейти в реестр
      KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schema-registry:8081

Как Schema Registry связана с кластером Kafka?

Сейчас обсудим то, как Schema Registry связана с Kafka-кластером. Также обсудим, что такое кластер сервисов Schema Registry. Это всё будет тесно связано с переменной окружения, которую я обещал рассмотреть подробнее.

Schema Registry хранит схемы не внутри себя, а в специальном топике _schemas. То есть кластер Kafka используется как база данных.

Также важно сказать о том, что часто поднимают не один реестр, а несколько (кластер).

Работает этот кластер по модели взаимодействия Master-Slave. Это значит, что в конкретный момент времени есть одна главная нода, а все остальные — служебные.

Про модель взаимодействия Master-Slave в контексте Schema Registry

Запись происходит всегда только через главную ноду для того, чтобы избежать проблемы, известной как Race Condition. Сами посудите:

  1. Клиент A кидает схему на одну ноду

  2. Клиент Б кидает схему на другую ноду

Если это происходит параллельно, то:

  1. Первая нода видит, что уже есть версия 1 и сохраняет версию 2

  2. Вторая нода также видит, что есть версия 1 и сохраняет версию 2

Следовательно, у нас лежат две версии 2 в топике. Так быть не должно.

Можно, конечно, предположить сделать синхронизацию, но в данном контексте её делать крайне сложно, да и работа будет медленной.

Чтение же схем может происходить со Slave-нод для того, чтобы снизить нагрузку на главную ноду.

Про переменную окружения

Давайте вспомним следующую переменную окружения:

SCHEMA_REGISTRY_HOST_NAME: schema-registry

Казалось бы, зачем её указывать, если в Docker Compose мы уже указывали hostname?

На самом деле мы указываем эту переменную для процесса выбора лидера (Master-ноды).

Выбор лидера происходит по тому же механизму, что и в самом Kafka-кластере. То есть все ноды реестра имеют один group.id. Процесс выбора лидера мы обсуждали в самой первой статье цикла, здесь повторяться я не вижу смысла.

Скажу только лишь то, что выбранная нода делает семантически примерно следующую запись в топик _schemas: Я — Мастер. Мой адрес: http://schema-registry:8081

schema-registry в данном URL — это как раз hostname, который возьмётся из переменной окружения.

Следовательно, если запрос на запись попадёт на Slave-ноду кластера, то она перенаправит запрос по указанному URL к Master-ноде.

Начинаем работу с Avro-схемами (order-service)

В прошлой статье мы достаточно подробно рассмотрели Avro-схемы. Давайте теперь поработаем с ними на практике.

Предлагаю открыть нашу с вами платформу.

Пишем Avro-схему

Начнём с нашего order-service. У нас на данный момент в сервисе всё завязано на JSON. Давайте начнём переводить его на Avro.

Для начала создадим папку avro по пути /src/main/avro внутри сервиса.

Далее создадим файл OrderPlacedEvent.avsc со следующим содержанием:

{
  "type": "record",
  "name": "OrderPlacedEvent",
  "namespace": "io.mitochondria.order.event",
  "fields": [
    { "name": "orderId", "type": "string" },
    { "name": "email", "type": "string" },
    { "name": "productName", "type": "string" },
    { "name": "quantity", "type": "int" }
  ]
}

Это по сути представление нашего OrderRequest из пакета dto в виде Avro-схемы.

Также отмечу, что далее мы будем генерировать класс из данной Avro-схемы. При генерации работает правило namespace = java package. То есть сгенерированный класс будет размещён относительно корня проекта по пути, указанном в поле namespace.

В нашей схеме я указал пакет, в котором уже есть класс с таким же именем. Так что удалите его заранее, чтобы не возникло проблем при генерации.

Да, в нашем dto поле quantity имело тип Integer, то есть оно могло быть null. Однако по бизнес-логике заказ без количества — это глупость. Поэтому в Avro-схеме делаем тип не как union (int + null), а как int.

Специальный сериализатор

Теперь давайте настроим специальный сериализатор, о котором мы говорили в теоретической статье. Чтобы его получить, нам нужна специальная зависимость.

Для этого сначала добавьте в pom.xml оффициальный репозиторий Confluent. Это делается вставкой следующего фрагмента:

<repositories>
    <repository>
        <id>confluent</id>
        <name>Confluent Maven Repository</name>
        <url>https://packages.confluent.io/maven/</url>
    </repository>
</repositories>

Делаем мы это по причине того, что нужная нам зависимость находится именно в этом репозитории.

Далее добавим саму зависимость:

<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <version>8.2.0</version>
    <scope>compile</scope>
</dependency>

Зависимость добавлена. Теперь поменяем сериализатор в application.properties файле:

spring.kafka.producer.value-serializer=io.confluent.kafka.serializers.KafkaAvroSerializer

Ключ мы по-прежнему будем сериализовать как строку, а значение уже по протоколу Confluent Avro.

Знакомство Avro-сериализатора со Schema Registry

Также нужно познакомить наш Avro-сериализатор со Shema Registry. Для этого добавим следующее свойство:

spring.kafka.producer.properties.schema.registry.url=http://localhost:8087

Генерация классов из Avro-схем

Давайте сгенерируем специальный класс из Avro-схемы. Для этого сначала добавим специальный плагин:

<plugin>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro-maven-plugin</artifactId>
    <executions>
        <execution>
            <id>schemas</id>
            <phase>generate-sources</phase>
            <goals>
                <goal>schema</goal>
            </goals>
            <configuration>
                <sourceDirectory>${project.basedir}/src/main/avro</sourceDirectory>
                <outputDirectory>${project.basedir}/src/main/java</outputDirectory>
            </configuration>
        </execution>
    </executions>
</plugin>

Теперь в терминале перейдём в корневую папку order-service и запустим команду mvn clean compile.

Небольшой рефакторинг

Класс сгенерирован. Теперь немного поменяем код сервиса. Сделаем это из-за того, что ранее мы использовали старый класс OrderPlacedEvent, который заменили сгенерированным. От нас требуется просто поменять на 27 строчке вызов метода send на следующее:

kafkaTemplate.send("order-placed", orderPlacedEvent.getOrderId().toString(), orderPlacedEvent);

В целом это всё. Наш order-service готов.

Модифицируем inventory-service

Теперь поработаем с inventory-service.

Для начала создадим директорию avro по тому же пути и скопируем Avro-схему из order-service, потому что для данного сервиса также необходим OrderPlacedEvent.

Ещё добавим 2 Avro-схемы, описывающие InventoryRejectedEvent и InventoryReservedEvent:

{
  "type": "record",
  "name": "InventoryRejectedEvent",
  "namespace": "io.mitochondria.inventory.event",
  "fields": [
    { "name": "orderId", "type": "string" },
    { "name": "email", "type": "string" }
  ]
}
{
  "type": "record",
  "name": "InventoryReservedEvent",
  "namespace": "io.mitochondria.inventory.event",
  "fields": [
    { "name": "orderId", "type": "string" },
    { "name": "email", "type": "string" }
  ]
}

Далее аналогично поставим зависимость для того, чтобы получить сериализатор и десериализатор. Повторяться не буду, ибо выше показывал, как это делать.

Поскольку наш inventory-service не только продюсер, а ещё и консьюмер, то свойств в application.properties файле будет добавлено и изменено чуть больше.

Для начала поменяем сериализатор и десериализатор значения сообщения:

spring.kafka.producer.value-serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=io.confluent.kafka.serializers.KafkaAvroDeserializer

Теперь так же, как и в order-service укажем URL для Schema Registry. При этом нужно указать как для нативного продюсера, так и для нативного консьюмера:

spring.kafka.producer.properties.schema.registry.url=http://localhost:8087
spring.kafka.consumer.properties.schema.registry.url=http://localhost:8087

Также нам будет необходимо указать следующее свойство:

spring.kafka.consumer.properties.specific.avro.reader=true

Эта настройка позволяет использовать сгенерированные классы для десериализации. Если не переопределить на true, то десериализатор будет выдавать GenericRecord вместо конкретного типа. Это некоторый общий класс, с которым не так удобно работать, как с конкретным.

Далее удалим наши event-классы и сгенерируем новые. Не забудьте добавить специальный плагин.

Также поправьте в коде всё таким образом, чтобы не было ошибок компиляции из-за использования новых классов.

Также, поскольку мы отказываемся от JSON, удалим следующее свойство:

spring.kafka.consumer.properties.spring.json.trusted.packages=io.mitochondria.order.event

Далее нужно внести некоторые изменения в наш сервисный класс, где мы работаем с outbox таблицей. Сперва нам понадобится создать 2 DTO в соответствующем пакете:

public record InventoryRejectedDto(String orderId, String email) {}
public record InventoryReservedDto(String orderId, String email) {}

Теперь поменяем код нашего основного метода сервисного слоя для того, чтобы не смешивать логику и не маппить сгенерированный Avro-класс в JSON. Для этого у нас будут специальные DTO, которые мы как раз будем маппить в JSON и отправлять в базу данных:

private void processOrderInTransaction(OrderPlacedEvent orderPlacedEvent) {
        String orderId = orderPlacedEvent.getOrderId().toString();
        String email = orderPlacedEvent.getEmail().toString();
        String productName = orderPlacedEvent.getProductName().toString();
        int quantity = orderPlacedEvent.getQuantity();

        try {
            processedOrderIdRepository.save(new ProcessedOrderId(
                orderId
            ));
        } catch (DataIntegrityViolationException e) {
            logger.info("Order {} already processed", orderId);
            return;
        }

        int count = inventoryRepository.deductStock(productName, quantity);
        String topic = (count > 0) ? "inventory-reserved" : "inventory-rejected";
        Object dto = (count > 0)
            ? new InventoryReservedDto(orderId, email)
            : new InventoryRejectedDto(orderId, email);
        String json;

        try {
            json = objectMapper.writeValueAsString(dto);
        } catch (Exception e) {
            throw new RuntimeException("Serialization failed for order: " + orderId, e);
        }

        OutboxEvent outboxEvent = new OutboxEvent(
            orderId,
            topic,
            json
        );

        outboxEventRepository.save(outboxEvent);
    }

Такс. Далее изменим наш класс из пакета scheduler следующим образом:

@Scheduled(fixedDelay = 5000)
public void relayOutboxEvents() {
    List<OutboxEvent> outboxEvents = outboxEventRepository.findBySentFalseOrderByCreatedAtAsc(PageRequest.of(0, 100));

    for (OutboxEvent outboxEvent : outboxEvents) {
        try {
            String topic = outboxEvent.getTopic();
            String payloadJson = outboxEvent.getPayload();
            Object payload = null;

            switch (topic) {
                case "inventory-reserved" -> {
                    InventoryReservedDto inventoryReservedDto = objectMapper.readValue(payloadJson, InventoryReservedDto.class);
                    payload = new InventoryReservedEvent(inventoryReservedDto.orderId(), inventoryReservedDto.email());
                }
                case "inventory-rejected" -> {
                    InventoryRejectedDto inventoryRejectedDto = objectMapper.readValue(payloadJson, InventoryRejectedDto.class);
                    payload = new InventoryRejectedEvent(inventoryRejectedDto.orderId(), inventoryRejectedDto.email());
                }
            }

            kafkaTemplate.send(outboxEvent.getTopic(), outboxEvent.getKey(), payload).get();
            outboxEvent.setSent(true);
            outboxEventRepository.save(outboxEvent);
        }
        catch (Exception e) {
            logger.error("Failed to relay outbox event: {}", outboxEvent.getKey(), e);
        }
    }
}

Всё. inventory-service готов. Поздравляю!

Остальные сервисы

Предлагаю вам попрактиковаться и перевести остальные сервисы на Avro самостоятельно. Всё необходимое для этого вы уже знаете, это не особо сложно, да и даст вам немного набить руку. Если что, репозиторий с кодом, как всегда разместил в конце статьи. Так что сможете свериться / подсмотреть, если вдруг что.

Немного углубляемся

GenericRecord vs SpecificRecord

Помните следующее свойство?

spring.kafka.consumer.properties.specific.avro.reader=true

Я говорил, что оно нужно для того, чтобы десериализовывать сразу в конкретные классы. Так вот, SpecificRecord — это и есть сгенерированные классы.

Если не указать это свойство или явно указать false, то при десериализции будет получен GenericRecord. Это, как я говорил ранее, общий тип.

GenericRecord не очень удобен, так как отсутствует типобезопасность и есть возможность получить ошибки в runtime.

Например, в GenericRecord достать поле orderId нужно следующим образом:

String orderId = record.get("orderId").toString();

Как вы видите, мы вручную указываем имя поля. Если ошибиться на одну букву / в регистре, то ошибку в runtime не миновать.

Кастомные сериализаторы / десериализаторы

Об этом меня попросили рассказать в комментариях под одной из прошлых статей.

Вообще, писать кастомные сериализаторы и десериализаторы не нужно в 95% случаев. Но для общего развития, в целом, почитать полезно.

Это редко имеет смысл из-за того, что приходится самим заботиться о формате сообщений, эволюции и совместимости схем. Да и Confluent сделали всё достаточно добротно.

Кастомный сериализатор, например, может выглядеть следующим образом:

public class AvroSerializer implements Serializer<SpecificRecordBase> {
    @Override
    public byte[] serialize(String topic, SpecificRecordBase data) {
        if (data == null) return null;

        try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
            var writer = new SpecificDatumWriter<SpecificRecordBase>(data.getSchema());
            var encoder = EncoderFactory.get().binaryEncoder(out, null);

            writer.write(data, encoder);
            encoder.flush();

            return out.toByteArray();
        } 
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}

Давайте поясню по шагам, что здесь происходит:

  1. Имплементируемся от Serializer<SpecificRecordBase>, где SpecificRecordBase — это базовый класс, от которого наследуются все SpecificRecord.

  2. Открываем буфер в памяти, куда будем складывать бинарные данные.

  3. Создаём специальный объект, которому в конструктор необходимо передать схему, которая есть у каждого SpecificRecord прямо внутри него. Этот объект знает, как записывать бинарные данные (читает схему, определяет порядок значений, типы).

  4. Создаём объект, который будет записывать бинарные данные в наш бинарный поток.

  5. Записываем бинарные данные в поток

  6. Флашим всё, что осталось в буфере нашего encoder для того, чтобы всё гарантированно сбросилось в поток.

  7. Возвращаем последовательность байтов.

При желании вы можете расширить функциональность сериализатора так, как вам угодно. Но, опять же, кастомные сериализаторы / десериализаторы делают довольно редко.

Заключение

Поздравляю вас! Теперь вы знаете Confluent Avro не только в теории, но и на практике. Уверен, эти знания будут для вас полезными.

Код получившейся платформы можете найти у меня на GitHub. Оставляю ссылку.

Настоятельно вам рекомендую поднять всю инфраструктуру и поиграться с ней: отправить запрос к нашей платформе, зайти в Kafka-UI и полазить в интерфейсе.

В следующий раз рассмотрим два способа координации Kafka-кластера: Zookeeper и KRaft.

Всего наилучшего!