Комментарии 9
которая влечет за собой полную перетасовку наших блоков. Это, например, соединение двух таблиц по ключу (JOIN),
Честно говоря, я вот не уверен, что JOIN всегда приводит к Shuffle. А точнее так — есть же вариант Broadcast JOIN, когда более мелкий датасет просто рассылается всем кому нужно, и в моем понимании это не совсем Shuffle, хотя и похожий процесс.
Вы совершено правы насчет BroadcastHashJoin. Он не приводит к shuffle, а следовательно, не приводит к началу нового этапа и является по сути еще одной задачей внутри этапа. Для цели статьи рассматриваем обычный SortMergeJoin, который как раз будет работать без подсказки оптимизатору о broadcast join и без установки порога размера набора данных, при котором broadcast будет использоваться автоматически.
>Некоторые моменты достаточно подробно описаны на Хабр, а некоторые – нет.
Некоторые моменты если и описаны — то переводчиками в корпоративных блогах, где автор перевода далеко не всегда практически пробовал хоть что-то. Я лично с удовольствием почитаю что-то еще на такую же тему.
афигенно. спасибо!!!
а есть какой-нибудь формальный подход для определения оптимального размера партиции (например, чтобы его в алгоритмы мониторинга заложить)?
точнее сначала, как поймать момент, что пора подумать о репартицировании, а потом определить оптимальный размер.
Тут скорее все определяется логикой работы с конкретными данными, и возможностями кластера. По крайней мере какой-то обобщенный рецепт мне не известен.
Поэому все рецепты похожего рода, которые находятся на просторах интернета (например, выбор размеров памяти для executor и их количества) — они сводятся к анализу статических параметров кластера, т.е. сколько ядер и памяти у узлов сети, и сколько самих узлов, и при этом вообще не учитывают, что кластер может быть чем-то сейчас занят, кроме нашего приложения. И поэтому все такие методы сильно приблизительные (а попросту говоря — врут).
В реальности же кластер как правило всегда занят, иногда сильно. Более того, оптимизировать обычно надо не только одно наше приложение, а весь набор задач на кластере, чтобы они все укладывались в свои TLA, а не только некоторые из них, которым повезет.
"...но новый этап не начнется пока этот процесс не пройдут все блоки в конце предыдущего этапа."
"Здесь и далее для простоты картины будем рассматривать блоки только одного набора данных. В каждый момент времени исполнители могут обрабатывать несколько несвязанных друг с другом этапов."
Тут есть некое противоречие или я не так понимаю ?
Добрый день.
В первой фразе мы говорим о связанных этапах, во второй о несвязанных.
Пример: мы готовим два датасета не связанные друг с другом а потом делаем с ними join. Подготовка каждого из них может быть разделена на несколько этапов в каждом из них могут быть группировки, свои join и т.д. И т.к. подготовка каждого датасета не связана с другим, то они могут идти параллельно. Но в конце концов они встретятся на join и вот тогда этап join будет ждать и подготовку первого и подготовку второго датасета. На рисунке 4 этап (join) ждет когда завершатся 1+3 (это последовательные этапы подготовки первого датасета) и 2 этап (один этап подготовки второго датасета). Последовательно идут этапы 1 поток 3, а этап 2, как не связанный с ними, идет параллельно. Но на 4 этапе они встречаются.
Apache Spark: оптимизация производительности на реальных примерах