Опыт разработки и внедрения универсального коллектора для интеграции КХД с Kafka

Привет, Хабр!
В этой статье хочу поделиться нашим опытом интеграции с Kafka.
В Мегафоне несколько десятков сервисов являются потребителями данных, публикуемых в кластерах Kafka. Все они разрабатывались под узкоспециализированные задачи.
В какой-то момент в нашем КХД также появилась необходимость интеграции с Kafka.
При разработке первой интеграции мы пошли традиционным путем и использовали Kafka Connect для Confluent 6.0.1. Сообщения, читаемые коннектором, перекладывались в Hadoop. Далее в PySpark выполнялся парсинг нужных данных, и полученные пачки выгружались в Oracle Exadata.
Но на этапе опытно-промышленной эксплуатации у нас возникли проблемы с производительностью из-за большого объема читаемых данных: ~100-110 млн сообщений в час (поток со звонками абонентов). Также было требование от бизнеса - данные в конечной витрине должны появляться с задержкой не более часа. Оптимизация интеграции затянулась еще на пару месяцев.
В итоге решение, которое мы внедрили в пром, не в полной мере устроило нас. Сложная реализация подразумевала необходимость привлекать на его дальнейшую доработку дефицитных экспертов.
Тем временем, перед нами встала задача разработки еще нескольких интеграций с Kafka.
Было очевидно, что требуется какое-то решение, которое не только ускоряло бы внедрение, исключая рутинную разработку, но и позволяло реализовать стандартную для таких интеграций батчевую выгрузку считанных сообщений в разные БД (Oracle/Hive/ClickHouse и в перспективе в Greenplum). И кроме того, умело выполнять предварительную обработку данных на лету (парсинг и трансформацию значений заданных атрибутов).