Как стать автором
Обновить

Комментарии 7

А как конкретно spark разбивает партиции на "подпартиции"? Кафка в рамках консьюмер группы позволяет читать партицию только одному консьюмеру: https://kafka.apache.org/documentation/#design_consumerposition

Мониторите ли вы системные метрики кафки? презентация по метрикам от confulent

Если да, и если не жалко рассказать, то:

  • Какая нагрузка на кафку у вас в bytes/sec

  • Какой при этом LocalTimeMs

Я столкнулся с тем, что не могу найти в интернетах, чтоб кто-то этими метриками делился, поэтому, пока не могу понять, норм ли это, иметь значения >1s.

На каждом триггере получаем range оффсетов для конкретной партиции. Смотрим на параметр minPartition и пытаемся побить этот range на несколько маленьких частей, чтобы каждый воркер получил свою таску.

Условно есть экзекутор получивший 500 оффсетов, но у него есть два воркера (в терминах данной статьи), здесь просто бьем на два range размера 250 и отдаем их каждому воркеру.

https://github.com/apache/spark/blob/b11252806d49ed7915744147216db41a747e7d4d/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala#L58

По поводу второго вопроса: такие метрики я не мониторил. Посмотрю, если на забуду :)

Интересно получается, спасибо) Почитаю, как время освободится, но звучит так, будто это можно средствами кафки решить, увеличив количество партиций и оставив 1 консьюмер на партицию. А так, возможно, какие-то гарантии по доставке/чтению/очередности теряются.

Не смотрели графики еще?)

Отличная статья, спасибо!

про грейсфул шатдаун интересно (особенно в Python), только что за exists(indicator_path)? проверка есть ли "специальный" файл, типа "передачу закончил - можешь выключаться"?

если так, то почему не лучше ли поллить на наличие входных данных черезquery.lastProgress['numInputRows']/query.recentProgress?

Привет. Сори что так долго отвечал :)

Да, это нужно чтобы мы могли выключить стриминг не только из самой джобы, но и извне, так как awaitTermination - это асинк операция.

А вот про query.lastProgress['numInputRows'] не знал. Посмотрю что это.

Сходу вижу проблему -- представь сервис, который продьюсит в топик падает, и мы не видим никаких сообщений в течение 15 минут. Тогда стриминг тоже гарантированно завершается. Следовательно нам нужно держать в голове этот момент. А в случае с индикатором он постоит 15 минут лишних без данных -- тут ничего страшного нет.

А если мы хотим передеплоить его, добавив новую фичу какую-то или поменять ресурсы, то вот прокидывание индикатора тут помогает.

Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.