Iceberg
Iceberg

Всем привет!
Хочу поделиться нашим опытом использования Apache Iceberg 1.8+ с каталогом AWS Glue и Spark 3.5.

Расскажу:

  • С какими проблемами мы столкнулись;

  • Почему compaction внезапно перестал помогать;

  • Как мы чинили compaction.

Спойлер: будет боль, delete-файлы, конфликты транзакций и немного Git-подобной магии 😄

Исходные условия у нас были следующими:

  • Сжатые сроки — нужно было, чтобы всё заработало «ещё вчера».

  • Сотни таблиц Iceberg, все — v2.

  • AWS Glue compaction и оптимизации не дают желаемого эффекта и производительности. При этом мы не видели ошибок, даже если compaction завершался с транзакционными конфликтами.

  • Часть сущностей активно обновляется, часть — почти статична.

  • Есть стриминговый поток, который пишет новые данные каждые 3–5 минут.

Проблема №1. Delete-файлы съели всё

Iceberg таблицы v2 — это row-level changes. То есть поддерживаются изменения данных на уровне отдельных строк (обновление, удаление одной или нескольких записей), а не целых файлов или партиций.
В результате таких обновлений у вас появляются дополнительные delete-файлы, которые привязаны к каждому data-файлу:

  • equality delete files - «удалить все строки, где столбец(ы) = значение» — в delete file лежат сами ключи (например id = 123).

  • position delete files - «удалить строку № N в файле File1» — путь к data-файлу + номер строки (и при необходимости сама строка для проверки). Но при этом у вас основной стриминговый поток только вставляет данные, не удаляет.

В результате такой архитектуры получается, что на один data-файл может приходиться N delete-файлов.
Когда Spark читает сущность, он должен прочитать все data-файлы и все delete-файлы, причём один delete-файл может быть прочитан X раз. А если к одному data-файлу привязано много delete-файлов, экзекьютору придётся держать их все в памяти — OOM в таком случае более чем вероятен.😉

Как обнаружить, что у вас плохое состояние сущности?

  1. Через метрики Iceberg.

Можно настроить мониторинг в Grafana. Iceberg позволяет делать scan сущности и возвращать её текущее состояние. Более того, он логирует отчёт о состоянии таблицы при обращении к ней через Spark:

INFO LoggingMetricsReporter: Received metrics report: ScanReport{tableName=iceberg.***, snapshotId=4911325398790636705, filter=(ref(name="col") >= "(timestamp-1-hours-ago)" and ref(name="col") <= "(timestamp-0-hours-ago)"), schemaId=1, projectedFieldIds=[*], projectedFieldNames=[*], scanMetrics=ScanMetricsResult{totalPlanningDuration=TimerResult{timeUnit=NANOSECONDS, totalDuration=PT0.128236656S, count=1}, resultDataFiles=CounterResult{unit=COUNT, value=4}, resultDeleteFiles=CounterResult{unit=COUNT, value=105}, totalDataManifests=CounterResult{unit=COUNT, value=23}, totalDeleteManifests=CounterResult{unit=COUNT, value=8}, scannedDataManifests=CounterResult{unit=COUNT, value=23}, skippedDataManifests=CounterResult{unit=COUNT, value=0}, totalFileSizeInBytes=CounterResult{unit=BYTES, value=600614244}, totalDeleteFileSizeInBytes=CounterResult{unit=BYTES, value=2717982}, skippedDataFiles=CounterResult{unit=COUNT, value=91}, skippedDeleteFiles=CounterResult{unit=COUNT, value=0}, scannedDeleteManifests=CounterResult{unit=COUNT, value=8}, skippedDeleteManifests=CounterResult{unit=COUNT, value=0}, indexedDeleteFiles=CounterResult{unit=COUNT, value=256}, equalityDeleteFiles=CounterResult{unit=COUNT, value=228}, positionalDeleteFiles=CounterResult{unit=COUNT, value=28}, dvs=CounterResult{unit=COUNT, value=0}}, metadata={engine-version=3.5.5-amzn-1, iceberg-version=Apache Iceberg unknown (commit unknown), app-id=application_1769507792449_5258, engine-name=spark}}

resultDeleteFiles = 105 - именно столько будет прочитано delete-файлов.
resultDataFiles = 4 - и столько data файлов.

  1. Через Spark UI.

Можно зайти в Spark UI и увидеть там план запроса, например:

BatchScan в Spark UI
BatchScan в Spark UI

Когда Spark читает Iceberg-таблицу, он делает BatchScan — сканирует текущее состояние таблицы.

Time spent on file scans: 1,423,415 ms

Если это число большое — таблица уже в критическом состоянии.

1 423 415 мс = ~23.7 минуты

❗ И это время только на чтение файлов, без выполнения самого запроса.

Почему так долго? Ответ — в delete-файлах. Spark читает: data-файлы + delete-файлы.

Почему compaction не спасал?

Когда мы начинали разработку, у нас был EMR, который поддерживал только Iceberg 1.6.
В этой версии delete-файлы удаляются во время compaction только если:

  • compaction идет с опциейrewrite-all = true и по всей истории сущности.

Конечно есть другие оптимизации Iceberg: expire snapshots, rewrite position delete-файлы и тп, которые тоже удаляют delete-файлы, но делают это не так эффективно.
Бонусом у нас были постоянные конфликты транзакций между основным потоком и compaction, и просто внутри групп compaction.

Проблема №2. Конфликты транзакций и падающий compaction

Мы использовали при работе compaction такие опции, как :

  • partial-progress.enabled = true

  • partial-progress.max-commits = 200

  • retry зафэйлинных групп/целого compaction с задержками

Типичные ошибки выглядели так:

ValidationException: Cannot commit, found new position delete for replaced data file
  • типичная ошибка конфликта транзакций.

Comparison method violates its general contract!

Последняя ошибка прилетела к нам из EMR. Она решалась настройкой spark: spark.sql.iceberg.data-prefetch.enabled=false

Как стали решать проблему?

  1. Апгрейд до Iceberg 1.8 - это максимальная версия для нашей версии EMR. После обновления стало заметно лучше:
    Delete-файлы стали хорошо уходить при rewrite-all = true, как с фильтром по колонке, так и по всей истории.

  2. Мы стали использовать механизм branch в Iceberg.

  • Основной поток пишет в main ветку.

  • Compaction выполняется в отдельной ветке

  • После завершения compaction делает попытку merge операции по стратегии fast-forward в main. При таком подходе конфликты с группами почти сошли на нет.

Почему именно fast-forward?
Есть ещё стратегия replaced — при ней конфликтов merge не будет вовсе. Но она может привести к потере данных, так как вы просто отдаёте приоритет результату compaction и затираете всё, что произошло в main после создания ветки.

Как вы уже догадались, при стратегии fast-forward конфликты возможны, если в main что-то изменилось за время выполнения джоба.

Нам пришлось идти с бизнесом на компромиссы:
Для часто обновляющихся сущностей делать паузу в работе основного потока. Но делать это всегда, для всех сущностей или для compaction по всей истории большой таблицы, мы не можем.

В итоге мы:

  • добавили множество retry;

  • увеличили max-concurrent-file-group-rewrites и дали больше ресурсов Spark, что существенно ускорило работу compaction;

  • разбили compaction по периодам истории, чтобы минимизировать конфликты.

  • сделали возможность остановки основного стримингового потока.

После этого система стала:

  • стабильной

  • предсказуемой

  • и перестала пугать 20+ минутами чтения 😅

Выводы

Iceberg очень быстро развивается, поэтому лучше делаете разработку сразу на максимальной доступной для вас версии.

  • Iceberg v2 — мощный, но требует внимания.

  • delete-файлы — скрытый враг.

  • compaction - подвержен множеству конфликтов. Не терпит параллельной работы с сущностью.

Надеюсь, этот опыт будет полезен тем, кто только начинает или уже тонет так же, как и мы когда-то 🙂