
Привет, Хабр! Меня зовут Владислав, я занимаюсь разработкой расчётных систем в Мир Plat.Form. Два года назад мы перешли от взаимодействия через REST к использованию Kafka между системой Фронт-Офиса, в которой хранятся данные об авторизованных транзакциях, и системой Бэк-Офиса, ответственной за выполнение клиринга. При этом мы реализовали собственный сериализатор для работы с AVRO сообщениями. Более подробно о причинах выбора такого подхода я рассказал в статье.

Тогда нам не требовалась поддержка изменения схемы данных, так как контракт взаимодействия был стабильным на протяжении нескольких лет.
Но в прошлом году перед нами встала задача внедрения новой интеграции, что могло повести за собой частые изменения в модели данных, из-за чего нам пришлось пересмотреть подход к работе с AVRO сообщениями.
В этой статье я расскажу:
1. Что такое эволюция данных и почему она критична для Kafka при работе с AVRO сообщениями.
2. Какие варианты решения мы рассмотрели.
3. Как реализовали поддержку изменений схемы.
4. Подводные камни, с которыми столкнулись, и как их обошли.
Что такое эволюция данных в AVRO сообщениях?
Данные, которые мы передаем в формате AVRO представляют собой набор байт, составленный по схеме. Например:
{
"type": "record",
"name": "Payment",
"fields": [{"name": "amount", "type": "int"}]
}Данная схема будет нам нужна и на консьюмере и на продюсере, так как по ней мы сериализуем и десериализуем наше сообщение. Но в процессе жизни нашего приложения у нас может возникнуть необходимость обновить схему, добавить, удалить поля, сделать их необязательными.
Эволюция схемы — это способность постепенно и безопасно изменять структуру данных. Она позволяет сохранять совместимость со старыми сервисами при появлении новых данных, а также обеспечивает возможность новым сервисам корректно читать старые данные.
Для нашей схемы из примера эволюция данных может выглядеть следующим образом
{
"type": "record",
"name": "Payment",
"fields": [
{"name": "amount", "type": "int"},
{"name": "currency", "type": "string", "default": "RUB"} // Новое поле
]
}Почему это критично для Kafka и AVRO?
1. Сообщения в Kafka хранятся неделями/месяцами. Консьюмер должен уметь читать все версии данных из топика.
2. Сервисы обновляются асинхронно. Продюсер может отправить данные по новой схеме, пока консьюмер ещё находится на старой. Или, наоборот, консьюмер будет работать на новой версии схемы, в то время пока продюсер еще пишет данные в топик по старой.
Почему эволюция данных стала для нас проблемой?
Так как наш сериализатор был заточен под фиксированную структуру данных, то любое изменение (даже добавление нового поля) требовало синхронного обновления всех сервисов и остановки обработки сообщений на время изменений.
А при отладке работы с новой интеграцией контракт мог меняться достаточно часто.
Изначально мы закладывались на механизм версионирования топиков. Мы планировали для каждой версии схемы создавать отдельный топик (например: payments_v1, payments_v2). Продюсеры бы писали сообщения в новые топики, а консьюмеры подписывались бы на нужные версии этих топиков.

Такой подход был прост в реализации, обеспечивал полную изоляцию версии схем внутри топика и не требовал никаких дополнительных инструментов.
Однако, на практике подобный подход хорошо использовать внутри одной команды.
При межкомандном взаимодействии возникают трудности. Нужно синхронизироваться,
а у каждой из команд свои приоритеты, усложняются маршруты, появляются новый сложности при переключении, на администрирование с обеих сторон требуется больше ресурсов.
Но, важно помнить, что эволюция схемы покрывает далеко не все изменения модели данных (более подробно поговорим об этом в следующем разделе).
Поэтому версионирование топиков имеет место быть, но при глобальных изменениях в схеме, которые происходят очень редко. Для мелких и частых изменений это слишком громоздкий механизм.
Что и привело нас к необходимости поиска нового подхода к сериализации данных.
Механизм эволюции AVRO схемы от Confluent
Для решения проблемы эволюции данных в Kafka Confluent предлагается готовое решение — Schema Registry. Это специализированный сервис, который управляет версиями AVRO-схем и обеспечивает их совместимость при изменениях.
Schema Registry — это сервис, который выступает в роли: Централизованного хранилища схем (их версий и метаданных), а также включает в себя функционал проверки совместимости изменений.
Как это работает:
1. Регистрация схемы. При первом использовании схема отправляется в Schema Registry и получает уникальный ID (например, schema_id=19).
2. Сериализация сообщения. Продюсер: Добавляет schema_id в начало сообщения. Кодирует данные по схеме [magic_byte][schema_id][avro_data_bytes]
- magic_byte — указывает, что используется Schema Registry
- schema_id (4 байта) — уникальный ID схемы в Schema Registry
- avro_data_bytes — само сообщение в формате avro
3. Десериализация. Консьюмер: Извлекает schema_id из сообщения. Запрашивает схему по ID из Registry. Декодирует данные.

Важно не забывать о том, что для улучшения производительности на продюсерах и консьюмерах кэшируются версии схем, поэтому, в случае если вы решите почистить схем реджестри и записать туда новые схемы, то так же будет необходимо и почистить кэш на сервисах.
Когда мы говорим об эволюции данных в AVRO, ключевым моментом является совместимость схем. Schema Registry поддерживает четыре основные стратегии проверки совместимости, которые определяют, как изменения схемы влияют на потребителей и продюсеров.
1. Обратная совместимость (BACKWARD/BACKWARD_TRANSITIVE)
Обратная совместимость означает, что потребители, использующие новую схему, могут читать данные, созданные с предыдущей схемой. Например, если есть три схемы для субъекта, изменяющиеся в порядке X-2, X-1 и X, то обратная совместимость гарантирует, что потребители с новой схемой X смогут обрабатывать данные, записанные производителями с использованием схем X или X-1, но не обязательно X-2.
Если потребителю с новой схемой нужно обрабатывать данные, записанные всеми зарегистрированными схемами (а не только последними двумя), используйте BACKWARD_TRANSITIVE вместо BACKWARD. Например, BACKWARD_TRANSITIVE гарантирует, что потребители с новой схемой X смогут читать данные, созданные с X, X-1 или X-2.
2. Прямая совместимость (FORWARD/FORWARD_TRANSITIVE)
Прямая совместимость означает, что данные, созданные с новой схемой, могут читаться потребителями с предыдущей схемой.
- FORWARD: данные из X читаются потребителями с X или X-1.
- FORWARD_TRANSITIVE: данные из X читаются потребителями с X, X-1 или X-2.
3. Полная совместимость (FULL/FULL_TRANSITIVE): комбинация обеих.
Полная совместимость (FULL) означает, что схема одновременно обратно и прямо совместима.
- FULL: совместимость между X и X-1 в обоих направлениях.
- FULL_TRANSITIVE: совместимость между X, X-1 и X-2.
4. Без проверки совместимости (NONE)
Так же возможно отключить проверку совместимости. Такой вариант необходим при несовместимых изменениях (например, смене типа поля с Number на String). В таком случае потребуется, одновременно обновить всех производителей и потребителей, либо создать новый топик постепенно переносить приложения.
Более подробно про плюсы и минусы каждого режима можно прочитать в статье Типы совместимости в Schema Registry для Apache Kafka.
Confluent Schema Registry — это мощный инструмент для управления эволюцией данных в Kafka. Его использование:
- Снижает риски при изменениях схем
- Автоматизирует проверку совместимости
- Упрощает работу с распределёнными системами
Однако, в момент начала работы мы находились на этапе изменения архитектурного подхода к взаимодействию с Kafka. Теперь каждая команда не имела своей отдельный инстанс Kafka, а создавалась единая для всех команд инфраструктура. И этот процесс был ещё не завершен к моменту начала работы над новой интеграцией.
Учитывая, что старый подход вскоре будет заменён, мы не были готовы делать большие инфраструктурные изменения. Такие изменения привели бы к дополнительной нагрузке как на нашу команду, так и на команду, отвечающую за продюсера. Мы стремились этого избежать. Поэтому выбрали подход с доработкой нашего прошлого решения.
Но для новых проектов настоятельно рекомендуем использовать Schema Registry.
Обновление десериализатора
Как же решить проблему эволюции данных, если мы не используем схем реджестри.
Основная наша задача правильно сопоставить бинарное сообщение и схему по которой это сообщение записано.
Первая возможность — это решение в лоб, попытаться десериализовать сообщение всеми схемами, от более новой версии к более старой. Основной плюс такого реше��ия не потребуется дорабатывать сериализтор, изменения коснуться только этапа дессериализации, поэтому на стороне продюсера ничего не изменится. Минусы касаются не оптимальности работы, с каждым добавлением новой схемы сложность будет расти.
Второй вариант решения, сделать добавление номера схемы в начало бинарного сообщения. Повторить механизм схем реджестри, но не ходить за схемой в сервис, а использовать схему из локального хранилища. Плюсом такого подхода является то, что мы всегда знаем какой вариант схемы у нас находится в сообщение и нам не придется заниматься перебором. Но такое решение имеет серьезный недостаток, подобный подход затронул бы и серилизатор и дессериализатор, что требовало бы ресурсы на доработку продюсера, в добавок, пришлось бы решать проблемы со старыми данными, которые были записаны в старом формате.
И возможен третий вариант, наиболее простой и лаконичный, это использовать хедер сообщения в кафке. При передаче нового сообщения продюсер проставляет в хедер номер версии схемы, а на этапе дессериализации мы используем необходимую версию схемы. Плюсам такого подхода является то, что доработка касается только дессериализатора, мы всегда знаем какая именно версия сообщения записана.
При этом если нам попадется сообщение без хедера, то по умолчанию будем считать, что нам пришло сообщение 1 версии. Таким образом мы поддержим работу со старыми и новыми сообщениями. Единственный минус, что появляется новое в хедере, которое важно не забыть добавить.
{
"headers": [
{
"key": "schema-version",
"value": "2"
}
],
"key": "msg-key",
"value": "...binary AVRO data...",
"timestamp": 1234567890123
}Мы остановились на третьем решением, оно показалось нам наиболее оптимальным.
Но в процессе работы мы столкнулись с несколькими нюансами.
При работе с AVRO в Java у нас есть возможность работать с одним из двух подходов.
Первый подход, по авросхеме сгенерировать Java-класс используя плагин и дальше работать непосредственно с этим классом. В этом случае мы получаем класс наследник SpecificRecordBase.
Второй подход, использовать класс GenericRecord, данный класс содержит мапу, где ключом является имя поля в авросхеме, а значением непосредственно значение этого поля. При создании экземпляра GenericRecord необходимо указать схему. По которой и будет производиться дессериализация.
При подходе, который использует GenericRecord все достаточно просто — так как нет привязки к конкретным java-классам.
Но мы использовали подход с SpecificRecordBase, генерировали плагином java-классы, загружали их в хранилище, и оттуда их использовала команда, которая разрабатывала консьюмер, и команда, которая разрабатывала продюсер. Такой подход позволяет задать более жесткий контракт. Однако, подобный подход усложняет работу при решении проблемы с эволюцией данных.
Так как нам нужно иметь один и тот же класс, но в двух версиях.
Можно использовать собственно написанный загрузчик классов, и после десериализации выполнять приведение типов. Но мы решили переиспользовать готовый механизм от Confluent, который используется в десериализаторе работа��щим с схем реджестри.
Он принимает две схемы и преобразовывает старые данные к новым.
Второй нюанс касается обновления версий сервисов. В подобной реализации, когда мы не используем схем реджестри, мы всегда должны обновлять первым консюмер. Потому что если от продюсера придет сообщение новой версии, то на этапе десериализации у нас попросту не будет схемы по которой мы сможем это сообщение десериализовать. Поэтому, первыми всегда должны обновляться консьюмеры и лишь затем сервисы отвечающие за продюсеров.
Послесловие
Наше кастомное решение с использованием заголовков Kafka для управления версиями схем AVRO успешно зарекомендовало себя в production-среде. Оно позволило нам гибко адаптироваться к изменениям в модели данных без необходимости масштабных инфраструктурных изменений.
Несмотря на успешное внедрение кастомное решение — это временная мера, обусловленная спецификой нашего переходного периода. Для новых проектов мы рекомендуем использовать Confluent Schema Registry, так как это готовое, отлаженное решение, которое автоматизирует проверку совместимости схем, снижает риски ошибок и упрощает поддержку распределённых систем.
В будущем, при завершении перехода на единую инфраструктуру Kafka, мы планируем перейти на это решение, чтобы продолжать обеспечивать стабильность и масштабируемость нашей системы.
Спасибо, что прочитали нашу историю! Надеемся, наш опыт окажется полезным для ваших проектов.
