Pull to refresh

Apache Spark Catalyst: секреты оптимизатора запросов, который должен знать каждый Data Engineer

Level of difficultyHard
Reading time17 min
Views1.7K

Меня зовут Кучеров Андрей и я Lead Data Engineer с более чем 7-летним опытом в области BigData. Я работал над оптимизацией высоконагруженных Spark-приложений в X5 Retail Group, Beeline, RaiffeisenBank, где мы обрабатывали петабайтные объемы данных. Регулярно сталкиваясь с производительностью запросов, я убедился, что понимание работы Catalyst — необходимый навык для каждого Data Engineer, работающего со Spark.

Введение: почему Catalyst так важен?

Представьте, что вы написали сложный SQL-запрос с пятью JOIN-операциями и десятками фильтров. Как Apache Spark решает, в каком порядке выполнять эти операции? Какие таблицы первыми соединять? Какие фильтры применять раньше? Всеми этими решениями управляет Catalyst — мозговой центр Apache Spark.

Для начинающего Data Engineer понимание Catalyst критически важно по трем причинам:

  1. Отладка запросов: Когда ваш запрос выполняется несколько часов вместо минут, понимание логики Catalyst позволит найти причину.

  2. Оптимизация производительности: Зная, как работает оптимизатор, вы сможете писать более эффективный код.

  3. Понимание ограничений: Spark не всесилен — в некоторых случаях вам придется помогать Catalyst делать правильный выбор.

В этой статье мы разберем внутреннее устройство Catalyst и покажем, как использовать его возможности для эффективной обработки данных.

Что такое Catalyst?

Catalyst — это оптимизатор запросов в Apache Spark, который преобразует высокоуровневые операции (SQL, DataFrame API) в эффективные планы выполнения. Он появился в Spark SQL и стал ключевым компонентом, обеспечивающим производительность всей экосистемы Spark.

В отличие от классических оптимизаторов SQL, Catalyst:

  • Работает с разными источниками данных (Parquet, CSV, JDBC и т.д.)

  • Адаптируется к распределенной среде выполнения

  • Использует языковые особенности Scala для расширяемости

Начинающему разработчику важно понимать: Catalyst автоматизирует оптимизацию, но не может предсказать все особенности ваших данных. Вы должны быть дополнительным слоем интеллекта, который направляет оптимизатор в сложных ситуациях.

Архитектура Catalyst: ключевые компоненты

Catalyst организован как конвейер из четырех последовательных этапов, каждый из которых вносит свой вклад в оптимизацию запросов.

Последовательность операций Catalyst
Последовательность операций Catalyst

1. Анализ и построение логического плана

На этапе анализа Catalyst преобразует SQL-запрос или операции DataFrame в абстрактное синтаксическое дерево (AST), где каждый узел представляет операцию (SELECT, JOIN, WHERE) или выражение.

Например, для запроса:

SELECT name FROM users WHERE age > 30

Сначала создается неразрешенный логический план:

Project ['name]
+- Filter ['age > 30]
   +- UnresolvedRelation [users]

Затем Catalyst применяет разрешение ссылок:

  1. Проверяет существование таблицы users в каталоге

  2. Находит колонки name и age в схеме таблицы

  3. Определяет типы данных для выражений

После этого создается разрешенный логический план:

Project [name#12]
+- Filter (age#13 > 30)
   +- Relation[name#12, age#13, ...] parquet

Для Data Engineer важно понимать: именно на этом этапе возникают ошибки типа AnalysisException, которые указывают на проблемы с существованием таблиц или колонок.

Преобразование запроса в дерево AST
Преобразование запроса в дерево AST

2. Логическая оптимизация: правила (RBO)

На этапе логической оптимизации Catalyst применяет более 50 встроенных правил для переписывания плана запроса. Эти правила основаны на математических свойствах реляционной алгебры.

Основные категории правил:

Упрощение выражений

Например, правило Constant Folding вычисляет выражения с константами:

Filter (age > 25 + 5) → Filter (age > 30)

Перенос фильтров

Правило Predicate Pushdown перемещает фильтры ближе к источникам данных:

// До оптимизации
Project [name]
+- Filter (country = 'US')
   +- Join (users.id = orders.user_id)
      +- Relation [users]
      +- Relation [orders]

// После оптимизации
Project [name]
+- Join (users.id = orders.user_id)
   +- Filter (country = 'US')
      +- Relation [users]
   +- Relation [orders]

Это критически важная оптимизация: фильтры, примененные до соединений, могут радикально сократить объем обрабатываемых данных.

Удаление избыточных операций

Правило ProjectionElimination убирает ненужные проекции:

Project (columns) (Project (same_columns) (child)) → Project (columns) (child)

Для Data Engineer: логическая оптимизация — это преобразование математических выражений без учета особенностей физического хранения данных. Она применяет общие правила, которые всегда приводят к более эффективному плану.

Пример кода Catalyst, реализующего правило:

// Правило для объединения последовательных фильтров
object CombineFilters extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case Filter(cond1, Filter(cond2, child)) =>
      Filter(And(cond1, cond2), child)
  }
}

3. Физическое планирование и стоимостная оптимизация (CBO)

После логической оптимизации Catalyst должен выбрать конкретные алгоритмы выполнения для каждой операции. Например, для JOIN у него есть несколько вариантов:

  • BroadcastHashJoin: копирует маленькую таблицу на все узлы (быстрый, но требует RAM = объем копируемого DataFrame на каждом executor и driver)

  • SortMergeJoin: сортирует обе таблицы и объединяет (работает для больших таблиц)

  • ShuffleHashJoin: создает хеш-таблицы для соединения (промежуточный вариант)

Здесь Catalyst использует Cost-Based Optimizer (CBO) — компонент, который оценивает стоимость различных планов и выбирает оптимальный. CBO учитывает:

  1. Статистику данных (количество строк, размеры таблиц)

  2. Кардинальность колонок (число уникальных значений)

  3. Распределение данных (через гистограммы)

Для эффективной работы CBO требуется собирать статистику:

-- Сбор общей статистики таблицы
ANALYZE TABLE orders COMPUTE STATISTICS;

-- Сбор статистики по колонкам
ANALYZE TABLE orders COMPUTE STATISTICS FOR COLUMNS order_id, customer_id;

Для Data Engineer критично понимать: без статистики CBO делает решения на основе приблизительных оценок, что может приводить к неоптимальным планам.

Физический план можно увидеть через метод explain():

df.explain()
// Результат: физический план с выбранными алгоритмами

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[title#18], functions=[sum(cast(label#17 as double))])
   +- Exchange hashpartitioning(title#18, 200), ENSURE_REQUIREMENTS, [plan_id=120]
      +- HashAggregate(keys=[title#18], functions=[partial_sum(cast(label#17 as double))])
         +- Filter (isnotnull(label#17) AND (cast(label#17 as int) = 1))
            +- FileScan csv [label#17,title#18] Batched: false, DataFilters: [isnotnull(label#17), (cast(label#17 as int) = 1)], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/seidzi/habr/csv/part-00000-2a2f079d-0edc-43aa-aec2-e3b10e69..., PartitionFilters: [], PushedFilters: [IsNotNull(label)], ReadSchema: struct<label:string,title:string>

4. Генерация кода: движок Tungsten

На последнем этапе Catalyst превращает физический план в исполняемый код с помощью движка Tungsten. Этот движок генерирует оптимизированный байт-код Java для критичных операций, минуя стандартные абстракции Spark.

Например, для фильтра age > 30 Catalyst генерирует примерно такой код:

public boolean filter(InternalRow row) {
  return row.getInt(1) > 30; // Доступ напрямую к колонке age
}

Tungsten обеспечивает:

  1. Векторизованную обработку: обработка данных блоками, а не по одной строке

  2. Управление памятью: эффективное размещение данных с минимизацией сборки мусора

  3. SIMD-оптимизации: использование специальных инструкций процессора

Для Data Engineer важно знать: этот этап в значительной степени автоматизирован, но понимание его работы поможет при отладке производительности.

Факторы, влияющие на эффективность Catalyst

Чтобы эффективно работать с Catalyst, нужно понимать, какие факторы влияют на его решения и оптимизации. Рассмотрим ключевые из них.

1. Тип и формат данных

Колоночные форматы (Parquet, ORC)

Эти форматы позволяют Catalyst применять эффективные оптимизации:

  • Предикатный пушдаун: фильтрация на уровне блоков данных

  • Проекция колонок: чтение только нужных столбцов

  • Векторизация: обработка данных блоками

Пример (ниже csv):

parquet_df.where('label=1').groupby('title').agg({"label": "sum"}).count() # 168190

# CPU times: user 1.67 ms, sys: 7.3 ms, total: 8.97 ms
# Wall time: 1.22 s

Строчные форматы (CSV, JSON)

Для этих форматов оптимизации ограничены:

  • CSV требует чтения и парсинга всего файла

  • JSON поддерживает частичное чтение колонок, но медленнее колоночных форматов

csv_df.where('label=1').groupby('title').agg({"label": "sum"}).count() # 168190

# CPU times: user 4.7 ms, sys: 3.09 ms, total: 7.78 ms
# Wall time: 2.05 s

Для Data Engineer прямая рекомендация: всегда конвертируйте исходные данные в Parquet для аналитических задач. Производительность значительно увеличится.

2. Размер и количество файлов

Catalyst управляет распределением задач на основе файлов и партиций:

Проблема мелких файлов

Если у вас 10,000 файлов по 1 МБ:

  • Каждый файл обрабатывается отдельной задачей

  • Накладные расходы на планирование задач составят до 40% времени

  • Неравномерная загрузка исполнителей

Оптимальный размер 128-256 МБ на файл. Если в качестве системы хранения используется HDFS, то размер файла должен быть приблизительно равен размеру блока.

Для Data Engineer важно помнить: Spark предназначен для больших файлов, а не для большого количества маленьких файлов.

3. Типы данных в JOIN-ключах

Когда вы соединяете таблицы с разными типами ключей, Catalyst вынужден добавлять дополнительные преобразования.

Это приводит к:

  • Неявным операциям CAST в плане

  • Невозможности использования некоторых оптимизаций (например, Broadcast Join)

  • Замедлению до 40% по сравнению с соединением по одинаковым типам

Всегда проверяйте план через explain() на наличие операций cast:

SortMergeJoin [cast(customer_id_int#10 as string)], [customer_id_string#20]

Для Data Engineer: используйте одинаковые типы для JOIN-ключей во всех таблицах. Если нужно преобразование, делайте его явно:

val df1Fixed = df1.withColumn("customer_id_string", $"customer_id_int".cast("string"))

4. Статистика и CBO

CBO в Catalyst работает эффективно только при наличии точной статистики о данных.

Почему статистика критична

Рассмотрим запрос:

SELECT * FROM orders o
JOIN customers c ON o.customer_id = c.id
WHERE c.country = 'US'

Без статистики Catalyst не знает:

  • Сколько строк вернет фильтр country = 'US'

  • Стоит ли использовать Broadcast для таблицы customers

  • Какой порядок JOIN будет оптимальным

Как собирать статистику?

Для таблиц в каталоге:

-- Базовая статистика (размер, число строк)
ANALYZE TABLE customers COMPUTE STATISTICS;

-- Детальная статистика по колонкам
ANALYZE TABLE customers COMPUTE STATISTICS FOR COLUMNS id, country;

Для DataFrames (временные таблицы):

// Регистрация временной таблицы со статистикой
df.createOrReplaceTempView("customers_temp")
spark.sql("ANALYZE TABLE customers_temp COMPUTE STATISTICS FOR ALL COLUMNS")

Включение CBO

По умолчанию Catalyst использует только базовые возможности CBO. Для полной оптимизации:

spark.conf.set("spark.sql.cbo.enabled", true)
spark.conf.set("spark.sql.cbo.joinReorder.enabled", true)

Для Data Engineer: отсутствие статистики — одна из главных причин неоптимальных планов. Всегда проверяйте её наличие для критичных таблиц.

5. Распределение данных (Skew)

Перекос данных
Перекос данных

Одним из самых сложных испытаний для Catalyst является перекос данных (skew) — ситуация, когда некоторые значения ключей встречаются гораздо чаще других.

Например, в социальной сети у 10% пользователей может быть 90% всех подписчиков. При выполнении JOIN это приведет к проблемам:

Проблемы skew-данных

  1. Неравномерная загрузка исполнителей: некоторые задачи будут выполняться в 10-100 раз дольше других

  2. Риск OOM: узел может исчерпать память при обработке слишком большой партиции

  3. Общее замедление: время выполнения ограничено самой медленной задачей

Методы борьбы с перекосом

Есть несколько подходов к решению проблемы:

  1. Адаптивное выполнение запросов (AQE) (с Spark 3.0):

// Включение AQE и обработки перекоса
spark.conf.set("spark.sql.adaptive.enabled", true)
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", true)

2. Техника "соли" (salting) — добавление случайного префикса к ключам:

// Добавление случайного префикса к ключу с перекосом
val saltedDF = df.withColumn("salt", (rand() * 10).cast("int"))
  .withColumn("salted_key", concat($"skewed_key", lit("_"), $"salt"))
  
// JOIN по "соленому" ключу
saltedDF.join(otherDF, "salted_key")

Важно учитывать что "техника соли" не дает 100% попадание ключей, и необходима дополнительная агрегация.

Для Data Engineer: перекос данных — самая коварная проблема для Spark, так как её часто сложно выявить до момента выполнения. Используйте мониторинг задач в Spark UI для выявления проблемных партиций.

Как заметить перекос?

Достаточно зайти в stage где происходит Join, и посмотреть на метрики по стейджу, в нормальном варианте min, avg и max будут приблизительно равны, однако в случае с перекосом(как на скрине выше) видим что max сильно выше чем остальные персентили.

Практический пример: полный цикл оптимизации

Рассмотрим конкретный пример, как Catalyst преобразует запрос от SQL до исполняемого кода.

Пусть у нас есть запрос:

SELECT o.order_id, c.name, SUM(o.amount) AS total_amount
FROM orders o
JOIN customers c ON o.customer_id = c.id
WHERE c.country = 'US' AND o.date BETWEEN '2023-01-01' AND '2023-12-31'
GROUP BY o.order_id, c.name;

Шаг 1: Неразрешенный логический план

Catalyst строит первоначальное дерево:

Aggregate ['o.order_id, 'c.name], ['o.order_id, 'c.name, sum('o.amount) AS total_amount]
+- Filter ('c.country = 'US' AND 'o.date BETWEEN '2023-01-01' AND '2023-12-31')
   +- Join Inner, ('o.customer_id = 'c.id)
      :- SubqueryAlias o
      :  +- UnresolvedRelation [orders]
      +- SubqueryAlias c
         +- UnresolvedRelation [customers]

Между 1 и 2 шагом Catalist должен посетить метасторидж, что бы собрать статистику по таблицам(если она есть), проверить существование таблиц, колонок, партиций, типа колонок и т.д.

Шаг 2: Разрешенный логический план

После разрешения ссылок план становится:

Aggregate [order_id#10, name#21], [order_id#10, name#21, sum(amount#12) AS total_amount]
+- Filter ((country#22 = US) AND (date#13 >= 2023-01-01) AND (date#13 <= 2023-12-31))
   +- Join Inner, (customer_id#11 = id#20)
      :- SubqueryAlias o
      :  +- Relation[order_id#10,customer_id#11,amount#12,date#13] parquet
      +- SubqueryAlias c
         +- Relation[id#20,name#21,country#22] parquet

Теперь каждая колонка имеет уникальный идентификатор (#10, #11 и т.д.) и конкретный тип.

Шаг 3: Оптимизированный логический план

Catalyst применяет главные правила оптимизации:

  1. Predicate Pushdown: перенос фильтров к источникам данных

  2. Column Pruning: удаление неиспользуемых колонок

Так же в Rule-Based System существуют более 40 правил, которые так же по возможности будут применяться.

Aggregate [order_id#10, name#21], [order_id#10, name#21, sum(amount#12) AS total_amount]
+- Join Inner, (customer_id#11 = id#20)
   :- Filter (date#13 >= 2023-01-01 AND date#13 <= 2023-12-31)
   :  +- Relation[order_id#10,customer_id#11,amount#12,date#13] parquet
   +- Filter (country#22 = US)
      +- Relation[id#20,name#21,country#22] parquet

Заметьте, что фильтры теперь находятся сразу над источниками данных, что позволяет сократить объем данных до JOIN.

Шаг 4: Физический план

CBO оценивает разные стратегии и выбирает оптимальную:

== Physical Plan ==
HashAggregate(keys=[order_id#10, name#21], functions=[sum(amount#12)])
+- Exchange hashpartitioning(order_id#10, name#21, 200)
   +- HashAggregate(keys=[order_id#10, name#21], functions=[partial_sum(amount#12)])
      +- BroadcastHashJoin [customer_id#11], [id#20], Inner, BuildRight
         :- Filter (date#13 >= 2023-01-01 AND date#13 <= 2023-12-31)
         :  +- FileScan parquet [order_id#10,customer_id#11,amount#12,date#13]
         +- BroadcastExchange HashedRelationBroadcastMode
            +- Filter (country#22 = US)
               +- FileScan parquet [id#20,name#21,country#22]

Ключевые решения в плане:

  1. BroadcastHashJoin: CBO, используя статистику таблицы, решил, что таблица customers после фильтра country='US' будет достаточно маленькой для рассылки на все узлы

  2. HashAggregate: используется для эффективного подсчета суммы

  3. Exchange: обмен данными между узлами для распределенной агрегации

Шаг 5: Генерация кода

Для критических операций Tungsten генерирует JVM-байткод:

// Сгенерированный код для фильтра по дате
public boolean filter(InternalRow row) {
  long date = row.getLong(3);
  return date >= 20230101L && date <= 20231231L;
}

// Сгенерированный код для агрегации
public void update(InternalRow row) {
  double amount = row.getDouble(2);
  sum += amount;
}

Шаг 6: Наблюдение производительности

В Spark UI можно наблюдать:

  1. Время, затраченное на каждый этап

  2. Количество обработанных данных

  3. Наличие перекоса

  4. Эффективность сериализации/десериализации

Инструменты для управления и диагностики Catalyst

Для успешной работы с Catalyst Data Engineer должен владеть следующими инструментами:

1. Анализ планов выполнения

// Базовый план
df.explain()

// Расширенная информация с логическими и физическими планами
df.explain("extended")

// Детальная информация с метриками выполнения
df.explain("formatted")

2. Настройки оптимизатора

Параметр

Описание

Рекомендуемое значение

Сценарий использования

spark.sql.adaptive.enabled

Включает адаптивную оптимизацию запросов (AQE).

true

При работе с большими данными и перекошенными распределениями.

spark.sql.adaptive.coalescePartitions.enabled

Включает объединение партиций в процессе выполнения.

true

При выполнении операций с большими DataFrame.

spark.sql.adaptive.skewJoin.enabled

Включает оптимизацию перекошенных соединений.

true

При наличии значительного перекоса в данных.

spark.sql.execution.arrow.pyspark.enabled

Включает использование Apache Arrow для ускорения передачи данных между JVM и Python.

true

Для повышения производительности при использовании PySpark.

spark.sql.shuffle.partitions

Количество партиций для операций shuffle.

200 (можно настраивать в зависимости от объема данных)

Для оптимизации работы с большими наборами данных.

spark.sql.autoBroadcastJoinThreshold

Максимальный размер таблицы для использования Broadcast Join.

10MB (можно настраивать)

Для ускорения соединений между маленькими и большими таблицами.

spark.sql.parquet.enableVectorizedReader

Включает векторизованное чтение для формата Parquet.

true

Для повышения производительности чтения данных из Parquet.

spark.sql.orc.filterPushdown

Включает фильтрацию на уровне источника данных для формата ORC.

true

Для уменьшения объема обрабатываемых данных при чтении.

3. SQL-хинты для ручной оптимизации

Когда автоматические оптимизации недостаточны, можно использовать хинты для явного указания стратегий:

-- Принудительный broadcast таблицы customers
SELECT /*+ BROADCAST(c) */ *
FROM orders o JOIN customers c ON o.customer_id = c.id

-- Принудительное использование sort-merge join
SELECT /*+ MERGE(o, c) */ *
FROM orders o JOIN customers c ON o.customer_id = c.id

-- Задание порядка соединения таблиц
SELECT /*+ LEADING(o, c, p) */ *
FROM orders o
JOIN customers c ON o.customer_id = c.id
JOIN products p ON o.product_id = p.id

4. Spark UI для мониторинга

Ключевые разделы Spark UI для анализа работы Catalyst:

  1. Stages: Показывает детали выполнения каждого этапа

  2. Storage: Данные о кэшировании и использовании памяти

  3. Executors: Информация об использовании ресурсов исполнителями

  4. SQL: Отображает выполненные запросы и их планы

Что нового в оптимизации PySpark с выходом Spark 3+

Apache Spark 3.0 и последующие версии принесли революционные улучшения в оптимизации запросов, которые особенно заметны при использовании PySpark. Эти изменения делают Python-интерфейс Spark значительно более производительным.

1. Адаптивное выполнение запросов (AQE)

Adaptive Query Execution (AQE) — это настоящий прорыв в Spark 3.0, позволяющий оптимизировать планы выполнения во время работы, а не только на этапе компиляции.

Ключевые возможности AQE:

  1. Динамическое объединение партиций:

    # До Spark 3.0: много маленьких партиций
    # С размером партиций 10 МБ и 1000 партициями
    # После aggregate многие партиции < 1 МБ
    
    # После Spark 3.0: автоматическое объединение партиций
    spark.conf.set("spark.sql.adaptive.enabled", True)
    spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", True)
    spark.conf.set("spark.sql.adaptive.coalescePartitions.minPartitionSize", "64m")
    
  2. Динамическое переключение стратегии соединения:

    • AQE может в процессе выполнения заменить SortMergeJoin на BroadcastHashJoin, если во время выполнения обнаружит, что таблица после фильтрации стала достаточно маленькой.

    # Этот запрос автоматически оптимизируется с AQE
    df1.join(df2, "key").filter(f.col("value") > 1000).show()
    
    # Без AQE: SortMergeJoin (даже если df2 после фильтра < 10 МБ)
    # С AQE: Динамическое переключение на BroadcastHashJoin
    
  3. Оптимизация скошенных соединений:

    • Автоматическое обнаружение и разделение перекошенных партиций

    # Включение обработки перекоса
    spark.conf.set("spark.sql.adaptive.skewJoin.enabled", True)
    spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", 5)
    spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256m")
    

2. Улучшенная производительность Pandas UDF

Pandas UDF (User Defined Functions) получили значительные улучшения производительности и удобства использования в Spark 3.x:

Новые возможности:

  1. Улучшенная сериализация данных:

    • Использование Arrow для эффективной передачи данных между JVM и Python

    • Сокращение накладных расходов на сериализацию на 40-60%

  2. Типы PandasUDF с декораторами:

    # Spark 2.4: использовали константы для определения типа
    @pandas_udf(returnType=DoubleType(), functionType=PandasUDFType.SCALAR)
    
    # Spark 3.0+: чистые и понятные декораторы
    @pandas_udf(DoubleType())  # По умолчанию SCALAR
    def multiply(s: pd.Series) -> pd.Series:
        return s * 2
    
  3. Поддержка итераторов для обработки больших батчей:

    # Обработка больших датафреймов без OOM
    @pandas_udf(DoubleType())
    def process_batch(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
        for batch in batch_iter:
            yield batch * 2
    
  4. Расширенные типы возвращаемых значений:

    • Поддержка структурных и вложенных типов

    • Возможность возвращать и принимать DataFrames целиком

Бенчмарк производительности Pandas UDF:

Скрытый текст
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr, pandas_udf, udf
from pyspark.sql.types import StringType
import timeit
import pandas as pd
import uuid
import matplotlib.pyplot as plt

# Конфигурация Spark
def create_spark_session():
    return SparkSession.builder \
        .appName("UUID Benchmark") \
        .master("local[*]") \
        .config("spark.driver.memory", "8g") \
        .config("spark.executor.memory", "8g") \
        .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
        .config("spark.sql.shuffle.partitions", "8") \
        .getOrCreate()

spark = create_spark_session()

# Генератор тестовых данных
def generate_data(num_records):
    return spark.range(num_records).cache()

# Функции генерации UUID
def uuid4_str():
    return str(uuid.uuid4())

@udf(StringType())
def python_udf(_):
    return uuid4_str()

@pandas_udf(StringType())
def pandas_vectorized_udf(_: pd.Series) -> pd.Series:
    return pd.Series([uuid4_str() for _ in range(len(_))])

# Тестовые методы
def test_native(df):
    return df.withColumn("uuid", expr("uuid()"))  # Встроенная функция Spark

def test_udf(df, udf_func):
    return df.withColumn("uuid", udf_func(col("id")))

# Параметры тестирования
sizes = [100_000, 200_000, 300_000, 400_000, 500_000, 600_000, 700_000, 800_000, 900_000, 1_000_000]
trials = 50
results = []

# Прогрев JVM
generate_data(10).transform(test_native).count()

for size in sizes:
    print(f"\n=== Тестирование на {size:,} записях ===")
    df = generate_data(size)
    
    # Нативная функция Spark
    native_time = timeit.timeit(
        lambda: test_native(df).count(),
        number=trials
    ) / trials
    
    # Обычная UDF
    udf_time = timeit.timeit(
        lambda: test_udf(df, python_udf).count(),
        number=trials
    ) / trials
    
    # Pandas UDF
    pandas_udf_time = timeit.timeit(
        lambda: test_udf(df, pandas_vectorized_udf).count(),
        number=trials
    ) / trials
    
    results.append((size, native_time, udf_time, pandas_udf_time))
    df.unpersist()

# Вывод результатов
print("\n=== Результаты генерации UUID ===")
print(f"{'Размер':<12} {'Spark UUID()':<12} {'Python UDF':<12} {'Pandas UDF':<12} {'UDF/Spark':<12} {'Pandas/Spark':<12}")
for size, nt, ut, pt in results:
    print(f"{size:<12,} {nt:.6f}   {ut:.6f}   {pt:.6f}    {ut/nt:>8.1f}x    {pt/nt:>8.1f}x")

# Визуализация
df_result = pd.DataFrame(results, columns=['Size', 'Spark Native', 'Python UDF', 'Pandas UDF'])
df_result.set_index('Size').plot(
    kind='bar',
    figsize=(12, 6),
    logy=True,
    title='Сравнение методов генерации UUID'
)
plt.ylabel('Время (секунды, log scale)')
plt.xlabel('Количество записей')
plt.xticks(rotation=0)
plt.tight_layout()
plt.savefig('uuid_benchmark.png', dpi=300)
plt.show()

spark.stop()

Оптимизация запросов: пошаговое руководство

Шаг 1: Анализ исходного запроса

Начните с анализа плана выполнения:

df.explain(true)  // Показывает все планы: неразрешенный, логический, оптимизированный, физический

Ищите:

  • Неоптимальные JOIN-стратегии (SortMergeJoin для маленьких таблиц)

  • Большое количество Exchange-операций (Shuffle)

  • Отсутствие Predicate Pushdown

  • Операции CAST в плане

Шаг 2: Оптимизация данных

  1. Конвертация в Parquet (если данные в CSV/JSON):

df.write.parquet("optimized_data")
  1. Партицирование по ключевым фильтрам:

df.write.partitionBy("date", "country").parquet("partitioned_data")
  1. Сбор статистики:

ANALYZE TABLE orders COMPUTE STATISTICS FOR COLUMNS customer_id, amount;

Шаг 3: Настройка оптимизатора

// Включение всех оптимизаций Catalyst
spark.conf.set("spark.sql.cbo.enabled", true)
spark.conf.set("spark.sql.cbo.joinReorder.enabled", true)
spark.conf.set("spark.sql.adaptive.enabled", true)
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", true)

Шаг 4: Оптимизация JOIN-операций

  1. Выбор правильного порядка JOIN:

    • Маленькие таблицы присоединяйте первыми

    • Используйте JOIN-хинты для проблемных случаев

  2. Broadcast для маленьких таблиц:

// Явный broadcast
import org.apache.spark.sql.functions.broadcast
df1.join(broadcast(df2), "key")
  1. Согласование типов ключей:

// Явное приведение типов
val df2WithIntKey = df2.withColumn("key_int", $"key".cast("int"))
df1.join(df2WithIntKey, df1("key") === df2WithIntKey("key_int"))

Шаг 5: Борьба с перекосом данных

  1. Мониторинг перекоса:

    • Проверьте Spark UI для выявления "задач-долгожителей"

    • Найдите ключи с неравномерным распределением

  2. Применение техники "соли"

Шаг 6: Финальные оптимизации

  1. Кэшируйте промежуточные результаты:

val intermediateDF = df.filter(complexCondition).cache()

2. Настройте количество партиций:

// Для небольших данных уменьшайте количество партиций
spark.conf.set("spark.sql.shuffle.partitions", 50)

// Для больших данных увеличивайте их
spark.conf.set("spark.sql.shuffle.partitions", 2000)

3. Избегайте UDF, где это возможно:

// Заменяйте UDF на встроенные функции
// Вместо: df.filter(myUdf($"col"))
df.filter(expr("array_contains(col, 'value')"))

Шаг 7: Измерение результатов

Сравните план и метрики выполнения до и после оптимизации:

  • Время выполнения

  • Объем Shuffle-данных

  • Число и размер партиций

  • Использование памяти

Заключение

Catalyst — это мощный оптимизатор, который существенно повышает производительность запросов Spark. Однако, как и любой инструмент, он имеет свои ограничения и особенности работы.

Ключевые выводы:

  1. Понимайте, как работает Catalyst:

    • Знайте этапы преобразования запросов

    • Изучите основные правила оптимизации

    • Разберитесь в физических стратегиях выполнения

  2. Помогайте оптимизатору:

    • Предоставляйте актуальную статистику

    • Согласовывайте типы данных

    • Используйте хинты для сложных случаев

  3. Будьте готовы к ручной оптимизации:

    • Борьба с перекосом данных

    • Переписывание сложных запросов

    • Тонкая настройка под конкретные сценарии

  4. Используйте правильные инструменты:

    • Анализируйте планы выполнения

    • Мониторьте метрики в Spark UI

    • Собирайте статистику выполнения

Catalyst не заменяет опытного Data Engineer, но значительно упрощает его работу, автоматизируя многие аспекты оптимизации. По мере роста ваших знаний о Catalyst, растет и ваша способность создавать высокопроизводительные приложения для обработки данных.

Tags:
Hubs:
+5
Comments0

Articles