Как стать автором
Обновить

Комментарии 6

в 2022 Spark streaming научился джойнить стримы как ksqldb ?

В Spark Structured Streaming джойн стримов появился с версии 2.3, сейчас актуальная версия 3.3.0 и stream-stream join продолжает развиваться.
Ссылка на пример кода и небольшое overview: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#stream-stream-joins.

Для джойна стримов необходимо иметь watermark с указанием временного интервала, чтобы можно было "поймать" соотносимые данные в потоке. Если один из стримов всегда будет "бежать" быстрее другого, то нужно определённым образом скорректировать условия watermark. KSQL от Confluent имеет такую же идею, с временными окнами, но под "капотом" работает немного по другому, хотя результат такой же. Ссылка на пример кода и небольшое overview (кому интересно): https://docs.confluent.io/platform/current/streams/developer-guide/dsl-api.html#kstream-kstream-join.

Стоит отметить и джойн стримов у Flink - это тоже хорошее решение. Ссылка на пример кода и небольшое overview: https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/joining/.

Что из этого эффективней работает - это интересный вопрос для исследования.

Очень-очень жирно :)
При такой нагрузке ваши серверы с кафкой утилизируют сколько, 15-20% всех ресурсов?

Примерно 40% в среднем по HDD и 25% по CPU и RAM. Кластер для Kafka действительно "жирный" и это осознанный шаг. Исходя из специфики потоков наших данных мы выявили важный нюанс - поток ведёт себя волнообразно и "пик волны" может быть очень большим. Проще говоря, за короткий промежуток времени поступает большой объём данных и кластер Kafka прилично нагружается. К тому же этот кластер используется для хранения промежуточных вычислений и выходных триггеров. Сообщения-триггеры создаются на основе входного сообщения, а значит, что на одно сообщение может быть сгенерировано десятки выходных сообщений триггеров и таким образом при нагрузке в 7 млн. сообщений, мы получим дополнительные 7 млн. сообщений в промежуточных топиках и уже сотни миллионов сообщений в кластере в результирующих топиках. Исходя из этого, сервера у нас с запасом и прицелом на будущее, так как есть идеи попробовать для некоторых задач использовать KSQL и подключать новые источники данных.

Так а вам не кажется, что это путь в бездну? Вы уже потребляете кучу ресурсов просто так и будет только хуже.

У меня просто есть сравнимый кейс, в котором в итоге отказались от кафки и перешли на работу с файлами - и сразу стало значительно легче всем. И спарк стал нормально успевать без создания спец кластера для обработки входящего потока, и аж 5 жирных серверов стало не надо - только 2 "дохлых" файлопомойки.

KSQL - на таких объемах? :) Ладно когда у вас там тысяча летит, а на миллионе уже, кажется что, не до этого.

Не очень понял, почему и что будет хуже? Сейчас потребление ресурсов таким и планировалось. Есть хорошее capacity по ресурсам, которое позволяет нам смело интегрировать новые источники данных и увеличивать плотность событий, которые и так увеличиваются каждые полгода. При этом мы не умираем при больших выбросах данных на "пике волны" и можем не заботиться о покупке новых серверов ближайшие несколько лет.
От Кафки отказаться в пользу файлов в нашем случае будет менее эффективным вариантом, потому что:
1. Kafka - единая интеграционная шина для систем источников данных и систем потребителей. Не все системы источники и системы потребители умеют эффективно работать с файлами, однако практически в любом популярном фреймворке и языке программирования есть интеграция с Kafka, которая обычно очень простая. При этом некоторые системы потребители могут работать только с Kafka и ради нас переходить на интеграцию через файлы не будут.
2. При таком объёме данных и скорости обработки пары серверов-файлообменников нам будет недостаточно, как минимум понадобится 3 реплики с 4 серверами, чтобы мы не теряли данные в случае исключительных ситуаций. Возникают также вопросы какую файловую систему использовать с таким количеством I/O, как организовывать параллельность чтения этих файлов и какого размера должны быть эти файлы, то есть по каким критериям их роллировать? Слишком большие файлы и слишком маленькие файлы будут обрабатываться неэффективно относительно такого потока данных. Сколько inode может поддерживать система? И кто будет следить за удалением неактуальных данных? Много вопросов, которые нужно прорабатывать и которые, как мне кажется, уже решены в Kafka.
3. Spark Streaming из коробки не умеет эффективно вести запись в файлы, как минимум не умеет роллировать файлы по размеру и времени.
4. Системе источнику придётся самостоятельно отслеживать изменения в файлах, если таковые будут, то есть нужно будет прикручивать механизм CDC.
5. Также возникают вопросы безопасности данных и разграничения доступов. В Kafka в большинстве своём всё уже есть, а с файлообменниками нужно будет ещё подумать, как это удобно организовать.

По поводу KSQL, вы правы, на больших объёмах использовать неэффективно, поэтому планировалось использовать KSQL именно на небольших потоках, которые появляются после фильтрации "миллиоников". Это могут быть, например, сегменты абонентов со старыми тарифами, в рамках небольшого региона. Чтобы получить таких абонентов нужно сначала с помощью Spark Streaming отфильтровать жирный поток и сделать небольшой поток из таких абонентов. Затем уже можно дополнительно строить аналитику на лету по такому потоку с помощью KSQL.

Зарегистрируйтесь на Хабре, чтобы оставить комментарий