В этой статье речь пойдет не о напольных покрытиях, а о програмном продукте, более современном конкуренте Apache Parquet, продукте который изначально в 2014 году был разработан компанией Huawei как закрытое и проприетарное ПО, но в 2016 году был преобразован в открытый код и передан в управление Apache Software Foundation, где сейчас поддерживается и разрабатывается open-source сообществом. Речь идет о Apache CarbonData.
Итак, что такое Apache CarbonData?
Согласно официальному определению данному на главной странице продукта https://carbondata.apache.org Apache CarbonData — это индексированный колоночный формат данных, предназначенный для быстрого анализа на платформах больших данных, таких как Apache Hadoop, Apache Spark и других. Однако возникает вопрос: разве у нас уже нет других колоночных форматов данных, таких как Apache Parquet или Apache ORC? Зачем нам нужен еще один?
В этом контексте кажется, что с 2014 года Apache CarbonData значительно расширил список своих возможностей, выходя за рамки простого колоночного формата данных и превратившись в нечто гораздо более масштабное. Тем не менее, позиционирование продукта на официальной странице по-прежнему остается неизменным, что, возможно, является одной из причин, почему продукт не получает того внимания, на которое заслуживает.
Как так произошило так что CarbonData сейчас представляет собой высокопроизводительный движок хранения данных с собственным форматом файлов и движком запросов, поддерживающим индексирование и кэширование, интеграцией с Spark 3.1, с Hive, Presto (Trino), Flink, есть SDK для C++ и Java, а также предусмотрена интеграция с TensorFlow и PyTorch через модуль PyCarbon и по факту CarbonData является полноценным решением для data lake и высокопроизводительным единым хранилищем для big data и AI движков? Как так получилось? Нужно погружаться в детали.
Заглянем под капот
Такая эволюция, скорее всего, стала возможной благодаря тому, что продукт изначально создавался с идеей обеспечить улучшение и ускорение работы на каждом этапе: загрузки данных, их хранении, обработке и оптимизации запросов, анализе данных, масштабировании и управлении производительностью за счет возможности экспериментирования и добавления новых функций.
В основе CarbonData лежит колоночная организация данных: Группа записей в одном столбце называется страницей столбца, которая включает в себя как данные, так и метаданные: смещения, типы кодирования, минимальные и максимальные значения. Несколько таких страниц формируют блоклет, а набор блоклетов составляет блок, который фактически является файлом CarbonData.
Кроме этого были внедрены такие новшества которых не существовало на тот момент в других колончатых форматах, как хранение столбцов в отсортированном виде, использование различных методов сжатия данных, таких как Snappy, Gzip или Zstandard, сохранение минимальных и максимальных значений прямо рядом с данными, внедрение локального словаря в каждый блоклет. Также была добавлена поддержка адаптивного кодирования для динамической оптимизации объёма хранимых данных, возможность эволюции схемы путём добавления, удаления или переименования столбцов, а также поддержка всех основных и сложных вложенных типов данных.
Атомарность, консистентность, изолированность, стойкость
Изначально в момент дизайна системы было заложено требование полной поддержки ACID, и добавлены следующие функции отвечающие этим требованиям: возможность параллельного выполнения различных операций, множественные вставки данных, одновременное выполнение запросов на чтение во время обновления, удаления, объединения или архивации данных. Реализована снимков данных (snapshot isolation), что дает возможность одновременного доступа к данным через разные движки обработки (Apache Spark, Presto, Apache Flink).
Интеграция CarbonData с Spark достигается через стандартные механизмы расширения Spark SQL, включая специализированный парсер для работы с таблицами CarbonData и управлением данными (обновления, удаление, сжатие данных и т.д.). В систему добавлены алгоритмы оптимизации и планирования запросов, благодаря чему можно динамически изменять стратегию обработки запросов, в том числе использовать индексные таблицы или материализованные представления по мере необходимости.
Потоковые данные
Реализована поддержка Spark Streaming, с помощью которой возможно сохранение данных в реальном времени в строковом формате. На следующем этапе происходит трансформация в колоночный формат и индексирование. Поддержка операций обновления и удаления реализована следующим образом: поскольку файлы CarbonData являются неизменяемыми, обновленные данные будут записаны в новый файл, а идентификаторы удаленных строк будут сохранены в дельта-файле. Поскольку происходит добавление только дельта-файлов, нагрузка на систему ввода/вывода остаётся минимальной. Если сравнивать с режимом полной перезаписи файлов, то время, необходимое на обновление, уменьшается на 50% - 70%. За счет автоматического объединения дельта-файлов возможно избежать накопления большого количества мелких файлов.
Реализована поддержка Flink для потоковых данных реального времени при помощи CarbonData SDK.
Ключевым аспектом при работе с потоковыми данными в Flink является либо настройка интервала контрольной точки (checkpoint) в Flink, что позволяет гарантировать целостность данных в случае возникновения проблем. Это настраиваемое значение определяет интервал времени, про прошедствии которого для данных будет произведена запись, минимизируя потери в случае необходимости восстановления. Либо настройка объема данных, которая задает предельное число записей, после достижения которого начнется запись в файл Carbon.Top of Form
Дополнительная надежность обеспечивается за счет записи данных на диски еще до начала их загрузки. Это пзволяет сохранять данные в случае возникновения сетевых сбоев, тем самым гарантируя сохранность информации. Обеспечивается возможность запроса данных сразу после их записи. При этом построение индексов будет произведено после некоторой задержки.
Старый паркет не проблема!
Реализована поддержка одновременной работы с сегментами в различных форматах. Если по каким-то причинам возникает необходимость одновременной работы с несколькими внешними сегментами в других форматах, например, таких как файлы CSV, TXT, JSON, Parquet, ORC, то существует возможность избежать объединения, построить общий индекс и выполнять запросы, комбинируя данные из всех сегментов.
Индексы
В CarbonData реализовано множество механизмов индексации, включая грубые (coarse-grain) и тонкие (fine-grained) индексы, а также индексы минимальных и максимальных значений (min-max), которые могут применяться на уровне сегментов, файлов, блоклетов и страниц. Для столбцов, содержащих большое количество уникальных значений, может быть создан вторичный индекс в виде отдельной таблицы.
Например, если основной индекс построен на идентификаторе пользователя, поиск по номеру мобильного телефона может занять много времени. В таких случаях рекомендуется создать вторичный индекс для номеров мобильных телефонов. Если требуется, то для таблицы можно создать несколько вторичных индексов. Более того, для ускорения запросов к вторичным индексам можно использовать свои собственные индексы, такие как min-max.
Marterialized Views
CarbonData позволяет создавать материализованные представления (MV) для предварительной агрегации и объединения таблиц. Это обеспечивает значительное ускорение выполнения запросов, особенно в сценариях, где требуется агрегация и анализ данных.
На рисунке ниже показано, как таблицы T1 и T2 предварительно объединяются и сохраняются в виде MV для запроса < SELECT city, sum(value) FROM t1 JOIN t2 ON t1.id=t2.id GROUP BY city >
При выполнении запроса Carbondata сопоставляет план запроса с MV. Если часть или все данные MV совпадают с запросом, то они используются напрямую, что сокращает время выполнения запросов до 10 раз.
Carbondata обладает расширенной функциональностью для работы с временными рядами. В MV возможно хранить агрегированные данные по временным рядам для столбцов с датой и временем. Это позволяет ускорить выполнение запросов, требующих агрегации данных за различные периоды времени.
MV может хранить данные с различной гранулярностью (например, почасовая, минутная). При выполнении запроса Carbondata автоматически подберет наиболее подходящий уровень детализации, что оптимизирует использование ресурсов.
Carbondata поддерживает функцию свертки запросов. Если запрос требует более низкой гранулярности, чем та, которая хранится в MV, Carbondata автоматически агрегирует данные на лету. Для примера рассмотрим вариант в котором MV содержит данные о продажах с гранулярностью по минутам. Если пользователь выполняет запрос, требующий данных с почасовой гранулярностью, Carbondata автоматически агрегирует данные по минутам за каждый час, предоставляя результат без необходимости обращения к основной таблице.
Кроме этого Carbondata позволяет создавать MV из не-Carbondata таблиц например Parquet или ORK, что позволяет использовать преимущества MV для ускорения запросов к данным из различных источников. Но здесь имеются ограничения в части инкрементного обновлениея так как не-Carbondata таблицы не имеют концепции сегментов, поэтому инкрементное обновление MV для таких таблиц не поддерживается.
Индексация на Петабайтах
Изначально для ускорения поиска данных и уменьшения количества задач сканирования кеширование индексов производилось в драйвере Apache Spark. Однако при работе с петабайтными таблицами объем кеша становится огромным, и его хранение в драйвере перестало быть возможным.
Поэтому был разработан Сервис индексации. Это отдельное приложение Spark, которое хранит индексы в кеше исполнителя. Сервис индексации выполняет обрезку данных и отправляет результаты через RPC-вызов от драйвера сервиса индексации к драйверу Spark, инициировавшему запрос.
Обычно индекс строится во время первого запроса к таблице, но этот процесс занимает длительное время и зависит от объема данных. Поэтому для сокращения времени последующих запросов реализована автоматическая предварительное построение индекса после сохранения данных в базе.
Найди все точки!
Carbondata поддерживает работу с геопространственными данными благодаря возможности создания подключаемых индексов для столбцов, хранящих географическую долготу и широту.
По умолчанию испольтзуется Z-Order (или Morton Order) для полигональных запросов, например такких как «найди все точки расположенные в заданной области».
Записи, соответствующие запросу, выделены:
Latitude, Longitude |
|
|
|
|
|
|
0,0 |
| 0,1 |
| 0,2 |
| 0,3 |
1,0 |
| 1,1 |
| 1,2 |
| 1,3 |
2,0 |
| 2,1 |
| 2,2 |
| 2,3 |
3,0 |
| 3,1 |
| 3,2 |
| 3,3 |
|
|
|
|
|
| End |
Z-Order представляет собой метод преобразования многомерных координат в одномерный индекс, применяемую в основном для оптимизации хранения и поиска геопространственных данных. Основываясь на принципе преобразования каждого измерения (такого как широта и долгота) в двоичный формат и последующего чередования их битов для создания уникального двоичного кода, этот подход позволяет использовать полученный код как одномерный индекс. Такой метод обеспечивает эффективное хранение данных в одномерных массивах, ускоряет поиск информации по пространственным запросам и поддерживает пространственную близость объектов в индексе, что способствует оптимизации запросов на основе соседства.
Carbondata также оптимизирует обработку полигональных запросов путем переноса фильтрации на уровень сканирования и использования QuadTree индексирования для обработки только непосредственно релевантных блоклетов данных.
Структура данных QuadTree разделяет пространство на четыре квадранта, а затем рекурсивно делит каждый квадрант, который содержит слишком много объектов, на еще четыре подквадранта, и так далее, до тех пор, пока каждый квадрант не будет содержать приемлемое количество объектов или не будет достигнута определенная глубина дерева.. Например, на рисунке показано построение QuadTree для индексации квадратного пространства 4x4 .
Битва за байты
При сохранении данных каждой страницы столбца применяется алгоритм, который определяет наиболее компактный способ кодирования. Если все данные на странице могут быть сохранены, используя меньшее число бит, чем предусмотрено схемой, то Carbondata применяет оптимизацию.
Применяется два основных типа кодирования:
Адаптивное кодирование: Если тип данных столбца - long, но значения на странице укладываются в диапазон от 0 до 7 (byte), Carbondata может сохранить их, используя всего один байт. Это существенно сокращает объем хранимых данных.
Дельта-кодирование: Если в столбце хранятся большие числа, однако разница между минимальным и максимальным значением (min - max) на каждой странице укладывается в 8 байт (как в примере с C2), можно кодировать их как значения byte и смещения от min и max. Это позволяет сэкономить пространство по сравнению с хранением чисел полной длины.
C1: long |
0x00000002 |
0x00000007 |
0x00000009 |
0x00000003 |
0x00000005 |
0x00000007 |
В колонке C1 тип данных указан как 'long', который занимает 8 байт. Однако реальные значения, хранящиеся в этой колонке, находятся в диапазоне от 0x00000002 до 0x00000009, что подразумевает, что все они могут быть представлены в одном байте. Это достигается через использование адаптивного кодирования, при котором CarbonData оптимизирует хранение данных, используя только один байт вместо восьми. Такой подход позволяет существенно сократить объем используемого дискового пространства.
C2: long |
12892712 |
12892727 |
12892713 |
12892743 |
12892725 |
12892742 |
Для колонки C2 также используется тип 'long' и аналогично первой колонке, реальный размер данных уменьшен до одного байта за счет дельта-кодирования. В этом случае разница между минимальным и максимальным значением невелика (12892712 до 12892743), что позволяет хранить только разницу относительно максимального значения, существенно экономя тем самым пространство.
C3: double |
12.32 |
21.42 |
32.12 |
42.43 |
54.32 |
14.32 |
Третья колонка (C3) использует тип 'double', требующий 8 байт. Значения варьируются от 12.32 до 54.32, и используется масштабирование с коэффициентом два (scale: 2), что позволяет применять короткое целое число (short) на два байта для хранения каждого значения после умножения его на Math.pow(10, scale) для сохранения точности после запятой. Это также приводит к уменьшению занимаемого места по сравнению с традиционным способом хранения чисел с плавающей точкой.
CarbonData для ML и AI
Carbondata позволяет эффективно работать с большими объемами данных при построении моделей машинного обучения. Вот ключевые возможности:
Хранение бинарных данных: Carbondata поддерживает специальный бинарный тип столбца, позволяющий хранить изображения, BLOB-объекты и другие неструктурированные данные.
Быстрая фильтрация для подготовки датасетов: Carbondata обеспечивает быструю фильтрацию изображений и других бинарных данных для подготовки обучающих датасетов. Поддерживается хранение множества изображений в одном Carbon-файле для оптимизации скорости операций ввода-вывода.
После сохраниения данных в бинарном формате, CarbonData SDK позволяет заполнять векторы Apache Arrow данными из бинарных столбцов, а затем из таблицы строить DataFrame в Pandas и использовать их в TensorFlow, PyTorch, MXNet для обучения моделей.
PyCarbon – библиотека, оптимизирующая доступ к данным из CarbonData для AI-задач, основанная на проекте Petastorm от Uber.
Некоторые сценарии реального использования
Телекоммуникационной компании было необходимо анализировать историю звонков и измерений для выявления сбоев в обслуживании, а также использовать машинное обучение для прогнозирования потенциальных проблем, чтобы соответствовать требованиям SLA для VIP-клиентов. В качестве решения использовался кластер Hadoop + Spark + CarbonData, управляемый YARN, с оптимизированной конфигурацией для быстрой загрузки данных и обработки интерактивных запросов. Была достигнута скорость загрузки данных 40 МБ/с на узел и время ответа на индивидуальный запрос менее 3 секунд. Время ответа на множество одновременных запросов (20 запросов) составило менее 10 секунд.
Интернет-компания стремилась повысить скорость загрузки видео и улучшить сервис, анализируя данные о скорости интернета, моделях телефонов, используемых приложениях и популярных видео в различных регионах. Использование инструментов BI с группировкой GROUP BY приводило к замедлению системы и недовольству пользователей. Чтобы решить эту проблему, был сокращен размер блоков данных до 128 МБ, применена глобальная сортировка и материализованные представления, снизижено количество разделов в Spark и использован локальный словарь для уменьшения объема хранилища данных. В результате была значительно ускорена обработка запросов даже при интенсивном использовании BI-панелей.
В сценарии "Умный город" требовалось производить анализ данных движения людей и машин, объединенняя с внешними данными для глубокого изучения поведенческих паттернов. Итоговое решение было реализвано на кластере Hadoop + Spark + CarbonData, управляемому через YARN. Колонки с временными данными были размещенны в начале схемы, использованы блоки в 512 МБ и локальная сортировка. Это позволило достигнуть времени обработки запросов на сегменте мечее чем в 10 секунд, а скорость загрузки данных повысить до 45 МБ/с на каждом узле.
Банку и финансовой организации требовалось сохранение непрерывно поступающих потоковых данных и обработка этих данных почти в реальном времени. Проблема заключалась в том что загрузка данных микропакетами приводит к накоплению множества маленьких файлов, что снижает эффективность работы HDFS и ухудшает скорость обработки запросов. Решение нашлось в применении потоковых таблиц CarbonData, позволяющих увеличить размер сегмента для оптимизации запросов и обеспечить автоматическое преобразование сегментов в более удобный для анализа формат. Сжатие и использование локальных словарей позволили избежать проблем с маленькими файлами и обеспечить доступ к данным в режиме, близком к реальному времени.
Ну и самое вкусное – цифры по производительности, сравнения и бенчмарки
Были проведены сравнительные тесты производительности работы с изображениями при использовании CarbonData, хранением исходных изображений в формате JPG и форматом TFRecord. В качестве тестового набора был взят один гигабайт из датасета ImageNet.
Результаты показали что при работе с облачным хранилищем CarbonData быстрее в 10 раз производит чтение чем чтение из исходных JPG-изображений, быстрее чем TFRecord – в 6 раз. В среде локального хранилища, CarbonData показывает сопоставимую с TFRecord производительность при полном сканировании, но при этом в двадцать раз быстрее чем JPG-файлы. При выполнении запросов с фильтрацией CarbonData производит выборку из 1300 изображений из общего числа в 7800 изображений для формирования тестового датасета в шесть раз быстрее, чем с использованием TFRecord.
Результаты бенчмарков показывают что CarbonData быстрее в задачах ETL и Ad-hoc запросах, чем Spark на Parquet, Oracle ил Clickhouse это демонстрируется на сравнениии производительности при работе со стандартными наборами данных от Transaction Processing Performance Council TPC-H и TPC-DS https://www.tpc.org/. Данный бенчмарк состоит из 22 сложных SQL-запросов и модели данных, которая имитирует бизнес-среду. Запросы разработаны так, чтобы представлять реалистичные сценарии, отражая типы вопросов, которые могут возникать у бизнеса относительно тенденций продаж, поведения клиентов и изменений на рынке. В частности по бенчмарку TPC-H утверждается что CarbonData на 30% быстрее чем Parquet, по бенчмарку TPC-DS на 20% быстрее чем чем Parquet, на 47% быстрее чем Oracle и на 46% быстрее чем Clickhouse.
Надеюсь донес до вас какую-то новую информацию, заинтересовал характеристиками CarbonData и сподвигнул попробовать что-то новое. Если кто-то уже имеет опыт пользования CarbonData - ставьте лайки, буду рад увидеть ваши комментарии. До встречи.