Pull to refresh

Comments 9

которая влечет за собой полную перетасовку наших блоков. Это, например, соединение двух таблиц по ключу (JOIN),

Честно говоря, я вот не уверен, что JOIN всегда приводит к Shuffle. А точнее так — есть же вариант Broadcast JOIN, когда более мелкий датасет просто рассылается всем кому нужно, и в моем понимании это не совсем Shuffle, хотя и похожий процесс.

Вы совершено правы насчет BroadcastHashJoin. Он не приводит к shuffle, а следовательно, не приводит к началу нового этапа и является по сути еще одной задачей внутри этапа. Для цели статьи рассматриваем обычный SortMergeJoin, который как раз будет работать без подсказки оптимизатору о broadcast join и без установки порога размера набора данных, при котором broadcast будет использоваться автоматически.

На самом деле это было мелкое уточнение. Тест все равно хороший, на мой взгляд видно практический опыт применения всего вот этого.

>Некоторые моменты достаточно подробно описаны на Хабр, а некоторые – нет.
Некоторые моменты если и описаны — то переводчиками в корпоративных блогах, где автор перевода далеко не всегда практически пробовал хоть что-то. Я лично с удовольствием почитаю что-то еще на такую же тему.

афигенно. спасибо!!!

а есть какой-нибудь формальный подход для определения оптимального размера партиции (например, чтобы его в алгоритмы мониторинга заложить)?

точнее сначала, как поймать момент, что пора подумать о репартицировании, а потом определить оптимальный размер.

Тут скорее все определяется логикой работы с конкретными данными, и возможностями кластера. По крайней мере какой-то обобщенный рецепт мне не известен.

Причем заметьте — возможности кластера не вообще, а в конкретно данный момент. Потому что кластер вообще может скажем иметь 10000 ядер, но в данный момент вам доступно в десять раз меньше, как потому что размеры вашей очереди Yarn ограничены, так и потому, что большинство ядер сейчас занято другими приложениями. Ну и с памятью все тоже самое, само собой.

Поэому все рецепты похожего рода, которые находятся на просторах интернета (например, выбор размеров памяти для executor и их количества) — они сводятся к анализу статических параметров кластера, т.е. сколько ядер и памяти у узлов сети, и сколько самих узлов, и при этом вообще не учитывают, что кластер может быть чем-то сейчас занят, кроме нашего приложения. И поэтому все такие методы сильно приблизительные (а попросту говоря — врут).

В реальности же кластер как правило всегда занят, иногда сильно. Более того, оптимизировать обычно надо не только одно наше приложение, а весь набор задач на кластере, чтобы они все укладывались в свои TLA, а не только некоторые из них, которым повезет.
  • "...но новый этап не начнется пока этот процесс не пройдут все блоки в конце предыдущего этапа."

  • "Здесь и далее для простоты картины будем рассматривать блоки только одного набора данных. В каждый момент времени исполнители могут обрабатывать несколько несвязанных друг с другом этапов."

Тут есть некое противоречие или я не так понимаю ?

Добрый день.

В первой фразе мы говорим о связанных этапах, во второй о несвязанных.

Пример: мы готовим два датасета не связанные друг с другом а потом делаем с ними join. Подготовка каждого из них может быть разделена на несколько этапов в каждом из них могут быть группировки, свои join и т.д. И т.к. подготовка каждого датасета не связана с другим, то они могут идти параллельно. Но в конце концов они встретятся на join и вот тогда этап join будет ждать и подготовку первого и подготовку второго датасета. На рисунке 4 этап (join) ждет когда завершатся 1+3 (это последовательные этапы подготовки первого датасета) и 2 этап (один этап подготовки второго датасета). Последовательно идут этапы 1 поток 3, а этап 2, как не связанный с ними, идет параллельно. Но на 4 этапе они встречаются.

Да, это понятно. Смутило вот это предложение: "Здесь и далее для простоты картины будем рассматривать блоки только одного набора данных."
Спасибо за разъяснения и за статью!

Sign up to leave a comment.