Pull to refresh

Comments 4

Спасибо за статью! На мой взгляд, было бы лучше, если бы сразу использовалась автоматическая десериализация из JSON вместо затратного преобразования из строки в каждой функции‑обработчике.

С сериализацией результирующего стрима чуть сложнее — можно использовать sealed class из Java 17. Не знаю точно, как в Java, но в Kotlin у меня это получилось без проблем, потому что это поддерживается kotlinx.serialization за счёт того, что в итоговый JSON дописывается поле type. Для этого мне пришлось дописать mapValues в обоих стримах после разделения, чтобы конвертировать объекты к базовому классу.

Ещё было бы полезно упомянуть про streams.setUncaughtExceptionHandler, потому что пока я о нём не узнал, моя программа завершалась молча в случаях, когда я указывал не тот класс для (де‑)сериализации или когда я опечатался в теле JSON.

Статья очень интересная.

Однако насколько архитектурно правильно использовать шину для трансформаций данных?

многое зависит от объемов данных, если позволяют + в эко системе Apache Kafka и команда готова поддерживает то почему бы и нет для Kafka Streams, но думаю при больших объемах возможно лучше использовать другие ETL пайплайны и инструменты и да делать это вне шины

Спасибо за статью. Небольшое замечание по обогащению данных. Исходя из моего опыта, читать данные прямо из стороннего источника чревато в обработчике. Это работает пока getAssignedDoctor(notification.getPatientId()) выполняется стабильно и очень быстро. Но со временем имплементация этого метода может поменяться и тут возникнут проблемы при большой нагрузке. Лучше сразу писать докторов в KTable и использовать join c потоком.

Sign up to leave a comment.

Articles