Iceberg становится де-факто отраслевым стандартом при построении lakehouse в России. Для сравнения, на последней конференции smart-data, Iceberg по частоте упоминания уступает только Spark. Это значит, что уверенное владение механикой работы Iceberg становится обязательным навыком для инженеров данных и платформенных команд. Однако на практике большинство команд при внедрении ограничиваются базовыми возможностями:

  • создание таблиц

  • настройка партиционирования

  • настройка сompaction-процедур

При этом значительная часть производительности и стоимости эксплуатации Iceberg таблиц определяется менее очевидными деталями: устройством метаданных, стратегиями записи файлов и тем, как движки выполнения используют статистики файлов. Эти аспекты редко оказываются в центре внимания, но именно они часто становятся причиной деградации производительности по мере роста таблиц. На деле же пространство оптимизаций гораздо шире.

В этой статье я разберу несколько достаточно простых, но эффективных оптимизаций Iceberg таблиц, про которые часто забывают. Однако, они помогают:

  • ускорить аналитические запросы без изменения самих запросов;

  • снизить количество читаемых файлов и метаданных;

  • уменьшить нагрузку на каталог и object storage;

  • стабилизировать производительность по мере роста таблиц.

Установка порядка записи

Работая с таблицами, мы привыкли, что порядок строк совершенно неважен. С точки зрения выполнения ETL, так оно и есть, для любой ETL нет никакой разницы, что будет раньше, строка с id = 1 или 2. Итог будет один и тот же.

Однако, одна из киллер фич Iceberg - сбор статистик, позволяющий отсекать ненужные data файлы на этапе чтения манифеста. При чтении данных, Iceberg активно использует min/max значения колонок в каждом manifest file. Если значения min/max находятся вне диапазона значений нашего фильтра - файл нет необходимости читать.

Рассмотрим случайный пример записи данных без порядка записи.

Для этого создадим таблицу

spark.sql("""
CREATE TABLE habr.paper.write_ordered (
  id INT,
  name STRING,
  event STRING
)
USING ICEBERG
""")

И попробуем сэмулировать типичную ETL с repartition.

data = [(i, f"name_{i}", f"event_{i}") for i in range(1, 1000000)]

df = spark.createDataFrame(data, ["id", "name", "event"]).repartition(16)

df.writeTo("habr.paper.write_ordered").append()

Если посмотреть на содержимое manifest file с помощью запроса:

spark.sql("""
SELECT readable_metrics.id.lower_bound AS id_min,
    readable_metrics.id.upper_bound AS id_max, file_path 
FROM habr.paper.write_ordered.files
""").show(truncate=False)

Мы увидим нечто подобное:

+------+------+-------------------------------------------------------------------------------------------------+
|id_min|id_max|file_path                                                                                        |
+------+------+-------------------------------------------------------------------------------------------------+
|2     |999994|s3://habr/paper/write_ordered/data/-9fb24ce9ff63-0-00001.parquet|
|43    |999988|s3://habr/paper/write_ordered/data/00001-267-cc7ac11e-62e9-4f59-a2f6-9fb24ce9ff63-0-00001.parquet|
|4     |999997|s3://habr/paper/write_ordered/data/00002-268-cc7ac11e-62e9-4f59-a2f6-9fb24ce9ff63-0-00001.parquet|
|8     |999961|s3://habr/paper/write_ordered/data/00003-269-cc7ac11e-62e9-4f59-a2f6-9fb24ce9ff63-0-00001.parquet|
|1     |999944|s3://habr/paper/write_ordered/data/00004-270-cc7ac11e-62e9-4f59-a2f6-9fb24ce9ff63-0-00001.parquet|
|25    |999983|s3://habr/paper/write_ordered/data/00005-271-cc7ac11e-62e9-4f59-a2f6-9fb24ce9ff63-0-00001.parquet|
|39    |999991|s3://habr/paper/write_ordered/data/00006-272-cc7ac11e-62e9-4f59-a2f6-9fb24ce9ff63-0-00001.parquet|
|23    |999998|s3://habr/paper/write_ordered/data/00007-273-cc7ac11e-62e9-4f59-a2f6-9fb24ce9ff63-0-00001.parquet|
|33    |999985|s3://habr/paper/write_ordered/data/00008-274-cc7ac11e-62e9-4f59-a2f6-9fb24ce9ff63-0-00001.parquet|
|10    |999999|s3://habr/paper/write_ordered/data/00009-275-cc7ac11e-62e9-4f59-a2f6-9fb24ce9ff63-0-00001.parquet|
|15    |999990|s3://habr/paper/write_ordered/data/00010-276-cc7ac11e-62e9-4f59-a2f6-9fb24ce9ff63-0-00001.parquet|
|40    |999986|s3://habr/paper/write_ordered/data/00011-277-cc7ac11e-62e9-4f59-a2f6-9fb24ce9ff63-0-00001.parquet|
|35    |999996|s3://habr/paper/write_ordered/data/00012-278-cc7ac11e-62e9-4f59-a2f6-9fb24ce9ff63-0-00001.parquet|
|13    |999951|s3://habr/paper/write_ordered/data/00013-279-cc7ac11e-62e9-4f59-a2f6-9fb24ce9ff63-0-00001.parquet|
|3     |999981|s3://habr/paper/write_ordered/data/00014-280-cc7ac11e-62e9-4f59-a2f6-9fb24ce9ff63-0-00001.parquet|
|5     |999969|s3://habr/paper/write_ordered/data/00015-281-cc7ac11e-62e9-4f59-a2f6-9fb24ce9ff63-0-00001.parquet|
+------+------+-------------------------------------------------------------------------------------------------+

Как вы понимаете, делая любой запрос с фильтрацией по ID, с высокой долей вероятности мы прочитаем все файлы и вся магия Iceberg окажется бессильной.

Однако, Iceberg поддерживает сортировки при записи данных

spark.sql("""
ALTER TABLE habr.paper.write_ordered
WRITE ORDERED BY id
""")

Тогда, выполняя нашу "ETL" мы получим уже адекватный результат в метаданных

+------+------+-------------------------------------------------------------------------------------------------+
|id_min|id_max|file_path                                                                                        |
+------+------+-------------------------------------------------------------------------------------------------+
|1     |196479|s3://habr/paper/write_ordered/data/00000-245-e7a312b9-83fd-4805-9260-fb0a7cb1b5d4-0-00001.parquet|
|196480|395983|s3://habr/paper/write_ordered/data/00001-246-e7a312b9-83fd-4805-9260-fb0a7cb1b5d4-0-00001.parquet|
|395984|595114|s3://habr/paper/write_ordered/data/00002-247-e7a312b9-83fd-4805-9260-fb0a7cb1b5d4-0-00001.parquet|
|595115|793943|s3://habr/paper/write_ordered/data/00003-248-e7a312b9-83fd-4805-9260-fb0a7cb1b5d4-0-00001.parquet|
|793944|999999|s3://habr/paper/write_ordered/data/00004-249-e7a312b9-83fd-4805-9260-fb0a7cb1b5d4-0-00001.parquet|
+------+------+-------------------------------------------------------------------------------------------------+

За счет данной оптимизации, если мы читаем данные с фильтрацией по id < 196470, мы прочитаем 1 data файл, вместо всей таблицы, что положительно скажется на выполнении аналитических запросов.

Отключение расчета статистик

По умолчанию Iceberg собирает статистику для каждого столбца в каждом файле. Это помогает в data skipping, однако, стоит понимать что сбор статистик также занимает некоторое время.

Рассмотрим типичную плоскую таблицу в lakehouse хранилище.

spark.sql("""
CREATE TABLE habr.paper.typical_flat_table (
  id INT,
  name STRING,
  col_1 STRING,
  col_2 STRING,
  col_3 STRING,
  col_4 STRING,
  col_5 STRING,
  col_6 STRING,
  col_7 STRING,
  col_8 STRING
)
USING ICEBERG
""")

В ней 10 столбцов. Однако, аналитики для своих вычислений в реальности используют только первые 2 для отбора нужных данных. Соответсвенно, для 8 колонок считается бесполезная статистика.

Для того, чтобы исправить это недоразумение, необходимо настроить расчет статистик. Iceberg позволяет установить сбор метрик по умолчанию (для примера выбрал counts, но вы можете настраивать под себя, см параметры). Также рекомендую установить уровень сбора метрик для конкретного столбца. Так, для ID, который используется в запросах, я рекомендую ставить уровень full, а для колонок по которым не идет никакая фильтрация данных - выключать сбор статистик.

spark.sql("""
ALTER TABLE habr.paper.typical_flat_table SET TBLPROPERTIES (
    -- По умолчанию собираем только счетчики
    'write.metadata.metrics.default' = 'counts',
    -- Для критически важного столбца ID собираем полные границы
    'write.metadata.metrics.column.id' = 'full',
    -- Для неиспользуемого в фильтрации столбца col_1 не собираем статистики
    'write.metadata.metrics.column.col_1' = 'none'
)
""")

Векторизованное удаление

Многие пользователи Iceberg в курсе про Copy-on-Write (CoW) и Merge-on-Read (MoR) стратегии и ускоряют запись данных с помощью Merge-on-Read стратегии. Однако, это фактически перекладывание вычислительной нагрузку на читателя. В этот момент ключевым фактором производительности становится то, как именно движок вычитывает удаленные строки из основного массива данных.

Iceberg поддерживает 2 основных вида Delete файлов:

  • Equality delete - используется в streaming обработке. По сути, в delete файл просто записывается "правило удаления", которое применяется во время чтения данных.

  • Position delete - используется в батчевых ETL, delete файл содержит номера строк, которые нужно удалить.

Содержание Position delete файла:

+------------------------------------------+---+
|file_path                                 |pos|
+------------------------------------------+---+
|s3://habr/paper/mor_table/data/....parquet|873|
|s3://habr/paper/mor_table/data/....parquet|230|
|s3://habr/paper/mor_table/data/....parquet|992|
+------------------------------------------+---+

С внедрением Iceberg format v3, в спецификации произошли изменения. Во первых, Equality delete был признан Deprecated. А Position delete эволюционировал в Deletion vectors за счет внедрения Puffin файлов.

Что такое Deletion vector? Говоря по простому, это bitmap который за один такт применяется к нашему data файлу, что существенно оптимизирует чтение при MoR.

Таким образом, вместо того, чтобы применять тяжеловесный Join delete файла и data файла, вычислительный движок просто загружает битовую маску и применяет её. Всего одна логическая операция для вычислений.

Заключение

Переход на Apache Iceberg действительно дает ощутимый прирост производительности при обработке данных. Однако важно помнить: Iceberg - это не серебряная пуля, работающая идеально "из коробки". Максимальная эффективность достигается только через тонкую настройку параметров таблицы под конкретные задачи.

Буду рад обсудить в комментариях ваш опыт и неочевидные кейсы по оптимизации.