
Всем привет!
Хочу поделиться нашим опытом использования Apache Iceberg 1.8+ с каталогом AWS Glue и Spark 3.5.
Расскажу:
С какими проблемами мы столкнулись;
Почему compaction внезапно перестал помогать;
Как мы чинили compaction.
Спойлер: будет боль, delete-файлы, конфликты транзакций и немного Git-подобной магии 😄
Исходные условия у нас были следующими:
Сжатые сроки — нужно было, чтобы всё заработало «ещё вчера».
Сотни таблиц Iceberg, все — v2.
AWS Glue compaction и оптимизации не дают желаемого эффекта и производительности. При этом мы не видели ошибок, даже если compaction завершался с транзакционными конфликтами.
Часть сущностей активно обновляется, часть — почти статична.
Есть стриминговый поток, который пишет новые данные каждые 3–5 минут.
Проблема №1. Delete-файлы съели всё
Iceberg таблицы v2 — это row-level changes. То есть поддерживаются изменения данных на уровне отдельных строк (обновление, удаление одной или нескольких записей), а не целых файлов или партиций.
В результате таких обновлений у вас появляются дополнительные delete-файлы, которые привязаны к каждому data-файлу:
equality delete files - «удалить все строки, где столбец(ы) = значение» — в delete file лежат сами ключи (например id = 123).
position delete files - «удалить строку № N в файле File1» — путь к data-файлу + номер строки (и при необходимости сама строка для проверки). Но при этом у вас основной стриминговый поток только вставляет данные, не удаляет.
В результате такой архитектуры получается, что на один data-файл может приходиться N delete-файлов.
Когда Spark читает сущность, он должен прочитать все data-файлы и все delete-файлы, причём один delete-файл может быть прочитан X раз. А если к одному data-файлу привязано много delete-файлов, экзекьютору придётся держать их все в памяти — OOM в таком случае более чем вероятен.😉
Как обнаружить, что у вас плохое состояние сущности?
Через метрики Iceberg.
Можно настроить мониторинг в Grafana. Iceberg позволяет делать scan сущности и возвращать её текущее состояние. Более того, он логирует отчёт о состоянии таблицы при обращении к ней через Spark:
INFO LoggingMetricsReporter: Received metrics report: ScanReport{tableName=iceberg.***, snapshotId=4911325398790636705, filter=(ref(name="col") >= "(timestamp-1-hours-ago)" and ref(name="col") <= "(timestamp-0-hours-ago)"), schemaId=1, projectedFieldIds=[*], projectedFieldNames=[*], scanMetrics=ScanMetricsResult{totalPlanningDuration=TimerResult{timeUnit=NANOSECONDS, totalDuration=PT0.128236656S, count=1}, resultDataFiles=CounterResult{unit=COUNT, value=4}, resultDeleteFiles=CounterResult{unit=COUNT, value=105}, totalDataManifests=CounterResult{unit=COUNT, value=23}, totalDeleteManifests=CounterResult{unit=COUNT, value=8}, scannedDataManifests=CounterResult{unit=COUNT, value=23}, skippedDataManifests=CounterResult{unit=COUNT, value=0}, totalFileSizeInBytes=CounterResult{unit=BYTES, value=600614244}, totalDeleteFileSizeInBytes=CounterResult{unit=BYTES, value=2717982}, skippedDataFiles=CounterResult{unit=COUNT, value=91}, skippedDeleteFiles=CounterResult{unit=COUNT, value=0}, scannedDeleteManifests=CounterResult{unit=COUNT, value=8}, skippedDeleteManifests=CounterResult{unit=COUNT, value=0}, indexedDeleteFiles=CounterResult{unit=COUNT, value=256}, equalityDeleteFiles=CounterResult{unit=COUNT, value=228}, positionalDeleteFiles=CounterResult{unit=COUNT, value=28}, dvs=CounterResult{unit=COUNT, value=0}}, metadata={engine-version=3.5.5-amzn-1, iceberg-version=Apache Iceberg unknown (commit unknown), app-id=application_1769507792449_5258, engine-name=spark}}
resultDeleteFiles = 105 - именно столько будет прочитано delete-файлов.
resultDataFiles = 4 - и столько data файлов.
Через Spark UI.
Можно зайти в Spark UI и увидеть там план запроса, например:

Когда Spark читает Iceberg-таблицу, он делает BatchScan — сканирует текущее состояние таблицы.
Time spent on file scans: 1,423,415 ms
Если это число большое — таблица уже в критическом состоянии.
1 423 415 мс = ~23.7 минуты
❗ И это время только на чтение файлов, без выполнения самого запроса.
Почему так долго? Ответ — в delete-файлах. Spark читает: data-файлы + delete-файлы.
Почему compaction не спасал?
Когда мы начинали разработку, у нас был EMR, который поддерживал только Iceberg 1.6.
В этой версии delete-файлы удаляются во время compaction только если:
compaction идет с опцией
rewrite-all = trueи по всей истории сущности.
Конечно есть другие оптимизации Iceberg: expire snapshots, rewrite position delete-файлы и тп, которые тоже удаляют delete-файлы, но делают это не так эффективно.
Бонусом у нас были постоянные конфликты транзакций между основным потоком и compaction, и просто внутри групп compaction.
Проблема №2. Конфликты транзакций и падающий compaction
Мы использовали при работе compaction такие опции, как :
partial-progress.enabled = truepartial-progress.max-commits = 200retry зафэйлинных групп/целого compaction с задержками
Типичные ошибки выглядели так:
ValidationException: Cannot commit, found new position delete for replaced data file
типичная ошибка конфликта транзакций.
Comparison method violates its general contract!
Последняя ошибка прилетела к нам из EMR. Она решалась настройкой spark: spark.sql.iceberg.data-prefetch.enabled=false
Как стали решать проблему?
Апгрейд до Iceberg 1.8 - это максимальная версия для нашей версии EMR. После обновления стало заметно лучше:
Delete-файлы стали хорошо уходить приrewrite-all = true, как с фильтром по колонке, так и по всей истории.Мы стали использовать механизм branch в Iceberg.
Основной поток пишет в
mainветку.Compaction выполняется в отдельной ветке
После завершения compaction делает попытку merge операции по стратегии fast-forward в
main. При таком подходе конфликты с группами почти сошли на нет.
Почему именно fast-forward?
Есть ещё стратегия replaced — при ней конфликтов merge не будет вовсе. Но она может привести к потере данных, так как вы просто отдаёте приоритет результату compaction и затираете всё, что произошло в main после создания ветки.
Как вы уже догадались, при стратегии fast-forward конфликты возможны, если в main что-то изменилось за время выполнения джоба.
Нам пришлось идти с бизнесом на компромиссы:
Для часто обновляющихся сущностей делать паузу в работе основного потока. Но делать это всегда, для всех сущностей или для compaction по всей истории большой таблицы, мы не можем.
В итоге мы:
добавили множество retry;
увеличили max-concurrent-file-group-rewrites и дали больше ресурсов Spark, что существенно ускорило работу compaction;
разбили compaction по периодам истории, чтобы минимизировать конфликты.
сделали возможность остановки основного стримингового потока.
После этого система стала:
стабильной
предсказуемой
и перестала пугать 20+ минутами чтения 😅
Выводы
Iceberg очень быстро развивается, поэтому лучше делаете разработку сразу на максимальной доступной для вас версии.
Iceberg v2 — мощный, но требует внимания.
delete-файлы — скрытый враг.
compaction - подвержен множеству конфликтов. Не терпит параллельной работы с сущностью.
Надеюсь, этот опыт будет полезен тем, кто только начинает или уже тонет так же, как и мы когда-то 🙂
