Привет, Хабр! Меня зовут Лев Маковеев. Я младший инженер по обработке данных в компании «Криптонит». В этой статье хочу поделиться с вами результатами небольшого исследования, в ходе которого мы протестировали ускоритель запросов Apache DataFusion Comet и пришли к довольно впечатляющим результатам. Забегая вперёд, отмечу, что в отдельных тестах ускорение было более чем десятикратным!
В сфере обработки больших данных фреймворк Spark можно назвать стандартом де-факто. Мы любим его движок, оптимизированный для выполнения задач в кластерах, интеллектуальное кэширование в оперативной памяти, высокоуровневый API на четырёх языках и довольно высокую скорость, но нет предела совершенству!
Сообщество Apache Software Foundation (ASF) продолжает создавать всё новые инструменты для ускорения отдельных этапов работы Spark. Один из них — ускоритель обработки запросов DataFusion Comet, написанный на языке Rust и анонсированный в 2023 году. Его главная фишка в том, что он легко интегрируется с экосистемой Spark без необходимости внесения каких-либо изменений в код. Согласитесь, весьма заманчиво освоить ещё один простой инструмент и добиться многократного выигрыша в скорости без утомительного переписывания кода.
Чтобы разобраться как именно Comet ускоряет работу Spark, вспомним, как вообще работает последний.
После того, как пользователь написал код Spark-приложения (т.е. запросы по чтению-обработке-записи некоторых данных) и запустил его, Spark начинает процесс генерации вычислительного плана и выделения вычислительных ресурсов. Вычислительный план делится на две части: логический план и физический.
Логический план представляет собой краткий перечень всех шагов преобразования, которые необходимо выполнить. Существует три разновидности логического плана в Spark:
Неразрешённый логический план — создаётся непосредственно из запросов пользователя.
Разрешённый логический план — создаётся после анализа неразрешённого плана: проверки корректности операций и проверки существования всех названных в запросе таблиц и столбцов.
Оптимизированный логический план — создаётся из разрешённого плана путём его оптимизации с помощью встроенного в Spark Catalyst Optimizer — оптимизатора запросов.
Физический план представляет собой конкретную реализацию оптимизированного логического плана. Так как отдельная операция логического плана (такая, как join или aggregate) может быть реализована с помощью разных алгоритмов, Spark выбирает наиболее подходящие реализации под конкретный случай, то есть из множества всех возможных физических планов Spark выбирает оптимальный. После выбора наилучшего физического плана создаётся исполняемый код для запроса, который необходимо выполнить распределённым образом в кластере. Весь описанный процесс называется генерация кода (Codegen) и происходит в рамках движка Tungsten внутри Spark.

Как Comet помогает Spark
Если в Spark-приложении инициализирован плагин Comet, после выбора оптимального физического плана Spark'ом Comet начинает процедуру оптимизации (хотя корректнее это назвать подменой операций) выбранного физического плана. Суть «оптимизации» Comet — замена функций Spark'a, работающих на JVM, своими функциями, работающими в рамках движка DataFusion. В обработке больших данных Spark на JVM уступает низкоуровневому Rust'у и движку DataFusion в производительности, поэтому подмена функций положительно сказывается на времени работы.
Оптимизация Comet проводится по двум основным правилам: CometScanRule и CometExecRule.
CometScanRule заменяет операции чтения parquet (а еще, по документации, csv и json) файлов операторами Comet. Comet заменяет ридер Spark на собственный, который форматирует группы строк в Arrow – колоночный формат данных, используемый по дефолту в движке Apache DataFusion. Comet поддерживает только некоторое подмножество типов данных и вернётся к ридеру Spark, если в читаемых данных будут обнаружены неподдерживаемые типы. Список поддерживаемых типов: Boolean, Byte, Short, Integer, Long, Float, Double, String, Binary, Decimal, Date, Timestamp, TimestampNTZ, Null.
CometExecRule проходит снизу вверх по выбранному физическому плану Spark и пытается заменить каждый оператор эквивалентом Comet. Например, ProjectExec будет заменён на CometProjectExec. Далеко не все операции и выражения Spark имеют альтернативы в Comet. Поддерживаемые операции появляются с обновлениями, и их можно увидеть в официальной документации. Comet не поддерживает частичную замену подмножеств плана в рамках одного stage'a, поскольку это потребовало бы добавления переходов для преобразования между строковыми и колоночными данными между операторами Spark и операторами Comet, а накладные расходы на это могут перевесить преимущества. Тем не менее, это не значит, что Comet нельзя использовать с неподдерживаемыми операциями и выражениями. Оптимизатор Comet умеет оценивать, что выгоднее: провести весь stage в Spark или частично провести его в DataFusion.
Установка плагина
Для запуска spark job с включённым плагином Apache Comet необходимо добавить jar-файл Comet в classpath проекта. Получить jar-файл можно тремя путями:
С помощью Maven (amd64 и arm64 Linux).
Скачать напрямую с сайта Comet (amd64 и arm64 Linux).
Скомпилировать локально из исходного кода по инструкции (Linux / macOS).
Затем нужно включить и настроить Comet в конфигурации Spark. Точкой входа в Comet является класс org.apache.spark.CometPlugin, который можно инициализировать в Spark, добавив следующий параметр в конфигурацию Spark: spark.plugins=org.apache.spark.CometPlugin
Обратите внимание: если jar-файл Comet лежит не в classpath, а во внешней директории, то путь нужно прописать в конфигурации:
spark.driver.extraClassPath=$COMET_JAR
spark.executor.extraClassPath=$COMET_JAR
Comet рекомендует использовать память вне JVM (Off-Heap Memory), поэтому необходимо также добавить эти строки в конфигурацию:
spark.memory.offHeap.enabled=true
spark.memory.offHeap.size=<some>g
В качестве <some> указывается примерный процент памяти, которую использует JVM. Подробнее об этом читайте здесь.
Comet может использовать собственный механизм shuffle:
spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager
spark.comet.exec.shuffle.enabled=true
Все конфигурируемые параметры Comet можно найти по этой ссылке.
Тестирование
Чтение из Kafka
Изначально были предположения, что Comet поможет быстрее читать и обрабатывать данные из Kafka, но как позже выяснилось, на сегодняшний день Comet поддерживает только три источника данных — файлы parquet, csv и json. Чтение из Kafka не поддерживается, а, следовательно, весь execution plan переходит в Spark. Об этом нам говорит сам Comet, если обрабатывать и записывать данные, читаемые из Kafka с инициализированным плагином:

Тест #1
Чтение и обработка данных формата parquet считается на данный момент основным способом применения Comet. Для тестирования использовалось 100 ГБ (1,5 млрд строк) полностью уникальных сгенерированных данных. Тестирование проводилось путём измерения времени работы полного цикла чтения-обработки-записи:
Чтение проводилось из s3 в формате parquet.
Обработка представляла собой groupby по трём полям, count и sort.
Запись проводилась в noop или scylla.
Основной рабочий участок кода:
val writer = reader
.groupBy("day", "month", "colour")
.count()
.sort("month", "day")
Для всех тестов была использована конфигурация Spark: driver: 1 core 4g mem, 1 executor: 4 cores 32g mem (Для Comet offHeap.size=8g).
Результаты представлены в таблицах ниже. Время указано в секундах.
Запись из parquet в noop

Среднее время работы SPARK: 986 ± 32 с.
Среднее время работы COMET: 75 ± 3 с.
Запись из parquet в scylla

Среднее время работы SPARK: 1091 ± 25 с.
Среднее время работы COMET: 90 ± 2 с.
Как видите, в обоих тестах COMET ускоряет вычисление более чем в 10 раз!
Тест #2
Из-за провокационных результатов прошлого теста, задачу было решено усложнить. Тестирование проводилось путём измерения времени работы полного цикла чтения-обработки-записи:
Чтение проводилось из s3 в формате parquet или delta.
Обработка представляла собой вычисление хеш-значения по всем полям, groupby + count по полю hash.
Запись проводилась в noop или scylla.
Основной рабочий участок кода:
val writer = reader.
.withColumn(
"hash",
hash(
col("object_id"),
col("guid"),
col("uni_id"),
col("intercept_at"),
col("day"),
col("month"),
col("colour")
) % config.app.hashScale
)
.groupBy("hash")
.count()
.sort(desc("count"))
Запись из parquet в noop

Среднее время работы SPARK: 528 ± 15 с.
Среднее время работы COMET: 244 ± 7 с.
Предполагая нормальную распределённость результатов тестирования, можно вычислить доверительный интервал ускорения (значимость = 0.95): (51,3%, 56,3%).
Запись из parquet в scylla

Среднее время работы SPARK: 553 ± 31 с.
Среднее время работы COMET: 264 ± 12 с.
Доверительный интервал ускорения (значимость = 0.95): (47,54%, 57,04%).
Запись из delta в noop

Среднее время работы SPARK: 342 ± 6 с.
Среднее время работы COMET: 219 ± 3 с.
Доверительный интервал ускорения (значимость = 0.95): (34,48%, 37,63%).
Запись из delta в scylla

Среднее время работы SPARK: 351 ± 3 с.
Среднее время работы COMET: 231 ± 2 с.
Доверительный интервал ускорения (значимость = 0.95): (33,56%, 34,95%).
Заключение
Spark активно развивается вот уже 15 лет. Сегодня он остаётся одним из самых популярных инструментов Big Data благодаря своей универсальности. Его применяют в аналитике, машинном обучении, стриминговых сервисах, научных проектах и других областях, где требуется обработка данных от множества источников.
С появлением Comet и без того шустрый Spark стал и вовсе реактивным. Comet использует SIMD инструкции для параллельного выполнения операций, что ускоряет аналитические запросы. Он эффективно работает с колоночными форматами (Apache Arrow, Parquet) и может задействовать графические процессоры для ускорения отдельных типов вычислений (например, агрегаций и сортировок).
Ещё одна прелесть Comet в том, что он полностью совместим с экосистемой Apache Arrow и обеспечивает «нулевое копирование данных». А ещё, благодаря Rust, Comet обеспечивает не только высокую скорость, но и безопасное использование памяти. Можно было бы считать все эти высокопарные фразы маркетинговым шумом, если бы мы сами не убедились в том, что Comet способен ускорить обработку запросов в Spark буквально в разы и даже на порядок!
Кстати, «Криптонит» ищет специалистов по обработке данных. Переходите по ссылкам, чтобы узнать подробнее о вакансиях и присоединяйтесь к нашей команде!
Senior Data Engineer (разработка аналитических систем и инфраструктура)
Senior Data Engineer (внедрение и эксплуатация продуктов).