Привет! Сегодня на связи команда вендора Data Sapience, а именно Spark-разработчик Виталий Мартынов и технические лидеры направления разработки Apache Spark платформы Data Ocean Nova Дмитрий Паршин и Евгений Морозов.
Концепция Lakehouse активно продвигается как «золотая середина» между Data Lake и Data Warehouse: она обещает объединить гибкость хранения данных, расширенную аналитику и соблюдение транзакционности в единой архитектуре с использованием современных табличных открытых форматов, таких как Iceberg (который уже де-факто стал общепринятым стандартом при построении Data Lakehouse). И сегодня мы хотим посвятить статью разбору следующего вопроса: почему без использования Spark нельзя построить полноценный Lakehouse?
Поговорим о том, какую роль Spark играет в Lakehouse-подходе, какие задачи он закрывает успешнее других, о его тесной взаимосвязи с Iceberg и том, почему альтернативы часто не дотягивают до нужного уровня универсальности, масштабируемости и надежности в рамках большой продуктивной среды. Также расскажем, почему мы в Data Ocean Nova используем Spark в качестве движка для обслуживания Iceberg-таблиц и инструмента для миграции данных в Lakehouse.

Почему многие не любят Spark
К сожалению (а может, и к счастью), в мире нет ничего идеального, и Spark не исключение. Какие же его особенности зачастую вызывают неприятие/недовольство/тяжкие вздохи у представителей дата-сообщества? Пройдемся по порядку:
Замудренный способ настройки сессии, в которой будут выполняться запросы/расчеты. В какой-нибудь Impala/Trino/StarRocks зачастую инстанс уже поднят, какие-то конфигурации уже сделаны за пользователя. Все, что ему остается, – написать SQL-запрос и получить данные. Да, пользователь имеет возможность самостоятельно конфигурировать свою сессию, но это совершенно не обязательно.
В случае со Spark все иначе. Каждое приложение (неважно, регламентный это процесс или сессия в Jupyter) представляет собой отдельный поднимаемый инстанс, и пользователю требуется сконфигурировать для него сессию. Возникают вопросы: сколько ядер нужно для моего приложения? а памяти? а сколько указывать spark.dynamicAllocation.maxExecutors? стойте, что такое dynamicAllocation? а плагин Comet включаем для приложения? а сколько тогда указывать памяти Off-heap? Закономерной становится реакция: «Прекратите это издевательство, я аналитик, хочу анализировать данные и приносить бизнес-пользу, а не ломать голову, как эффективно запустить мое приложение»;
Скорость. Не будем отрицать, что Spark не самый быстрый движок для выполнения аналитических запросов, и на это есть множество причин;
Движок слабо применим к быстрым и простым запросам. В аналитике часто бывают ситуации, когда нужно что-то быстро поселектить, посмотреть и сделать выводы. Пока приложение Spark запустится на кластере, на Impala ты уже успеешь проанализировать результат запроса, дойти из дома до офиса и предоставить руководителю устно свои выводы. А дело в тяжелом поднятии драйверов/экзекьюторов. Каждое приложение Spark представляет собой отдельный инстанс, для которого требуется аллокация своих ресурсов на кластере. Пока Cluster Manager где-то там в Kubernetes додумает, на каких нодах и какие поды запускать… В случае с другими движками инстанс приложения общий для всех пользователей и уже поднят – подключайся и сразу пиши запросы к интересующим данным;
Дебаг проблемных приложений. Для этого нам в помощь был придуман Spark History Server. И в нем очень много информации о spark-приложениях. Точнее даже не много, а МНОГО. В первый раз зайдя туда, можно потеряться надолго, а уж истинное понимание, что там, откуда и для чего, требует длительных практических упражнений со Spark.
Почему Spark все же необходим для построения Data Lakehouse
Lakehouse – современная архитектура для управления данными, которая объединяет преимущества Data Lake и Data Warehouse и включает в себя возможности бизнес-анализа и машинного обучения на основе хранимых данных. Также отличительными чертами являются:
Использование открытых табличных форматов данных (Iceberg, Delta, Hudi);
Разделение ресурсов для вычисления и хранения;
Поддержка batch/streaming-нагрузки;
Единый центр управления данными (Data Governance).
Чтобы реализовать эти принципы, нужен надежный и масштабируемый движок. И если посмотреть на существующие решения Data Lakehouse, то можно заметить, что Spark давно де-факто стал одним из стандартных фреймворков для распределенной работы с большими данными. Spark широко используется как при реализации работы с данными в Data Lake, так и во многих Lakehouse-платформах. Databricks, Data Ocean Nova, AWS Glue, Google Dataproc и другие платформы используют Spark в качестве одного из инструментов для работы с данными.
При этом нужно отметить, что среди табличных фо��матов именно Iceberg стал наиболее распространенным на рынке. Некоторые открытые табличные форматы (OTF) очень тесно привязаны к определенному вендору (привет, Delta от Databricks). Некоторые OTF уступают Iceberg в плане производительности и применимости (рекомендую ознакомиться со статьей моего коллеги про сравнение OTF Iceberg и Paimon «Open Table Formats — Iceberg vs Paimon — практика использования»). Кроме того, Iceberg разрабатывается сообществом Spark, благодаря чему этот табличный формат имеет наибольшее количество работающих фичей на этом движке. И раз речь зашла про Iceberg, давайте остановимся на этом пункте подробнее.
Наиболее эффективное обслуживание Iceberg-таблиц
Apache Iceberg – это открытый табличный формат данных, который вместе с записью файлов с данными включает в себя механизм записи и отслеживания метаданных, позволяющий:
выполнять условия ACID-транзакционности;
поддерживать schema evolution и hide partitioning,
ускорять сканирование таблиц (за счет пропуска нерелевантных файлов данных).
Каждое действие с таблицей (ее создание, вставка данных, их удаление и т.д.) создает новый снепшот в метаданных, который описывает, какие файлы представляют собой таблицу в конкретный момент времени. В процессе выполнения действий с таблицей разрастается количество как файлов с данными, так и файлов с метаданными. Для достижения максимальной производительности при работе с таблицей необходимо своевременно и правильно ее обслуживать:
удалять старые ненужные данные и их релевантные метаданные;
мержить (компактить) маленькие файлы с данными в файлы побольше (при этом, возможно, применяя оптимизационные приемы, так какие как z-order);
находить и удалять файлы данных, на которые не ссылается ни один снепшот (remove orphan files) и так далее.
Для выполнения этих важных процедур хотелось бы иметь наибольший контроль над процессом с точечной настройкой. Такой контроль как раз и дает нам Spark.
Spark позволяет представить обслуживание как набор независимых задач по группам файлов (file-groups). Вместо монолитного переписывания всей таблицы мы формируем file groups (например, внутри партиций или по размеру/количеству файлов) и обрабатываем их параллельно (используя настройку max-concurrent-file-group-rewrites). Такой подход дает несколько преимуществ:
можно разбивать нагрузку (обслуживать только часть таблицы за прогон, ограничивать объем переписываемых данных или количество удаляемых объектов);
гибко управлять параллельностью (чтобы не перегружать Data Lakehouse);
повышать отказоустойчивость за счет ретраев Spark на уровне отдельных групп (падение одной таски не приводит к падению всего обслуживания и позволяет безопасно перезапускать процесс), а также за счет использования конфигурации partial-progress.enabled, которая позволяет нам коммитить прогресс перезаписи по частям, а именно по отдельным файловым группам.
Ни один другой движок не дает нам такой точечной настройки обслуживания таблиц.
Обеспечение надежного интеграционного слоя Lakehouse
Интеграция данных из внешних систем (таких как Postgres, Oracle, Greenplum, Kafka, порталы компании и другие корпоративные источники) в Lakehouse является типовой и важной задачей. Хорошее решение должно обеспечивать возможность работать со многими источниками из коробки, добавлять с нуля кастомные источники данных, при необходимости обрабатывать (трансформировать) выгружаемые данные, гарантировать надежность и повторяемость загрузок.
Также отдельно стоит отметить, что должна быть возможность работать с огромными объемами данных, вплоть до ТБ. Среди прочих движков именно Spark является наиболее подходящим решением для этой задачи благодаря своей архитектуре, экосистеме существующих коннекторов и возможностям по добавлению новых. Среди особенностей архитектуры следует отдельно выделить механизм fault-tolerance, который помогает обеспечить надежность выгрузок и расчетов. Более подробно на этом механизме мы остановимся в следующем поинте.
Распределенная выгрузка данных из источников
Для выгрузки данных Spark использует стандартный интерфейс для взаимодействия с базами данных – jdbc (java database connectivity).
Сам процесс выгрузки выглядит следующим образом:
Устанавливается JDBC-соединение с источником;
Запрос к источнику разделяется на несколько параллельных частей (если заданы опции, отвечающие за партиционирование запроса);
Каждый executor Spark-приложения выполняет свой SQL-запрос через JDBC и выгружает результаты.
За счет того, что весь процесс выгрузки разбивается на независимые уникальные запросы, каждый из которых выполняется на отдельном экзекьюторе, достигается параллельность выгрузки данных.
Многообразие коннекторов и возможность добавлять свои собственные
Spark поддерживает огромное количество источников, среди которых:
Реляционные базы данных (Oracle, Postgres);
Облачные хранилища (Minio, AWS);
NoSQL-хранилища (Cassandra, ElasticSearch);
Стриминговые источники (Kafka, Kinesis, socket).
Если у нас есть какой-то особенный источник, который не поддерживается Spark, но выгружать из которого данные хочется, то есть возможность написать собственный коннектор к этому источнику. Это можно сделать, используя Java/Scala, а начиная со Spark 4 версии – с использованием Python.
Обеспечение работы «тяжелых» ETL
Если нам требуется не только выгружать данные, но еще и трансформировать их дальше, то соблюдение принципа fault-tolerance (отказоустойчивости) становится очень важным моментом. Абсолютно в любой организации есть важные критические регламентные расчеты, которые обязательно должны соответствовать определенному SLA.
Представим себе ситуацию, что у нас есть процесс, который собирает данные из нескольких источников на протяжении часов и формирует из них витрину для критической отчетности (например, ПОД/ФТ). Если происходит падение на какой-то таске за полчаса до окончания работы приложения, то очень бы хотелось, чтобы приложение не упало и перезапуск расчета произошел не с начала логики, а именно с того момента, где произошло падение (ну или за пару шагов до этого).
Собственно, Spark и обеспечивает нам эту возможность fault-tolerance. Да, здесь мы имеем не самый быстрый инструмент для расчетов, но зато однозначно самый надежный и к тому же прозрачный (ни один движок не производит столько разнообразных логов в процессе работы).
Соблюдение принципа fault-tolerance достигается благодаря нескольким фичам:
Построение data lineage в процессе расчетов
Каждый датафрейм является уникальным объектом, который получился в результате применения каких-то трансформаций к другим датафреймам (или же получился из чтения данных из какого-либо источника, например, файла);Приземление промежуточных shuffle-файлов
Если на каком-то этапе (stage) произошла проблема и попытка выполнения расчета провалилась, то Spark совершенно не обязательно перезапускать всю работу с самого начала. Достаточно взять shuffle-файлы с предыдущего stage и перезапустить последний упавший stage. Это особенно полезно для долгоработающих батчевых загрузок (или других пользовательских расчетов);Spill данных на жесткий диск
Если Spark не хватает оперативной памяти, данные начинают «разливаться» на диск. Несмотря на то, что сам по себе процесс приземления данных на жесткий диск не является хорошей практикой, все же это – один из инструментов обеспечения надежности работы выгрузок. В том же Trino любой spill по умолчанию выключен (использование spill данных в Trino считается плохой практикой, так как Trino, в первую очередь, предназначен для выполнения быстрых интерактивных запросов), и абсолютно все данные хранятся и обрабатываются в оперативной памяти. Если памяти не хватает – запрос с выгрузкой данных падает.
Кроме того, благодаря архитектуре Spark масштабирует свои приложения более предсказуемо и безопасно, нежели другие движки. Каждое приложение Spark разворачивается как отдельная независимая рабочая единица в кластере. Наше приложение упало по какой-то причине? Что ж, таков путь. Но упало именно наше приложение, а не весь сервис. Здесь нет ситуации, когда плохо написанный SQL-запрос в Spark может привести к падению всех бегущих Spark-приложений на кластере. В отличие от Spark, Trino и Impala функционируют как постоянные daemon, с одновременной работой множества пользователей. И здесь всегда есть вероятность, что мы своим неловким запросом можем вывести из строя весь сервис. Каждый запрос, поступающий от любого пользователя, выполняется в рамках общего пула потоков и памяти (либо его части – ресурсной группы), который принадлежит этим постоянным процессам. И если один запрос потребляет слишком много памяти, то в лучшем случае он просто съест все ресурсы на кластере, а в худшем – приведет в OutOfMemory или зависанию всей ноды.
Другими словами, Spark предоставляет большую степень изолированности отдельных приложений/запросов в сравнении с другими движками. И именно такая изолированность вместе с механизмом fault-tolerance позволяет нам делать действительно надежные пайплайны по расчету критически важных и тяжелых ETL-процессов.
Дополнительные преимущества Spark
Spark включает в себя не только batch-обработку данных, но и ряд других полезных компонентов, которых нет в других движках. К примеру, на Spark мы можем решать задачи из ML-области.
Spark MLLib был изначально создан для распределенной обработки данных (собственно, как и другие модули Spark). Использование встроенного модуля для работы c ML-задачами дает следующие преимущества:
Есть возможность обучать модели на датасетах огромного объема (на сотнях гигабайтов данных);
Данные не нужно перегружать в отдельную среду – они хранятся в storage Lakehouse, и сам spark MLlib работает в этом же Lakehouse;
Само по себе обучение моделей реализовано в виде распределенных вычислений, что позволяет нативно достигать параллелизации.
Функций Spark ML используются для различных преобразований в стандартных ETL-процессах (standart scaler, minmax scaler для нормализации признаков). Либо когда есть реальная необходимость обучить модель на огромном датасете данных (например, обучение градиентного бустинга для прогнозирования всех продаж всех артикулов во всех магазинах в торговой сети).
Несмотря на то, что Spark ML имеет куда меньший функционал, чем питоновские ML-библиотеки, и обладает далеко не самой исчерпывающей документацией, сама возможность обучать модели на огромных датасетах является отличительным преимуществом Spark перед другими движками обработки данных и может действительно пригодиться в повседневной работе.
Обработка данных в реальном времени сегодня является распространенной задачей. Поэтому в Data Lakehouse важно иметь возможность эту задачу решать. Spark позволяет обрабатывать данные в около real-time, в микробатчевом характере, используя модуль streaming. С помощью streaming мы можем загружать данные из потоковых источников, таких как Kafka или сокеты. Сам модуль streaming интегрирован с остальными компонентами и использует единый вычислительный движок. Это означает, что потоковые и пакетные данные можно обрабатывать с помощью одних и тех же API, что снижает сложность разработки и сопровождения приложений.
Например, в нашей платформе Data Ocean Nova потребовалось реализовать аудит выполнения Spark-приложений на основе логов Spark History Server, сохраняемых в S3. Ключевыми особенностями задачи стали динамическая структура логов и наличие нестандартных событий, структура которых заранее неизвестна и может меняться со временем. Использование Spark, уже встроенного в платформу, позволило быстро получить готовое решение без внедрения дополнительных компонентов и усложнения архитектуры.
Недостатки Spark
В разделе недостатков мы выделили долгое поднятие драйверов/экзекьюторов. Но не так давно, начиная со Spark 3.4, появился функционал Spark Connect – современная клиент-серверная архитектура. В стандартной реализации клиент (например, через Jupiter-ноутбук) подключается к Spark напрямую в пределах одного контура. При этом сам клиент потребляет ресурсы локально, и клиент/кластер должны иметь идентичные версии библиотеки. В случае со Spark Connect создается тонкий клиент, с помощью которого пользователь может работать с удаленным кластером Spark. На основе кода строится логический план выполнения запроса, после чего он в двоичном виде отправляется в Spark Connect Server. После этого Connect Server интерпретирует план и выполняет его. Результаты возвращаются нам в виде pyarrow-пакетов. Данный протокол открывает большие возможности для расширения и распространения использования Spark. Кроме очевидных вариантов его использования удаленно в различных аналитических целях в интерактивном режиме (например, для выполнения задач аналитиками или data-scientists), можно также привести в пример легковесную интеграцию Spark в различные микросервисы.
У spark большое сообщество, которое активно его развивает и добавляет все новые и новые фичи. Например, Spark 4.0 сделал большой скачок в сторону дальнейшего развития функциональности Spark SQL и других компонентов. Новые фичи значительно расширяют пользовательский опыт и позволяют делать новые вещи, которые недоступны в других движках. Вот только парочка из возможностей новой версии Spark:
Использование PIPE-синтаксиса при записи запросов к данным. Сама по себе возможность писать такие запросы выглядит не очень привычно для пользователя, который умеет работать с SQL-запросами, но фича может быть полезна для людей, которые SQL не знают вовсе;
Создание комплексных SQL-скриптов на Spark, используя циклы и условное выполнение. Внедрение этой возможности приближает Spark к традиционным (и привычным) реляционным базам данных в плане пользовательского опыта. Теперь разработчик может реализовать сложную логику на SQL, как в каком-нибудь Postgres.
На самом деле список значимых изменений, которые были представлены в Spark 4.0, очень большой и заслуживает отдельной статьи. Собственно, такую статью мы собираемся представить в скором времени, где более подробно остановимся на нововведениях.
Можно ли построить LakeHouse без Spark или только на нем?
Если очень захотеть, то, конечно же, можно обойтись без Spark при построении Data Lakehouse. Но в этом случае вам придется закрывать задачи, которые Spark умеет делать из коробки, целым зоопарком альтернативных решений, так как ни один базовый движок отдельно не решит их.
Безусловно, Spark обладает широкой функциональностью и крутыми отличительными особенностями, которые делают его надежным и производительным движком, необходимым для работы Data Lakehouse. Однако есть кейсы, где другие движки показывают себя в работе намного лучше, чем Spark.
В частности здесь речь о выполнении интерактивных ad-hoc запросов, результаты которых должны возвращаться пользователю как можно быстрее. И здесь какие-нибудь Impala/StarRocks оставляют Spark по скорости работы далеко позади, ведь они специально проектировались для таких задач (обработка всех данных в оперативной памяти, перенос операций фильтрации и проецирования ближе к источнику данных, распараллеливание расчетов и пр.). Если говорить про Trino, то его возможность выполнять федеративные запросы (когда в пределах одного скрипта мы можем работать сразу со многими БД) – это явное преимущество для работы аналитиков. Ведь действительно удобно, когда в одном месте можешь обращаться сразу ко множеству источников без лишней головной боли.
Таким образом, Spark является необходимой частью Lakehouse-платформ и открывает двери в мир надежных ETL-процессов, интеграций данных и широкого спектра доступных операций над данными. Но не следует забывать: чтобы достичь максимально эффективного решения разнородных задач при работе с данными, стоит брать лучшее из мира движков. Именно поэтому мы в решении Data Ocean Nova не заставляем использовать определенный движок для расчетов данных. При этом в нашей платформе Spark присутствует как обязательный движок для обслуживания Iceberg и дефолтный движок для инжестинга данных, который мы регулярно самостоятельно дорабатываем (с одним из примеров наших доработок вы можете ознакомиться в статье моих коллег «Оптимизации функционала Apache Iceberg в задачах real-time загрузки и обработки данных»). А следить за обновлениями нашей платформы можно, подписавшись на Telegram-канал Data Sapience.
