Большинство современных приложений содержат в себе набор настроек по умолчанию, позволяющий обеспечить достаточно эффективную работу разворачиваемого приложения что называется «из коробки». Есть конечно критики данного подхода, но в целом он позволяет автоматизировать процесс установки и базовой настройки целевой системы.

Однако, при серьезном использовании любой системы рано или поздно возникает необходимость в гибкой настройке. А необходимость в отладке как правило возникает гораздо раньше. Apache Spark в этом плане не является исключением и в этой статье мы поговорим о механизмах настройки Spark и некоторых параметрах, которые пользователям может понадобиться отрегулировать под свои нужды. Также мы рассмотрим механизмы журналирования.

Класс SparkConf

Начнем рассмотрение вопросов настройки Spark с изменения конфигурации среды выполнения приложения. Основным механизмом настройки в Spark является класс SparkConf. При создании нового объекта SparkContext нам потребуется экземпляр SparkConf.

В примере ниже мы объявляем новый экземпляр SparkConf, c именем My Spark APP.

Использование параметра local[4] означает, что приложение Spark запускается в локальном режиме, а не в кластере, где находятся данные. Далее мы переопределяем порт по умолчанию на 36 000. И в завершении создаем SparkContext.

# Создать объект conf

conf = new SparkConf()
conf.set("spark.app.name", "Му Spark Арр")
conf.set("spark.master", "local[4]")
conf.set("spark.ui.port", "36000") # Переопределить порт по умолчанию

# Создать SparkContext с данной конфигурацией

sc = SparkContext(conf)

В целом, класс SparkConf устроен довольно просто, каждый экземпляр SparkConf состоит из пар ключ/значение, представляющих параметры конфигурации, которые пользователь может переопределить. Каждый параметр в Spark определяется строковым ключом и значением.

Чтобы воспользоваться объектом SparkConf, его сначала нужно создать, для этого с помощью метода set (} мы определяем параметры конфигурации, и затем передать этот объект в вызов конструктора SparkContext.

Стоит отметить, что в этом примере мы устанавливаем значения параметров SparkConf программно, в коде приложения, что может оказаться очень удобным при динамическом изменении конфигурации приложения.

Однако, если следовать лучшим практикам разработки приложений, например методологии 12 факторов, то нельзя жестко указывать конфигурационные параметры в коде самого приложения.

И здесь в Spark имеется инструмент spark‑submit. При запуске приложения с помощью сценария spark‑submit, данный скрипт внедряет в окружение параметры настройки по умолчанию, которые затем определяются и переписываются во вновь созданный объект SparkConf. Благодаря этому функционалу пользовательские приложения могут просто конструировать <<Пустые>> объекты SparkConf и передавать их непосредственно в вызов конструктора SparkContext.

При работе с данным конструктором также важно учитывать, что все решения по значениям параметров конфигурации должны быть приняты до создания объекта SparkContext.

В примере ниже при запуске spark‑submit мы используем универсальный флаг ‑conf, который может принимать любые конфигурационные значения Spark.

Также с помощью spark‑submit можно загружать конфигурационные параметры из файла. Использование файлов с готовыми конфигурациями позволяет упростить развертывание приложения в различных средах.

По умолчанию spark‑submit ищет настройки в conf/spark‑defaults.conf и пытается прочитать пары ключ/значение, разделенные пробельными символами. При необходимости можно указать иное расположение файла с помощью флага ‑properties‑file.

$ bin/spark-submit \

    --class com.example.MyApp \

    --properties-file my-conf.conf \

Предлагаемый файл с параметрами конфигурации будет иметь следующий вид:

spark.master local[4]
spark.app.name "Му Spark Арр"
spark.ui.port 36000

Веб интерфейс Spark

Удобным инструментом для выполнения отладки и исследования производительности приложения является веб интерфейс. По умолчанию он доступен на порту 4040 машины, на которой запущен драйвер Spark. Однако в кластерных средах этот порт может отличаться.

В зависимости от используемой версии Spark содержимое веб интерфейса может несколько отличаться, однако обычно на веб странице можно увидеть по нодам, запущенным и завершенным приложениям, потребляемых ресурсах и многое другое.

С помощью веб-интерфейса можно не только выявлять задачи, выполняющиеся медленно, но также и смотреть, сколько времени тратят задачи на каждом этапе своего жизненного цикла: чтение, вычисление и запись. Так, к примеру, если задача тратит мало времени на чтение/запись данных, но выполняется слишком долго, это может быть обусловлено неоптимальной работой программного кода. Хотя иногда ��ывают обратные ситуации, когда некоторые задачи тратят много времени на чтение данных из внешних источников, и в этом случае оптимизация кода не даст ощутимых результатов, потому что узким местом являются операции сохранения данных.

Логирование в Spark

Помимо информации, которую можно получить с помощью веб интерфейса, для решения проблем можно также воспользоваться журналами событий, которые заполняют непосредственно драйверы и исполнители. Журналы событий в силу в силу своей структуры содержат более полные следы аномальных событий, такие как внутренние предупреждения или сведения об исключениях в пользовательском коде. Эти данные могут помочь при решении проблем или устранении неожиданных аномалий в поведении.

В Spark журналы событий распологаются в различных местах в зависимости от режима развертывания. Так в режиме Standalone логи отображаются непосредственно в веб‑интерфейсе ведущего узла. Также по умолчанию события сохраняются в подкаталоге ~/work/ рабочего узла Spark. При использовании режима YARN проще всего получить доступ к журналам с помощью инструмента выборки информации из журналов с помощью:

yarn logs -applicationid <арр ID>

Данная команда возвращает отчет с журналами для указанного приложения.

Также журналы событий можно посмотреть с помощью веб интерфейса:

Что влияет на производительность?

Далее мы поговорим об общих проблемах производительности, с которыми можно столкнуться в приложениях, а также посмотрим, как можно настроить приложение с целью повышения его производительности.

Напомним, что RDD (Resilient Distributed Dataset) в Spark — это неизменяемая распределённая совокупность элементов данных. На логическом уровне набор RDD является единой коллекцией объектов и в процессе выполнения RDD делится на множество разделов, каждый из которых содержит подмножество всех данных.

Когда Spark планирует и выполняет задачи, для каждого раздела создается по одной задаче, и каждая задача будет выполняться по умолчанию на одном ядре. В большинстве случаев такой степени параллелизма вполне достаточно для быстрой обработки наборов RDD. Кроме того, параллелизм для исходных RDD обычно зависит от используемой системы хранения. Например, в HDFS исходные наборы RDD делятся на разделы по блокам файла HDFS. Для наборов, полученных в результате обработки других наборов, степень параллелизма определяется размерами родительских наборов RDD.

В зависимости от степени параллелизма мы можем по разному влиять на производительность. Так, если у нас недостаточно высокая степень параллелизма, некоторые ресурсы Spark могут простаивать в ожидании своей задачи. Например, если в распоряжение приложения передана 1000 ядер, а оно выполняет стадию, состоящую всего из 30 задач, можно было бы увеличить степень параллелизма и задействовать большее число ядер.

И, напротив, если степень параллелизма слишком высока, небольшие накладные расходы, связанные с выполнением каждой задачи, в сумме могут оказаться существенными. Признаком такой ситуации может служить почти мгновенное — в течение нескольких миллисекунд — выполнение задач или когда задачи не читают и не записывают данных.

Посмотрим способы настройки параллелизма. Во‑первых, мы можем передать степень параллелизма в виде параметра в операции, которые производят новые наборы RDD. Во‑вторых — любой имеющийся набор можно перераспределить между большим или меньшим числом разделов.

Мы можем перераспределить RDD случайным образом с помощью операторов repartition() или coalesce (). Если у вас сложилось мнение, что степень параллелизма слишком высокая или слишком низкая, попробуйте перераспределить свои данные с помощью этих операторов.

В качестве примера можно рассмотреть ситуацию, когда приложение читает большой объем данных из облака и сразу после этого выполняет операцию filter (), которая скорее всего исключит какую‑то часть из набора данных. По умолчанию набор RDD, возвращаемый функцией filter(), получит тот же размер, что и родительский, и может включать множество пустых или маленьких разделов. И здесь мы можем увеличить производительность приложения путем объединения маленьких разделов RDD.

Рассмотрим пример кода на Python, который выполняет необходимые действия.

>>> input = sc.textFile("\cloud_store\*.log")
>>> input.getNumPartitions()
35154
# Фильтр, исключающий почти все данные
>>> lines = input.filter(lamЬda line: line.startswith("2024-10-14"))
>>> lines.getNumPartitions()
35154
>>> lines = lines.coalesce(S) .cache()
>>> lines.getNumPartitions()
4
>>> lines.count()

В первых двух строках мы загружаем данные, далее осуществляем фильтрацию. Затем производим объединение строк в RDD перед кэшированием. Все последующие операции будут выполняться с объединенным набором RDD.

Заключение

В этой статье мы поговорили о том, как можно работать с настройками и логами в Apache Spark, а также о некоторых аспектах оптимизации работы с данными.

Кстати, Spark предлагает несколько API. Приглашаю вас на бесплатный вебинар, где рассмотрим чем они отличаются и когда какой стоит использовать. Зарегистрироваться.

Также на странице курса вы можете посмотреть записи прошедших вебинаров и зарегистрироваться на курс.