Недавно я в очередной раз услышал:
“CSV — это популярный формат хранения данных, имеющий встроенную поддержку в Apache Spark…”
Нууу, на счет “популярный” — согласен, “имеющий встроенную поддержку” — согласен, но на счет “хранения данных” — категорически не согласен. Подобные фразы могут не только сбить с толку окружающих, но и привести к значительным непродуктивным затратам времени (и памяти данных). Давайте разберемся.
В этой статье я расскажу о массовом чтении (то есть методе .load()
Apache Spark) в рамках Structured API и реализации pyspark
(сомневаюсь, что есть отличия в работе в биндингах Scala/Java).
Итак, давайте освежим в памяти несколько общих моментов:
spark использует «ленивые вычисления» (lazy evaluation), означающие, что трансформации (transformations) вычисляются только тогда, когда действие (action) требует, чтобы результат был возвращен управляющей программе.
.load() — это трансформация, поэтому не следует ожидать какой-либо значительной дисковой активности во время выполнения
.load()
(значительность является важной, далее мы разберем это чуть подробнее).
реальная загрузка данных (имеется в виду процесс чтения с диска) происходит, когда мы выполняем
.save()
или любое другое действие.
Пока все звучит достаточно безобидно, не так ли?
“…практика как критерий истины...”
Теперь посмотрим, что происходит на практике (для простоты я вставил фрагменты из Jupyter notebook)
ШАГ 1. Выполните простое трансформацию чтения CSV
ШАГ 2. Распечатайте полученную схему датафрейма с помощью .printSchema()
ШАГ 3. Сохраните датафрейм в файл (сейчас формат не имеет значения - пусть это снова будет CSV)
Пока ничего особенного не произошло. Файл маленький, и задержку трудно заметить на первом шаге (во время выполнения .load()
). Что ж, нам в помощь есть графический интерфейс Spark, и вот что показывает наше маленькое приложение:
Теперь начинается “магия”... У нас есть 3 джоба, что странно: ведь количество джобов обычно соответствует количеству действий, а у нас было одно действие (.save ()
), поэтому мы ожидаем, что у нас будет один джоб. Странно, не правда ли? Чтобы развеять магию, давайте снова пройдем шаг за шагом, и после каждого шага будем наблюдать за Spark GUI.
Выполните первый шаг повторно, и вот как будет выглядеть Spark GUI:
Теперь не может быть никаких сомнений, что появилось два новых джоба (с идентификаторами 3 и 4) без каких-либо действий в нашем коде, следовательно скорее всего это является результатом нашей трансформации .load()
. Давайте копнем глубже и посмотрим метрики дискового ввода-вывода этих джобов:
WEB UI Spark с информацией об этапах выполнения
Интересно — файл считывается уже три раза (размер файла 37KB), какую трансформацию мы осуществили.
Давайте продолжим с нашим пошаговым разбором, повторно выполнив printSchema()
: новых джобов в Spark WEB UI нет (что вполне естественно).
Повторно выполняем .save()
:
Как и предполагалось — еще один новый джоб: было выполнено действие, следовательно мы получили другой джоб. А как насчет метрик дискового ввода-вывода:
Ага, еще одно считывание - мы читали файл уже 4 раза, хорошо, не так ли? Такой простой “конвейер” (загрузка, сохранение) привел к неэффективному вводу-выводу с диска.
Как вам такой формат “хранения данных”? — Мне не очень.
Дело в CSV или, может быть, это у меня какие-то проблемы в конфигурации Spark? Давайте выясним — почему бы нам не повторить те же шаги для другого файлового формата, например возьмем ORC (также имеющий встроенную поддержку Apache Spark).
Я не буду перегружать вас скриншотами и расписывать шаги (шаги не меняются, разница лишь в том, что я загружаю те же данные из файла ORC) - однако результаты будут следующими:
нет джобов после трансформации
.load()
(шаг 1)
получаем схему и никаких джобов после выполнения
.printSchema()
(шаг 2)
всего один джоб и одно чтение файла после выполнения
.save()
(шаг 3)
Итак, мы выяснили, что предыдущее странное поведение связанно с CSV, и что формат ORC ведет себя так, как ожидалось.
И вот мы подошли к сути, которую я хотел выразить этой статьей:
Никогда не храните данные, которые вы планируете обрабатывать с помощью Apache Spark, в формате CSV. Можно использовать CSV в качестве транзитного формата (например, во время операций экспорта или импорта, где это может быть необходимо), для хранения используйте форматы файлов ORC или Parquet.
Я напишу еще одну статью о Parquet и ORC, а пока по своему опыту могу сказать, что они приблизительно на одном уровне (с точки зрения производительности и хранения).
Небольшой дисклеймер и пояснения
Опытные разработчики Spark уже увидели некоторые “несоответствия” в том, что я сказал выше. Позвольте мне немного объяснить, где я “скосил углы”, чтобы лучше выразить суть. Приведенные ниже пояснения не меняют сути дела, но они необходимы, чтобы укрепить “доверие” к сказанному выше.
Вывод схемы
Вывод схемы в Spark платный:
Ожидается, что при использовании вывода схемы (.option
(“inferSchema”, “true”)) данные файла будут пройдены единожды. Но почему чтение данных файла произошло дважды (см. скриншоты выше)?
И как Spark удается вывести схему из ORC файла без каких-либо джобов? Это пример “белой магии” в Spark.
Играет ли здесь какую-то роль вывод схемы? — Да, вы можете быть уверены, что вы, например, можете выполнить .sum()
на значениях столбцов в случае вывода схемы и CSV, и если данные беспорядочны, Spark будет по умолчанию использовать StringType
и не позволит вам суммировать их. Это важно, но очень дорого.
Я потратил дополнительное время и повторил шаги 1–3 для файла CSV без вывода схемы — файл читался всего дважды. Почему? — см. ниже.
Влияние размера файла
Мой файл был относительно небольшим (37KB), что объясняет, почему на шаге 1 (.load()
) он был прочитан один раз — здесь всего лишь один первый блок. Для больших файлов — и я их тоже протестировал — поведение то же самое: без вывода схемы .load()
создает джоб для чтения только первого блока данных файла.
Я предполагаю, что это необходимо, чтобы узнать количество столбцов в создаваемом .load()
датафрейме.
Конец исследования
Эти последние заметки завершают мое небольшое исследование о неэффективности CSV, следите за обновлениями — темы следующих статей: ввод и вывод — важная часть инфраструктуры Apache Spark и очень энергозатратная рабочая рутина любого датаинженера. Ввод/вывод надо делать правильно!
Материал подготовлен в рамках курса «Spark Developer».
Всех желающих приглашаем на открытый урок «Приземление данных с помощью Apache Flink». На вебинаре рассмотрим проблемы чтения и записи данных из Apache Kafka, познакомимся с Apache Flink и посмотрим на стенде, как можно эти проблемы решить.
>> РЕГИСТРАЦИЯ