Comments 4
Спасибо за статью! На мой взгляд, было бы лучше, если бы сразу использовалась автоматическая десериализация из JSON вместо затратного преобразования из строки в каждой функции‑обработчике.
С сериализацией результирующего стрима чуть сложнее — можно использовать sealed class из Java 17. Не знаю точно, как в Java, но в Kotlin у меня это получилось без проблем, потому что это поддерживается kotlinx.serialization
за счёт того, что в итоговый JSON дописывается поле type
. Для этого мне пришлось дописать mapValues
в обоих стримах после разделения, чтобы конвертировать объекты к базовому классу.
Ещё было бы полезно упомянуть про streams.setUncaughtExceptionHandler
, потому что пока я о нём не узнал, моя программа завершалась молча в случаях, когда я указывал не тот класс для (де‑)сериализации или когда я опечатался в теле JSON.
Статья очень интересная.
Однако насколько архитектурно правильно использовать шину для трансформаций данных?
Спасибо за статью. Небольшое замечание по обогащению данных. Исходя из моего опыта, читать данные прямо из стороннего источника чревато в обработчике. Это работает пока getAssignedDoctor(notification.getPatientId())
выполняется стабильно и очень быстро. Но со временем имплементация этого метода может поменяться и тут возникнут проблемы при большой нагрузке. Лучше сразу писать докторов в KTable и использовать join c потоком.
Kafka Streams ч3: Stateless processing