
Меня зовут Кучеров Андрей и я Lead Data Engineer с более чем 7-летним опытом в области BigData. Я работал над оптимизацией высоконагруженных Spark-приложений в X5 Retail Group, Beeline, RaiffeisenBank, где мы обрабатывали петабайтные объемы данных. Регулярно сталкиваясь с производительностью запросов, я убедился, что понимание работы Catalyst — необходимый навык для каждого Data Engineer, работающего со Spark.
Введение: почему Catalyst так важен?
Представьте, что вы написали сложный SQL-запрос с пятью JOIN-операциями и десятками фильтров. Как Apache Spark решает, в каком порядке выполнять эти операции? Какие таблицы первыми соединять? Какие фильтры применять раньше? Всеми этими решениями управляет Catalyst — мозговой центр Apache Spark.
Для начинающего Data Engineer понимание Catalyst критически важно по трем причинам:
Отладка запросов: Когда ваш запрос выполняется несколько часов вместо минут, понимание логики Catalyst позволит найти причину.
Оптимизация производительности: Зная, как работает оптимизатор, вы сможете писать более эффективный код.
Понимание ограничений: Spark не всесилен — в некоторых случаях вам придется помогать Catalyst делать правильный выбор.
В этой статье мы разберем внутреннее устройство Catalyst и покажем, как использовать его возможности для эффективной обработки данных.
Что такое Catalyst?
Catalyst — это оптимизатор запросов в Apache Spark, который преобразует высокоуровневые операции (SQL, DataFrame API) в эффективные планы выполнения. Он появился в Spark SQL и стал ключевым компонентом, обеспечивающим производительность всей экосистемы Spark.

В отличие от классических оптимизаторов SQL, Catalyst:
Работает с разными источниками данных (Parquet, CSV, JDBC и т.д.)
Адаптируется к распределенной среде выполнения
Использует языковые особенности Scala для расширяемости
Начинающему разработчику важно понимать: Catalyst автоматизирует оптимизацию, но не может предсказать все особенности ваших данных. Вы должны быть дополнительным слоем интеллекта, который направляет оптимизатор в сложных ситуациях.
Архитектура Catalyst: ключевые компоненты
Catalyst организован как конвейер из четырех последовательных этапов, каждый из которых вносит свой вклад в оптимизацию запросов.

1. Анализ и построение логического плана
На этапе анализа Catalyst преобразует SQL-запрос или операции DataFrame в абстрактное синтаксическое дерево (AST), где каждый узел представляет операцию (SELECT, JOIN, WHERE) или выражение.
Например, для запроса:
SELECT name FROM users WHERE age > 30
Сначала создается неразрешенный логический план:
Project ['name]
+- Filter ['age > 30]
+- UnresolvedRelation [users]
Затем Catalyst применяет разрешение ссылок:
Проверяет существование таблицы
users
в каталогеНаходит колонки
name
иage
в схеме таблицыОпределяет типы данных для выражений
После этого создается разрешенный логический план:
Project [name#12]
+- Filter (age#13 > 30)
+- Relation[name#12, age#13, ...] parquet
Для Data Engineer важно понимать: именно на этом этапе возникают ошибки типа AnalysisException
, которые указывают на проблемы с существованием таблиц или колонок.

2. Логическая оптимизация: правила (RBO)
На этапе логической оптимизации Catalyst применяет более 50 встроенных правил для переписывания плана запроса. Эти правила основаны на математических свойствах реляционной алгебры.
Основные категории правил:
Упрощение выражений
Например, правило Constant Folding вычисляет выражения с константами:
Filter (age > 25 + 5) → Filter (age > 30)
Перенос фильтров
Правило Predicate Pushdown перемещает фильтры ближе к источникам данных:
// До оптимизации
Project [name]
+- Filter (country = 'US')
+- Join (users.id = orders.user_id)
+- Relation [users]
+- Relation [orders]
// После оптимизации
Project [name]
+- Join (users.id = orders.user_id)
+- Filter (country = 'US')
+- Relation [users]
+- Relation [orders]
Это критически важная оптимизация: фильтры, примененные до соединений, могут радикально сократить объем обрабатываемых данных.
Удаление избыточных операций
Правило ProjectionElimination убирает ненужные проекции:
Project (columns) (Project (same_columns) (child)) → Project (columns) (child)
Для Data Engineer: логическая оптимизация — это преобразование математических выражений без учета особенностей физического хранения данных. Она применяет общие правила, которые всегда приводят к более эффективному плану.
Пример кода Catalyst, реализующего правило:
// Правило для объединения последовательных фильтров
object CombineFilters extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case Filter(cond1, Filter(cond2, child)) =>
Filter(And(cond1, cond2), child)
}
}
3. Физическое планирование и стоимостная оптимизация (CBO)
После логической оптимизации Catalyst должен выбрать конкретные алгоритмы выполнения для каждой операции. Например, для JOIN у него есть несколько вариантов:
BroadcastHashJoin: копирует маленькую таблицу на все узлы (быстрый, но требует RAM = объем копируемого DataFrame на каждом executor и driver)
SortMergeJoin: сортирует обе таблицы и объединяет (работает для больших таблиц)
ShuffleHashJoin: создает хеш-таблицы для соединения (промежуточный вариант)

Здесь Catalyst использует Cost-Based Optimizer (CBO) — компонент, который оценивает стоимость различных планов и выбирает оптимальный. CBO учитывает:
Статистику данных (количество строк, размеры таблиц)
Кардинальность колонок (число уникальных значений)
Распределение данных (через гистограммы)
Для эффективной работы CBO требуется собирать статистику:
-- Сбор общей статистики таблицы
ANALYZE TABLE orders COMPUTE STATISTICS;
-- Сбор статистики по колонкам
ANALYZE TABLE orders COMPUTE STATISTICS FOR COLUMNS order_id, customer_id;
Для Data Engineer критично понимать: без статистики CBO делает решения на основе приблизительных оценок, что может приводить к неоптимальным планам.
Физический план можно увидеть через метод explain()
:
df.explain()
// Результат: физический план с выбранными алгоритмами
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[title#18], functions=[sum(cast(label#17 as double))])
+- Exchange hashpartitioning(title#18, 200), ENSURE_REQUIREMENTS, [plan_id=120]
+- HashAggregate(keys=[title#18], functions=[partial_sum(cast(label#17 as double))])
+- Filter (isnotnull(label#17) AND (cast(label#17 as int) = 1))
+- FileScan csv [label#17,title#18] Batched: false, DataFilters: [isnotnull(label#17), (cast(label#17 as int) = 1)], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/seidzi/habr/csv/part-00000-2a2f079d-0edc-43aa-aec2-e3b10e69..., PartitionFilters: [], PushedFilters: [IsNotNull(label)], ReadSchema: struct<label:string,title:string>
4. Генерация кода: движок Tungsten
На последнем этапе Catalyst превращает физический план в исполняемый код с помощью движка Tungsten. Этот движок генерирует оптимизированный байт-код Java для критичных операций, минуя стандартные абстракции Spark.
Например, для фильтра age > 30
Catalyst генерирует примерно такой код:
public boolean filter(InternalRow row) {
return row.getInt(1) > 30; // Доступ напрямую к колонке age
}
Tungsten обеспечивает:
Векторизованную обработку: обработка данных блоками, а не по одной строке
Управление памятью: эффективное размещение данных с минимизацией сборки мусора
SIMD-оптимизации: использование специальных инструкций процессора
Для Data Engineer важно знать: этот этап в значительной степени автоматизирован, но понимание его работы поможет при отладке производительности.
Факторы, влияющие на эффективность Catalyst
Чтобы эффективно работать с Catalyst, нужно понимать, какие факторы влияют на его решения и оптимизации. Рассмотрим ключевые из них.
1. Тип и формат данных

Колоночные форматы (Parquet, ORC)
Эти форматы позволяют Catalyst применять эффективные оптимизации:
Предикатный пушдаун: фильтрация на уровне блоков данных
Проекция колонок: чтение только нужных столбцов
Векторизация: обработка данных блоками
Пример (ниже csv):
parquet_df.where('label=1').groupby('title').agg({"label": "sum"}).count() # 168190
# CPU times: user 1.67 ms, sys: 7.3 ms, total: 8.97 ms
# Wall time: 1.22 s
Строчные форматы (CSV, JSON)
Для этих форматов оптимизации ограничены:
CSV требует чтения и парсинга всего файла
JSON поддерживает частичное чтение колонок, но медленнее колоночных форматов
csv_df.where('label=1').groupby('title').agg({"label": "sum"}).count() # 168190
# CPU times: user 4.7 ms, sys: 3.09 ms, total: 7.78 ms
# Wall time: 2.05 s
Для Data Engineer прямая рекомендация: всегда конвертируйте исходные данные в Parquet для аналитических задач. Производительность значительно увеличится.
2. Размер и количество файлов
Catalyst управляет распределением задач на основе файлов и партиций:
Проблема мелких файлов
Если у вас 10,000 файлов по 1 МБ:
Каждый файл обрабатывается отдельной задачей
Накладные расходы на планирование задач составят до 40% времени
Неравномерная загрузка исполнителей
Оптимальный размер 128-256 МБ на файл. Если в качестве системы хранения используется HDFS, то размер файла должен быть приблизительно равен размеру блока.
Для Data Engineer важно помнить: Spark предназначен для больших файлов, а не для большого количества маленьких файлов.
3. Типы данных в JOIN-ключах
Когда вы соединяете таблицы с разными типами ключей, Catalyst вынужден добавлять дополнительные преобразования.
Это приводит к:
Неявным операциям
CAST
в планеНевозможности использования некоторых оптимизаций (например, Broadcast Join)
Замедлению до 40% по сравнению с соединением по одинаковым типам
Всегда проверяйте план через explain()
на наличие операций cast
:
SortMergeJoin [cast(customer_id_int#10 as string)], [customer_id_string#20]
Для Data Engineer: используйте одинаковые типы для JOIN-ключей во всех таблицах. Если нужно преобразование, делайте его явно:
val df1Fixed = df1.withColumn("customer_id_string", $"customer_id_int".cast("string"))
4. Статистика и CBO
CBO в Catalyst работает эффективно только при наличии точной статистики о данных.
Почему статистика критична
Рассмотрим запрос:
SELECT * FROM orders o
JOIN customers c ON o.customer_id = c.id
WHERE c.country = 'US'
Без статистики Catalyst не знает:
Сколько строк вернет фильтр
country = 'US'
Стоит ли использовать Broadcast для таблицы
customers
Какой порядок JOIN будет оптимальным
Как собирать статистику?
Для таблиц в каталоге:
-- Базовая статистика (размер, число строк)
ANALYZE TABLE customers COMPUTE STATISTICS;
-- Детальная статистика по колонкам
ANALYZE TABLE customers COMPUTE STATISTICS FOR COLUMNS id, country;
Для DataFrames (временные таблицы):
// Регистрация временной таблицы со статистикой
df.createOrReplaceTempView("customers_temp")
spark.sql("ANALYZE TABLE customers_temp COMPUTE STATISTICS FOR ALL COLUMNS")
Включение CBO
По умолчанию Catalyst использует только базовые возможности CBO. Для полной оптимизации:
spark.conf.set("spark.sql.cbo.enabled", true)
spark.conf.set("spark.sql.cbo.joinReorder.enabled", true)
Для Data Engineer: отсутствие статистики — одна из главных причин неоптимальных планов. Всегда проверяйте её наличие для критичных таблиц.
5. Распределение данных (Skew)

Одним из самых сложных испытаний для Catalyst является перекос данных (skew) — ситуация, когда некоторые значения ключей встречаются гораздо чаще других.
Например, в социальной сети у 10% пользователей может быть 90% всех подписчиков. При выполнении JOIN это приведет к проблемам:
Проблемы skew-данных
Неравномерная загрузка исполнителей: некоторые задачи будут выполняться в 10-100 раз дольше других
Риск OOM: узел может исчерпать память при обработке слишком большой партиции
Общее замедление: время выполнения ограничено самой медленной задачей
Методы борьбы с перекосом
Есть несколько подходов к решению проблемы:
Адаптивное выполнение запросов (AQE) (с Spark 3.0):
// Включение AQE и обработки перекоса
spark.conf.set("spark.sql.adaptive.enabled", true)
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", true)

2. Техника "соли" (salting) — добавление случайного префикса к ключам:
// Добавление случайного префикса к ключу с перекосом
val saltedDF = df.withColumn("salt", (rand() * 10).cast("int"))
.withColumn("salted_key", concat($"skewed_key", lit("_"), $"salt"))
// JOIN по "соленому" ключу
saltedDF.join(otherDF, "salted_key")
Важно учитывать что "техника соли" не дает 100% попадание ключей, и необходима дополнительная агрегация.
Для Data Engineer: перекос данных — самая коварная проблема для Spark, так как её часто сложно выявить до момента выполнения. Используйте мониторинг задач в Spark UI для выявления проблемных партиций.
Как заметить перекос?

Достаточно зайти в stage где происходит Join, и посмотреть на метрики по стейджу, в нормальном варианте min, avg и max будут приблизительно равны, однако в случае с перекосом(как на скрине выше) видим что max сильно выше чем остальные персентили.
Практический пример: полный цикл оптимизации
Рассмотрим конкретный пример, как Catalyst преобразует запрос от SQL до исполняемого кода.
Пусть у нас есть запрос:
SELECT o.order_id, c.name, SUM(o.amount) AS total_amount
FROM orders o
JOIN customers c ON o.customer_id = c.id
WHERE c.country = 'US' AND o.date BETWEEN '2023-01-01' AND '2023-12-31'
GROUP BY o.order_id, c.name;
Шаг 1: Неразрешенный логический план
Catalyst строит первоначальное дерево:
Aggregate ['o.order_id, 'c.name], ['o.order_id, 'c.name, sum('o.amount) AS total_amount]
+- Filter ('c.country = 'US' AND 'o.date BETWEEN '2023-01-01' AND '2023-12-31')
+- Join Inner, ('o.customer_id = 'c.id)
:- SubqueryAlias o
: +- UnresolvedRelation [orders]
+- SubqueryAlias c
+- UnresolvedRelation [customers]
Между 1 и 2 шагом Catalist должен посетить метасторидж, что бы собрать статистику по таблицам(если она есть), проверить существование таблиц, колонок, партиций, типа колонок и т.д.
Шаг 2: Разрешенный логический план
После разрешения ссылок план становится:
Aggregate [order_id#10, name#21], [order_id#10, name#21, sum(amount#12) AS total_amount]
+- Filter ((country#22 = US) AND (date#13 >= 2023-01-01) AND (date#13 <= 2023-12-31))
+- Join Inner, (customer_id#11 = id#20)
:- SubqueryAlias o
: +- Relation[order_id#10,customer_id#11,amount#12,date#13] parquet
+- SubqueryAlias c
+- Relation[id#20,name#21,country#22] parquet
Теперь каждая колонка имеет уникальный идентификатор (#10, #11 и т.д.) и конкретный тип.
Шаг 3: Оптимизированный логический план
Catalyst применяет главные правила оптимизации:
Predicate Pushdown: перенос фильтров к источникам данных
Column Pruning: удаление неиспользуемых колонок
Так же в Rule-Based System существуют более 40 правил, которые так же по возможности будут применяться.
Aggregate [order_id#10, name#21], [order_id#10, name#21, sum(amount#12) AS total_amount]
+- Join Inner, (customer_id#11 = id#20)
:- Filter (date#13 >= 2023-01-01 AND date#13 <= 2023-12-31)
: +- Relation[order_id#10,customer_id#11,amount#12,date#13] parquet
+- Filter (country#22 = US)
+- Relation[id#20,name#21,country#22] parquet
Заметьте, что фильтры теперь находятся сразу над источниками данных, что позволяет сократить объем данных до JOIN.
Шаг 4: Физический план
CBO оценивает разные стратегии и выбирает оптимальную:
== Physical Plan ==
HashAggregate(keys=[order_id#10, name#21], functions=[sum(amount#12)])
+- Exchange hashpartitioning(order_id#10, name#21, 200)
+- HashAggregate(keys=[order_id#10, name#21], functions=[partial_sum(amount#12)])
+- BroadcastHashJoin [customer_id#11], [id#20], Inner, BuildRight
:- Filter (date#13 >= 2023-01-01 AND date#13 <= 2023-12-31)
: +- FileScan parquet [order_id#10,customer_id#11,amount#12,date#13]
+- BroadcastExchange HashedRelationBroadcastMode
+- Filter (country#22 = US)
+- FileScan parquet [id#20,name#21,country#22]
Ключевые решения в плане:
BroadcastHashJoin: CBO, используя статистику таблицы, решил, что таблица
customers
после фильтраcountry='US'
будет достаточно маленькой для рассылки на все узлыHashAggregate: используется для эффективного подсчета суммы
Exchange: обмен данными между узлами для распределенной агрегации
Шаг 5: Генерация кода
Для критических операций Tungsten генерирует JVM-байткод:
// Сгенерированный код для фильтра по дате
public boolean filter(InternalRow row) {
long date = row.getLong(3);
return date >= 20230101L && date <= 20231231L;
}
// Сгенерированный код для агрегации
public void update(InternalRow row) {
double amount = row.getDouble(2);
sum += amount;
}
Шаг 6: Наблюдение производительности

В Spark UI можно наблюдать:
Время, затраченное на каждый этап
Количество обработанных данных
Наличие перекоса
Эффективность сериализации/десериализации
Инструменты для управления и диагностики Catalyst
Для успешной работы с Catalyst Data Engineer должен владеть следующими инструментами:
1. Анализ планов выполнения
// Базовый план
df.explain()
// Расширенная информация с логическими и физическими планами
df.explain("extended")
// Детальная информация с метриками выполнения
df.explain("formatted")
2. Настройки оптимизатора
Параметр | Описание | Рекомендуемое значение | Сценарий использования |
---|---|---|---|
| Включает адаптивную оптимизацию запросов (AQE). |
| При работе с большими данными и перекошенными распределениями. |
| Включает объединение партиций в процессе выполнения. |
| При выполнении операций с большими DataFrame. |
| Включает оптимизацию перекошенных соединений. |
| При наличии значительного перекоса в данных. |
| Включает использование Apache Arrow для ускорения передачи данных между JVM и Python. |
| Для повышения производительности при использовании PySpark. |
| Количество партиций для операций shuffle. | 200 (можно настраивать в зависимости от объема данных) | Для оптимизации работы с большими наборами данных. |
| Максимальный размер таблицы для использования Broadcast Join. | 10MB (можно настраивать) | Для ускорения соединений между маленькими и большими таблицами. |
| Включает векторизованное чтение для формата Parquet. |
| Для повышения производительности чтения данных из Parquet. |
| Включает фильтрацию на уровне источника данных для формата ORC. |
| Для уменьшения объема обрабатываемых данных при чтении. |
3. SQL-хинты для ручной оптимизации
Когда автоматические оптимизации недостаточны, можно использовать хинты для явного указания стратегий:
-- Принудительный broadcast таблицы customers
SELECT /*+ BROADCAST(c) */ *
FROM orders o JOIN customers c ON o.customer_id = c.id
-- Принудительное использование sort-merge join
SELECT /*+ MERGE(o, c) */ *
FROM orders o JOIN customers c ON o.customer_id = c.id
-- Задание порядка соединения таблиц
SELECT /*+ LEADING(o, c, p) */ *
FROM orders o
JOIN customers c ON o.customer_id = c.id
JOIN products p ON o.product_id = p.id
4. Spark UI для мониторинга
Ключевые разделы Spark UI для анализа работы Catalyst:

Stages: Показывает детали выполнения каждого этапа
Storage: Данные о кэшировании и использовании памяти
Executors: Информация об использовании ресурсов исполнителями
SQL: Отображает выполненные запросы и их планы
Что нового в оптимизации PySpark с выходом Spark 3+
Apache Spark 3.0 и последующие версии принесли революционные улучшения в оптимизации запросов, которые особенно заметны при использовании PySpark. Эти изменения делают Python-интерфейс Spark значительно более производительным.
1. Адаптивное выполнение запросов (AQE)
Adaptive Query Execution (AQE) — это настоящий прорыв в Spark 3.0, позволяющий оптимизировать планы выполнения во время работы, а не только на этапе компиляции.
Ключевые возможности AQE:
Динамическое объединение партиций:
# До Spark 3.0: много маленьких партиций # С размером партиций 10 МБ и 1000 партициями # После aggregate многие партиции < 1 МБ # После Spark 3.0: автоматическое объединение партиций spark.conf.set("spark.sql.adaptive.enabled", True) spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", True) spark.conf.set("spark.sql.adaptive.coalescePartitions.minPartitionSize", "64m")
Динамическое переключение стратегии соединения:
AQE может в процессе выполнения заменить
SortMergeJoin
наBroadcastHashJoin
, если во время выполнения обнаружит, что таблица после фильтрации стала достаточно маленькой.
# Этот запрос автоматически оптимизируется с AQE df1.join(df2, "key").filter(f.col("value") > 1000).show() # Без AQE: SortMergeJoin (даже если df2 после фильтра < 10 МБ) # С AQE: Динамическое переключение на BroadcastHashJoin
Оптимизация скошенных соединений:
Автоматическое обнаружение и разделение перекошенных партиций
# Включение обработки перекоса spark.conf.set("spark.sql.adaptive.skewJoin.enabled", True) spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", 5) spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256m")
2. Улучшенная производительность Pandas UDF
Pandas UDF (User Defined Functions) получили значительные улучшения производительности и удобства использования в Spark 3.x:
Новые возможности:
Улучшенная сериализация данных:
Использование Arrow для эффективной передачи данных между JVM и Python
Сокращение накладных расходов на сериализацию на 40-60%
Типы PandasUDF с декораторами:
# Spark 2.4: использовали константы для определения типа @pandas_udf(returnType=DoubleType(), functionType=PandasUDFType.SCALAR) # Spark 3.0+: чистые и понятные декораторы @pandas_udf(DoubleType()) # По умолчанию SCALAR def multiply(s: pd.Series) -> pd.Series: return s * 2
Поддержка итераторов для обработки больших батчей:
# Обработка больших датафреймов без OOM @pandas_udf(DoubleType()) def process_batch(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]: for batch in batch_iter: yield batch * 2
Расширенные типы возвращаемых значений:
Поддержка структурных и вложенных типов
Возможность возвращать и принимать DataFrames целиком
Бенчмарк производительности Pandas UDF:
Скрытый текст
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr, pandas_udf, udf
from pyspark.sql.types import StringType
import timeit
import pandas as pd
import uuid
import matplotlib.pyplot as plt
# Конфигурация Spark
def create_spark_session():
return SparkSession.builder \
.appName("UUID Benchmark") \
.master("local[*]") \
.config("spark.driver.memory", "8g") \
.config("spark.executor.memory", "8g") \
.config("spark.sql.execution.arrow.pyspark.enabled", "true") \
.config("spark.sql.shuffle.partitions", "8") \
.getOrCreate()
spark = create_spark_session()
# Генератор тестовых данных
def generate_data(num_records):
return spark.range(num_records).cache()
# Функции генерации UUID
def uuid4_str():
return str(uuid.uuid4())
@udf(StringType())
def python_udf(_):
return uuid4_str()
@pandas_udf(StringType())
def pandas_vectorized_udf(_: pd.Series) -> pd.Series:
return pd.Series([uuid4_str() for _ in range(len(_))])
# Тестовые методы
def test_native(df):
return df.withColumn("uuid", expr("uuid()")) # Встроенная функция Spark
def test_udf(df, udf_func):
return df.withColumn("uuid", udf_func(col("id")))
# Параметры тестирования
sizes = [100_000, 200_000, 300_000, 400_000, 500_000, 600_000, 700_000, 800_000, 900_000, 1_000_000]
trials = 50
results = []
# Прогрев JVM
generate_data(10).transform(test_native).count()
for size in sizes:
print(f"\n=== Тестирование на {size:,} записях ===")
df = generate_data(size)
# Нативная функция Spark
native_time = timeit.timeit(
lambda: test_native(df).count(),
number=trials
) / trials
# Обычная UDF
udf_time = timeit.timeit(
lambda: test_udf(df, python_udf).count(),
number=trials
) / trials
# Pandas UDF
pandas_udf_time = timeit.timeit(
lambda: test_udf(df, pandas_vectorized_udf).count(),
number=trials
) / trials
results.append((size, native_time, udf_time, pandas_udf_time))
df.unpersist()
# Вывод результатов
print("\n=== Результаты генерации UUID ===")
print(f"{'Размер':<12} {'Spark UUID()':<12} {'Python UDF':<12} {'Pandas UDF':<12} {'UDF/Spark':<12} {'Pandas/Spark':<12}")
for size, nt, ut, pt in results:
print(f"{size:<12,} {nt:.6f} {ut:.6f} {pt:.6f} {ut/nt:>8.1f}x {pt/nt:>8.1f}x")
# Визуализация
df_result = pd.DataFrame(results, columns=['Size', 'Spark Native', 'Python UDF', 'Pandas UDF'])
df_result.set_index('Size').plot(
kind='bar',
figsize=(12, 6),
logy=True,
title='Сравнение методов генерации UUID'
)
plt.ylabel('Время (секунды, log scale)')
plt.xlabel('Количество записей')
plt.xticks(rotation=0)
plt.tight_layout()
plt.savefig('uuid_benchmark.png', dpi=300)
plt.show()
spark.stop()

Оптимизация запросов: пошаговое руководство
Шаг 1: Анализ исходного запроса
Начните с анализа плана выполнения:
df.explain(true) // Показывает все планы: неразрешенный, логический, оптимизированный, физический
Ищите:
Неоптимальные JOIN-стратегии (SortMergeJoin для маленьких таблиц)
Большое количество Exchange-операций (Shuffle)
Отсутствие Predicate Pushdown
Операции CAST в плане
Шаг 2: Оптимизация данных
Конвертация в Parquet (если данные в CSV/JSON):
df.write.parquet("optimized_data")
Партицирование по ключевым фильтрам:
df.write.partitionBy("date", "country").parquet("partitioned_data")
Сбор статистики:
ANALYZE TABLE orders COMPUTE STATISTICS FOR COLUMNS customer_id, amount;
Шаг 3: Настройка оптимизатора
// Включение всех оптимизаций Catalyst
spark.conf.set("spark.sql.cbo.enabled", true)
spark.conf.set("spark.sql.cbo.joinReorder.enabled", true)
spark.conf.set("spark.sql.adaptive.enabled", true)
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", true)
Шаг 4: Оптимизация JOIN-операций
Выбор правильного порядка JOIN:
Маленькие таблицы присоединяйте первыми
Используйте JOIN-хинты для проблемных случаев
Broadcast для маленьких таблиц:
// Явный broadcast
import org.apache.spark.sql.functions.broadcast
df1.join(broadcast(df2), "key")
Согласование типов ключей:
// Явное приведение типов
val df2WithIntKey = df2.withColumn("key_int", $"key".cast("int"))
df1.join(df2WithIntKey, df1("key") === df2WithIntKey("key_int"))
Шаг 5: Борьба с перекосом данных
Мониторинг перекоса:
Проверьте Spark UI для выявления "задач-долгожителей"
Найдите ключи с неравномерным распределением
Применение техники "соли"
Шаг 6: Финальные оптимизации
Кэшируйте промежуточные результаты:
val intermediateDF = df.filter(complexCondition).cache()
2. Настройте количество партиций:
// Для небольших данных уменьшайте количество партиций
spark.conf.set("spark.sql.shuffle.partitions", 50)
// Для больших данных увеличивайте их
spark.conf.set("spark.sql.shuffle.partitions", 2000)
3. Избегайте UDF, где это возможно:
// Заменяйте UDF на встроенные функции
// Вместо: df.filter(myUdf($"col"))
df.filter(expr("array_contains(col, 'value')"))
Шаг 7: Измерение результатов
Сравните план и метрики выполнения до и после оптимизации:
Время выполнения
Объем Shuffle-данных
Число и размер партиций
Использование памяти
Заключение

Catalyst — это мощный оптимизатор, который существенно повышает производительность запросов Spark. Однако, как и любой инструмент, он имеет свои ограничения и особенности работы.
Ключевые выводы:
Понимайте, как работает Catalyst:
Знайте этапы преобразования запросов
Изучите основные правила оптимизации
Разберитесь в физических стратегиях выполнения
Помогайте оптимизатору:
Предоставляйте актуальную статистику
Согласовывайте типы данных
Используйте хинты для сложных случаев
Будьте готовы к ручной оптимизации:
Борьба с перекосом данных
Переписывание сложных запросов
Тонкая настройка под конкретные сценарии
Используйте правильные инструменты:
Анализируйте планы выполнения
Мониторьте метрики в Spark UI
Собирайте статистику выполнения
Catalyst не заменяет опытного Data Engineer, но значительно упрощает его работу, автоматизируя многие аспекты оптимизации. По мере роста ваших знаний о Catalyst, растет и ваша способность создавать высокопроизводительные приложения для обработки данных.