Тема преимуществ открытых табличных форматов при работе с озерами данных всё чаще поднимается в среде дата-инженеров. Предполагается, что их использование способно устранить недостатки популярного Apache Hive. Но так ли это на практике?

Меня зовут Иван Биленко, я инженер данных в команде дата-платформы Циан. В этой статье я хочу немного познакомить вас с процессами и стеком внутри нашей платформы, рассказать, почему мы решили попробовать Iceberg, с какими проблемами столкнулись при тестировании и какие преимущества Iceberg может дать тем, кто еще только задумывается о переходе. Дисклеймер: статья носит обзорный характер.


Описание стека и процессов с данными

Циан с точки зрения обработки данных прошел долгий путь. В 2001 году мы были обычной Excel-таблицей.

Сегодня же наша инфраструктура включает в себя DWH на базе Greenplum и Data Lake на основе Yandex S3.

Основные источники данных — это сайт и мобильные приложения, откуда информация поступает в Data Lake через Kafka — наш основной канал передачи данных. После этого данные распределяются по различным потребителям:

Аналитики получают данные через DWH для проведения исследований и анализа.

Дата-сайентисты используют данные из фича-стора для построения моделей машинного обучения.

Есть и другие потребители данных: например, разработчики, а также прочие системы, участвующие в процессе передачи данных. Но на этом этапе мы не будем углубляться в их роль. Остановимся на процессах вокруг Data Lake (на схеме A и B) и связанных задачах.

Боли в процессах и потребности

Процесс A: работа с данными через Kafka

На этом этапе мы обрабатываем более 400 таблиц, загружая и парся данные из Kafka по заранее заданным слоям. У себя внутри мы называем этот процесс раскладкой. Каждую неделю добавляются 2-3 новые таблицы. 

Основная задача нашей команды — обеспечить доступность и актуальность данных к назначенному сроку, а также предоставить бизнесу инструмент для самостоятельного управления раскладкой без необходимости привлекать дата-инженеров. 

Какие проблемы мы здесь встречаем:

  1. Ограниченные DDL-операции в Apache Hive: Например, у нас нет доступа к операциям с колонками, таким как ALTER TABLE RENAME, REPLACE, или DROP. Это создает сложности, поскольку источники данных меняют свои форматы, и нам приходится перезаписывать таблицы целиком, что усложняет автоматизацию раскладки.

  2. Отсутствие транзакций: Если на этапе перезаписи происходит ошибка, мы можем потерять данные. Чтобы избежать этого, мы используем костыли с временными таблицами, записывая данные сначала в них. Но эти костыли не решают проблему временной недоступности таблиц. Кроме того, нам не удается проводить операции с таблицами одновременно, что сильно замедляет процесс.

Процесс B: работа с фича-стором

Мы собираем около 40 фича-таблиц для нашего фича-стора. Возникает потребность в хранении исторических версий данных. Поскольку речь идет о таблицах, которые могут весить сотни гигабайт, нам нужен эффективный механизм Time Travel, чтобы избежать создания копий данных для разных версий.

Какое решение

В процессе поиска решения на горизонте сразу появились несколько популярных табличных форматов: Delta Lake, Apache Iceberg и Apache Hudi. Все они нацелены на расширение возможностей реляционных баз данных для Data Lake, что должно было полностью покрыть наши запросы. Что касается минусов перехода — на первый взгляд, кроме возможных сложностей с настройкой, их будто бы и нет. Если вы наткнулись на другую информацию, буду рад ссылке в комментариях к статье или в личном сообщении.

При выборе между табличными форматами Delta Lake сразу отпал, поскольку поддерживает только формат Parquet, а полноценное production ready решение с его полным функционалом доступно лишь в коммерческой версии Databricks. Нам же было нужно опенсорсное решение.

Из решений от Apache выбрали Iceberg благодаря его простой настройке — Yandex Cloud предлагает готовый набор файлов jar и подробный мануал, — и активному сообществу.

Сравнение форматов Iceberg и Hive

А теперь возьмём основные фишки Apache Iceberg, изучим, как они должны работать, и проверим их на практике.

Снэпшоты

После каждой операции над таблицей Iceberg фиксирует ее состояние в виде снимка и ведёт лог. Это даёт возможность перемещаться между снэпшотами, делать Time Travel запросы не только по id снимка, но и по заданному тегу. 

На практике это оказалось полезным: так можно фиксировать продовые версии датасетов. А вот функция разделения снэпшотов по веткам (например, для экспериментов, чтобы не трогать продовые данные) пока не пригодилась.

Производительность

Netflix, как создатель формата Apache Iceberg, делится впечатляющим кейсом его использования.

Данные

  • 2,7 млн файлов в 2688 партициях

Запрос

SELECT distinct tags['type'] as type 

FROM iceberg.atlas 

WHERE name = 'metric-name' 

AND date > 20180222 

AND date <= 20180228 

ORDER BY type;

Результат

EXPLAIN — 9 мин., query — не отрабатывает.

query — 42 сек. при условии min/max filtering.

Почему такой скачок в производительности?

Основной ответ — Iceberg оптимизирует запросы за счёт метаданных, которые позволяют фильтровать файлы, не соответствующие условиям запроса, исключая их из сканирования. 

Также Iceberg избавляет от необходимости листинга директо��ий по каждой партиции, ведь все метаданные хранятся отдельно.

Структура директории Iceberg таблицы
Структура директории Iceberg таблицы

Уровни метаданных

1. Для каждого дата-файла: статистики по колонкам хранятся в manifest files.

 select * from iceberg_catalog.feature_store.test.files
 select * from iceberg_catalog.feature_store.test.files

2. На уровне снэпшота таблицы: агрегированные статистики в manifest list.

select * from iceberg_catalog.feature_store.test.snapshots
select * from iceberg_catalog.feature_store.test.snapshots

Однако, на практике всё оказалось не так гладко.

Данные

  • 400 Гб, 31 тысяча файлов в 820 партициях, формат Parquet.

Условия

  • Движок Spark 3.1.2, Iceberg 0.13.1 и фиксированное количество ресурсов.

Запрос

SELECT count(*)

FROM iceberg_catalog.feature_store.user_search_history_features

Результат

EXPLAIN — 5 сек., query — 1 мин. 7 сек.

EXPLAIN — 58 сек., query — 4 мин. 32 сек.

Ожидалось, что подсчёт количества строк займет десятки секунд, но реальность оказалась другой. Возник вопрос: почему так?

Первое предположение: Iceberg не использует метаданные для оптимизации. Логично было бы взять информацию из summary с указанием total-records, но, глядя на план выполнения, видно, что таблица сначала полностью сканируется.

== Physical Plan ==

* HashAggregate (5)

+- Exchange (4)

   +- * HashAggregate (3)

      +- * Project (2)

         +- BatchScan (1)

Нет отдельного шага, где бы происходило чтение метаданных.

Второе предположение: слишком много метаданных, так как на каждую операцию создается новый файл. 

2823 — количество файлов метаданных в тестируемой таблице
2823 — количество файлов метаданных в тестируемой таблице

В документации действительно есть раздел Maintenance, где рекомендуется регулярно чистить метаданные. Но какое количество является допустимым для сохранения производительности — вопрос остаётся открытым.

Наличие транзакций

Транзакционная согласованность в Iceberg дает возможность параллельно выполнять операции записи без влияния на операции чтения. Проблема потери данных при перезаписи больше не актуальна. Также ушла ошибка Cannot overwrite a path that is also being read from — теперь не нужно сбрасывать план запроса или создавать временные таблицы, если пайплайн одновременно читает и записывает в одно и то же место.

Эволюция данных

Наша первая боль — ограниченные операции с колонками — решена благодаря возможности эволюции схемы данных без перезаписи. Теперь доступны операции Add, Drop, Rename, Update, Reorder. Возможность менять схему партиционирования (на текущий момент стабильно по дням) и порядка сортировки таблиц пока не использовали, но опция есть.

Расширенный SQL

Операцию DELETE пока не применяли, зато UPDATE используем регулярно в связке с INSERT через MERGE INTO. Важно помнить, что архитектурно в S3 сами файлы не изменяются и не удаляются — создаются дополнительные файлы, которые объединяются при чтении. Со временем это может сказаться на производительности.

Результаты и выводы

Мы начали частичную миграцию на Iceberg при создании нового процесса для сохранения исторических фичей в фича-сторе (процесс B). Нам удалось решить многие из упомянутых болей, но возникло снижение производительности на «жирных» таблицах, которое нужно проработать для полноценного перехода (процесс A). А полноценная миграция — тема для следующей статьи.