Привет, Хабр. Меня зовут Василий Мельник, я product owner решения для потоковой обработки данных Data Ocean SDI в компании Data Sapience. Наша команда приобрела большой практический опыт работы с Apache Iceberg в задачах на стыке традиционной пакетной обработки и near real-time и конкретно с использованием технологий на базе Flink, поэтому мы не могли пройти мимо нового открытого табличного формата (OTF) Paimon от разработчиков Apache Flink. В этой статье я опишу наш опыт и те практические выводы, которые мы сделали на промышленных средах, в виде репрезентативного тестирования, на котором проиллюстрирую ключевые практические сценарии.
Скрытый текст
Для тех, кто хочет выводы сразу: Paimon поможет в определенных сценариях, но массово переходить на него пока рано. Подробнее тут

Предисловие: болевые точки OTF в Data Lake
Появление open table formats исполнило вековую мечту data-инженеров: совместило эффективность хранения и чтения Apache Parquet с возможностью обновления данных без полной их перезаписи. Достигается это за счет парадигмы Merge-On-Read и «отложенного удаления», когда информация об удалении старых версий записи пишется в deletion-файлы. Для фреймворков потоковой обработки, например Flink, это открывает возможности по обновлению данных прямо в Data Lake в режиме, близком к реальному времени, а для движков пакетной обработки — Spark, Impala, Trino, StarRocks — сокращает расход ресурсов на MERGE новых порций данных в витрины.
Цена такого новшества очевидна из названия подхода Merge-On-Read: затраты на применение удалений и вставок перекладываются на этап чтения данных, из чего вытекают следствия:
Чтение из OTF-таблиц с каждым обновлением деградирует;
OTF-таблицы требуют периодического обслуживания для слияния deletion-файлов и файлов данных.
Iceberg это касается в полной мере, чему посвящена статья моих коллег, которые детально описывают подходы к обслуживанию таблиц и их видение оптимального решения данной задачи.
Поэтому мы решили сравнить Paimon и Iceberg по следующим показателям:
Скорость merge новой порции данных:
через Apache Spark;
через Apache Flink;
Скорость scan таблиц через Apache Spark.
А также проследить деградацию этих параметров через несколько итераций в условиях отсутствия обслуживания. По нашему мнению, это типовые сценарии и инструменты, с к��торыми большинство data-инженеров сталкиваются на практике.
Особенности Paimon в сравнении с Iceberg
При чтении Spark-ом Iceberg-таблиц на каждый файл данных поднимается task, который читает сам файл и связанные с ним deletion-файлы. Для Iceberg это единственная реализация чтения в Spark.
Paimon «пошел другим путем» и представил 2 различающихся типа таблиц:
Merge-On-Write — несмотря на название, это полный аналог Merge‑on‑Read Iceberg (важно не запутаться!): удаления пишутся в deletion‑файлы, scan таблицы параллелится по файлам данных;
Merge‑On‑Read — не имеет прямого аналога в Iceberg. В этом режиме таблица Paimon обязана быть бакетирована. Бакетирование (=hash‑партицирование) присутствует и в Iceberg, но представляет собой дополнительный инструмент для увеличения селективности выборки по ключу. Роль бакетов в Paimon принципиально другая: каждый бакет — отдельное LSM‑дерево: отсутствуют выделенные deletion-файлы, файлы данных разделены на уровни, при чтении видимое клиенту значение ключа берется из наиболее «свежего» файла. Особенность видна сразу: параллелизм чтения таких таблиц ограничен количеством бакетов.
Таким образом, в нашем тесте мы сравним 3 вида таблиц:
Iceberg Merge-On-Read;
Paimon Merge-On-Read;
Paimon Merge-On-Write.
Стенд тестирования
Для эксперимента используем инфраструктуру Yandex Cloud: Compute Cloud и Object Storage. Таблицы Iceberg и Paimon будем хранить в S3 с использованием файлового каталога, 2 виртуальные машины по 16 vCPU и 64 GB RAM задействуем для развертывания кластеров Flink и Spark (диски на виртуальных машинах локальные).

Для простоты поднимем кластеры в максимально простой конфигурации: 1 master-процесс (Spark Master и Flink Job Manager) и 1 worker-процесс (Spark Executor и Flink Task Manager) на каждом, Worker-ам доступны все ресурсы виртуальной машины. Для Spark это имеет дополнительный смысл в оптимизации чтения equality‑delete файлов (см. далее) — при большем количестве Spark Executors с меньшим количеством ресурсов на каждом эффективность чтения будет значительно ниже.
Версии программных компонентов:
Spark 3.5.4;
Flink 1.20;
Paimon 1.10;
Iceberg 1.10 и 1.8 Nova edition.
Схема данных
Таблицы Paimon имеют следующие схемы:
Merge-On-Write
CREATE TABLE TableMOW ( ID BIGINT, VAL_S STRING, VAL_TS TIMESTAMP, VAL_DT DATE, VAL_DEC DECIMAL(38,12) ) TBLPROPERTIES ( 'deletion-vectors.enabled' = 'true', 'bucket'=4, 'primary-key' = 'ID'); Merge-On-Read CREATE TABLE TableMOR ( ID BIGINT, VAL_S STRING, VAL_TS TIMESTAMP, VAL_DT DATE, VAL_DEC DECIMAL(38,12) ) TBLPROPERTIES ( 'primary-key' = 'ID', 'write-only'='true', 'bucket'=4);Схема таблицы Iceberg:
CREATE TABLE TableIcebergMORv3 ( ID BIGINT, BUCKET_ID INT, VAL_S STRING, VAL_TS TIMESTAMP, VAL_DT DATE, VAL_DEC DECIMAL(38,12), PRIMARY KEY(ID) NOT ENFORCED ) WITH ( 'write.format.default' = 'parquet', 'write.delete.mode' = 'merge-on-read', 'write.update.mode' = 'merge-on-read', 'write.merge.mode' = 'merge-on-read', 'write.upsert.enabled'='true', 'format-version'='3');
Таблицы заполнены 10 000 000 случайно сгенерированных записей с уникальными значениями ID и размером записи в 1 KB, после чего выполнена процедура компакции parquet-файлов до целевого размера 128 MB. В итоге средний размер каждой таблицы в S3 получился ~ 7 GB со средним количеством data-файлов ~60.
Количество бакетов для Paimon изначально выбрано так, чтобы быть значительно меньше доступного количества CPU на кластере Spark (16 vCPU), при этом +- попадает в рекомендации Paimon по размеру бакета до 1 GB:
Скрытый текст
https://paimon.apache.org/docs/master/primary-key-table/overview/
A bucket is the smallest storage unit for reads and writes, so the number of buckets limits the maximum processing parallelism. This number should not be too big, though, as it will result in lots of small files and low read performance. In general, the recommended data size in each bucket is about 200MB — 1GB)
Для Iceberg Merge-On-Read мы не задавали партицирование, как не задавали бы его для таблиц такого объема в реальных задачах.
Тестирование Apache Spark
Для проведения теста Spark мы сгенерировали 21 набор по 1 000 000 случайных строк размером 1 KB каждая, идентичных по схеме целевым таблицам, с диапазоном ID от 1 до 10 000 000. Далее последовательно выполняем MERGE каждого набора строк по простому равенству ID (пример для первого набора):
MERGE INTO TableIcebergMOR target
USING Source_1_1000000 source
ON target.id = source.id
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED
THEN INSERT *После каждого MERGE выполняем чтение таблицы и замеряем время, затраченное на SCAN. Так как после выполненных операций количество записей в таблице не растет из-за полного пересечения набора ID, мы получим чистое влияние процесса слияния на скорость чтения.
Результаты измерений
На графиках представлена длительность MERGE и SCAN после каждой итерации последовательно для 3-х исследуемых типов таблиц:



Из графиков видим следующее:
Paimon Merge-On-Read — очевидно в аутсайдерах. Как и ожидали, самый медленный SCAN и быстрая деградация. Но удивительно и неожиданно, что при MERGE также сканируется целевая таблица, поэтому деградация SCAN влечет за собой деградацию MERGE. Казалось бы, при использовании LSM для MERG по ключу вообще нет необходимости в SCAN целевой таблицы (при записи в Paimon через Flink его не происходит, например), однако же Spark его делает;
При сравнении Paimon Merge-On-Write и Iceberg Merge-On-Read (напомню, они имеют практически эквивалентную структуру хранения) ситуация чуть сложнее:
На 16-й итерации Paimon сделал автокомпакцию при выполнении MERGE – видно по всплеску на гистограмме.
До автокомпакции оба формата демонстрировали примерно одинаковую скорость scan с трендом на деградацию;
Но мы вновь наблюдаем МНОГОКРАТНОЕ отставание Paimon в скорости MERGE. Разгадка — в следующем:
Обоим форматам для формирования deletion‑векторов требуется получить из целевой таблицы позицию удаляемой строки в файле данных;
Iceberg делает это оптимально, при скане целевой таблицы пользуется колоночным хранением в parquet и читает только колонку ID, по которой идет MERGE, поэтому размер читаемых данных, который мы может увидеть в плане запроса, невелик;

Рис.5 Shuffle данных для MERGE в Iceberg Paimon же читает таблицу целиком .

Рис.6 Shuffle данных для MERGE в Paimon
Стоит обязательно учитывать данное поведение Paimon при разработке ETL-процессов. Мы добавили в бэклог R&D команды задачу для дальнейшего изучения кода Paimon в этой части.
Краткие выводы по Spark
Для себя мы сделали следующие выводы:
Paimon в Merge-on-Read непригоден для практических задач, т. к. имеет ограничения на scan и стремительно деградирует;
Paimon Merge on Write не превосходит Iceberg Merge on Read по чтению и сильно уступает в скорости MERGE;
В итоге ценность Paimon для пакетной обработки на Apache Spark сомнительна.
Тестирование Apache Flink
Для тестирования Flink мы создаем синтетический поток в 2 000 записей в секунду, при этом используем довольно стрессовый сценарий для NRT ODS — 70% записей составляют операции обновления существующего ключа (которые перед вставкой распадаются на пары RowKind.UPDATE_BEFORE и RowKind.UPDATE_AFTER), а 30% — операции вставки новых ключей. Интервал чекпоинта — 5 минут. После каждого чекпоинта с помощью Spark выполняем чтение таблицы и замеряем скорость SCAN. Тем самым моделируем ситуацию, хорошо знакомую пользователям Lakehouse-платформы: Flink пишет данные в Near Real Time (Kafka, CDC), которые потом будут прочитаны через Spark/SQL-движки c ожиданиями консистентности и предсказуемого времени работы.
Также отмечу, что для Paimon будем писать только в таблицу Merge-On-Read. Причина в том, что при записи в Merge-On-Write Flink безальтернативно делает компакцию целевой таблицы после каждого чекпоинта с остановкой обработки потока данных на минуты. Для реальных сценариев такое поведение мы посчитали неприемлемым.
Результаты измерений


Здесь мы наблюдаем совершенно другую картину. Paimon работает стабильно, не показывая видимой деградации, в то время как Iceberg деградирует стремительно и уже на 11 чекпоинте чтение начинает выдавать OutOfMemory (отсутствующие столбцы SCAN на гистограмме).
Проблема эта известна и связана с крайней неоптимальностью реализации equality delete в open source Iceberg. Проверим, как будет работать связка Spark + Iceberg с оптимизационными доработками, разработанными командой Data Sapience.

Здесь нам на руку сыграло использование одного Spark executor: переход к кэшированию на уровне executor, а не на уровне task, свел деградацию к минимуму. Мало того, по абсолютным цифрам, за счет отсутствия ограничений на параллелизм SCAN, пропатченный Iceberg теперь выигрывает у Paimon.
Краткие выводы по OTF
Для себя сделали следующие выводы:
Paimon превосходит open source Iceberg для кейса Real-Time записи данных в Data Lake, если у вас есть ощутимая доля UPDATE по PK (>30% в масштабе всей таблицы, а не отдельного инкремента)
Доработки, примененные в Lakehouse-платформе Data Ocean Nova, эффективно решают эту проблему и сводят на нет преимущества архитектуры Paimon относительно Iceberg. Поэтому в проектах с платформой мы осознанно используем Iceberg в этих сценариях.
В целом, на текущий момент Paimon имеет смысл рассматривать для использования только для первичного RAW / Landing слоя и не использовать для слоев Lakehouse выше RAW — далее все равно необходима перекладка в Iceberg.
Несомненно удобнее использовать один формат во всех слоях, но без решения проблем OSS версии Iceberg и функционала managed compaction для Lakehouse при наличии Near Real Time streaming c количеством объектов 300+ и размером хранилища 50Тб+ через пару недель после начала ОПЭ все ETL-процессы остановятся из-за огромных ресурсозатрат кластера на чтение таблиц. Paimon MOR может снять для вас эту боль только на первичном этапе.
Заключение
Безусловно, Paimon — молодой формат с богатым набором настроек и пока достаточно слабым cookbook. Какие-то его особенности заложены архитектурно, какие‑то — могут быть поправлены в ближайшее время, а где-то мы могли недокрутить. Пока, с учетом накопленной экспертизы по Iceberg и собственных доработок, мы не спешим внедрять Paimon в проектах Data Lake, но пристально наблюдаем за его развитием. Следить за обновлениями можно, подписавшись на телеграмм-канал Data Sapience.
