Как стать автором
Обновить
0
Arenadata
Разработчик платформы данных на базе Open Source

Как и зачем мы сделали Spark-коннектор к Greenplum

Время на прочтение5 мин
Количество просмотров6K

Всем привет! Меня зовут Андрей, я работаю системным архитектором в Arenadata. В этой статье расскажу, как и зачем мы сделали свой инструмент для обмена данными между Arenadata DB (аналитическая MPP-СУБД на базе Greenplum) и фреймворком для распределенной обработки данных Apache Spark (входит в экосистему Arenadata Hadoop).

Поскольку Greenplum и Hadoop — это базовые элементы нашего стека, нам часто приходится сталкиваться с задачами обработки и передачи информации между ними. Ранее мы решали эту задачу частично с помощью Platform Extension Framework (PXF), но это лишает нас всех преимуществ, которые предлагает Spark, а они довольно существенны.

Мы решили написать ADB-Spark Connector на базе HTTP-сервера, реализующего протокол gpfdist. Почему выбрали именно такой путь и на какой архитектуре в итоге остановились, расскажу ниже в статье.

Немного теории

Arenadata DB — это распределенная СУБД, построенная на базе Greenplum, которая относится к классу MPP-систем. MPP (massively parallel processing, массово-параллельные вычисления) — класс СУБД, распределяющих хранимые данные на множестве вычислительных узлов для параллельной обработки больших объемов данных. Каждый узел имеет собственное хранилище и вычислительные ресурсы, позволяющие выполнять часть общего запроса к базе. Логика разбиения таблицы на сегменты задается ключом (полем) дистрибуции. Для каждой отдельной колонки в таблице можно задать свой тип и уровень сжатия.

MPP DB хорошо подходят в качестве аналитического хранилища данных, потому что:

  • масштабируются горизонтально;

  • хорошо держат OLAP нагрузку;

  • позволяют загружать данные с большой скоростью.

Рассмотрим способы, с помощью которых можно обмениваться данными с ADB:

1. C помощью JDBC-драйвера через мастер ADB. Этот способ доступен «из коробки» с помощью стандартного Spark JDBC-коннектора:

2. C помощью COPY на мастере:

3. C помощью распределенного COPY:

4. С помощью gpfdist — HTTP-сервера для параллельного обмена данными с сегментами GP:

5. С помощью HTTP-сервера, реализующего протокол gpfdist (1,2):

Выбор основы для коннектора

Мы решили написать коннектор на основе HTTP-сервера, реализующего протокол gpfdist-a. Разработали две реализации протокола на базе:

  1. Akka-HTTP;

  2. Finagle.

Версия с Finagle показала себя лучше при наличии множества одновременных сессий от сегментов ADB, поэтому остановились на ней.

Теперь взглянем на эту задачу со стороны Apache Spark. Высокоуровневая архитектура процесса компиляции запросов Spark соответствует традиционному подходу СУБД, который заключается в преобразовании запроса в логический план, оптимизации логического плана в физический и его выполнении на рабочих узлах.

Рассмотрим средства, предоставляемые Spark-ом для написания коннекторов к различным источникам данных.

Data Source API был представлен в Spark 1.3 вместе с абстракцией Dataframe. Со времени этого релиза Spark претерпел большие изменения. С версией 2.0 пришла новая абстракция DataSet, и в связи с этим API был переосмыслен. Ограничения API V1:

  • смешанное из API низкого и высокого уровня;

  • cложно делать Push down операторов;

  • сложно передавать информацию о партициях;

  • нет поддержки транзакционной записи;

  • нет поддержки колоночного режима;

  • нет поддержки стриминга.

С версией 2.3 выпустили и новый API, известный как Data Source V2. В ней нет вышеперечисленных ограничений, и её характерной особенностью является переход Scala-трейтов к Java-интерфейсам для лучшей совместимости.

Рассмотрим подробней, что происходит с точки зрения Data Source API v2 на каждом из представленных этапов.

Источник данных представлен в логическом плане экземпляром DataSourceV2Relation. Во время планирования DataSourceV2Strategy преобразует отношение в экземпляр DataSourceV2ScanExec. Последний создает экземпляр DataSourceRDD, который используется для фактического параллельного вычисления результатов из нескольких разделов.

Логический план — на этом этапе цель состоит в том, чтобы создать DataSourceV2Relation из опций, предоставленных клиентом. Процесс инициируется загрузкой Dataset-а.

Физический план — на этом этапе цель состоит в том, чтобы преобразовать DataSourceV2Relation в DataSourceV2ScanExec. Процесс инициируется выполнением действия с Dataset-ом, созданным на предыдущем этапе.

Выполнение запроса — на этом этапе цель состоит в том, чтобы получить данные из партиций RDD, созданного DataSourceV2ScanExec, и собрать строки из всех разделов.

На основании вышеизложенного, было принято решение реализовать коннектор с помощью Datasource API v2.

Архитектура

Каждое Spark-приложение состоит из управляющего процесса-драйвера и набора распределённых рабочих процессов-исполнителей. Общая компонентная диаграмма взаимодействия Spark-приложения и Greenplum кластера:

Чтение данных

Загрузка данных в Spark из Greenplum состоит из нескольких этапов:

  1. Инициализация Spark-драйвера.

  2. Spark-драйвер при помощи JDBC устанавливает соединение с мастером Greenplum для получения необходимых метаданных о кластере и таблице:

  • количество активных сегментов;

  • схема и тип таблицы;

  • ключ распределения таблицы;

  • план запроса.

  1. В зависимости от выбранной стратегии партиционирования (речь о которых пойдет далее) происходит вычисление количества партиций, а также генерация условий, используемых при загрузке данных с сегментов в партиции.

  2. На каждый Spark-исполнитель назначается задача по обработке данных  и соответствующая ей партиция.

Обмен данными происходит с помощью механизма writable-внешних таблиц, одновременно для каждого сегмента. На текущий момент реализованы следующие стратегии партиционирования:

  • по gp_segment_id: данные считываются и распределяются по партициям Spark в соответствии с их распределением по сегментам в Greenplum; 

  • по указанной колонке и указанному количеству партиций: для указанной колонки запрашиваются минимальное и максимальное значения и генерируются условия считывания данных из сегментов. Таким образом в Spark-партиции загружаются данные соответствующего диапазона;

  • только по указанной колонке: данные разбиваются в соответствии с уникальными значениями указанной колонки. В таком случае количество Spark-партиций соответствует количеству уникальных значений указанной колонки;

  • только по указанному количеству партиций: данные разбиваются согласно некоторой хеш-функции (по умолчанию) на указанное количество партиций;

  • по указанной хеш-функции и указанному количеству партиций: аналогично предыдущему пункту, с указанием желаемой хеш-функции.

Запись данных

Выгрузка данных из Spark в Greenplum происходит в несколько этапов:

  1. Инициализация Spark-драйвера.

  2. Spark-драйвер при помощи JDBC устанавливает соединение с мастером Greenplum.

  3. В зависимости от режима записи Spark-драйвер производит инициализирующие действия (создание или очистка таблиц) в Greenplum-е для загрузки.

  4. Каждый из Spark-исполнителей выгружает данные из назначенной ему партиции в Greenplum.

Обмен данными происходит с помощью механизма readable-внешних таблиц, одновременно для каждого сегмента.

На текущий момент поддерживаются следующие режимы записи:

  • перезапись (overwrite): либо целевая таблица удаляется полностью и пересоздается, либо происходит truncate;

  • добавление (append): данные добавляются в целевую таблицу;

  • запись в новую таблицу: завершение с ошибкой, если целевая таблица уже существует (errorIfExists).

Резюмируем функциональные возможности:

  • чтение данных из Greenplum;

  • запись данных в Greenplum с помощью различных режимов записи:

    • overwrite;

    • append;

    • errorIfExists;

  • автоматическое формирование схемы данных;

  • гибкое партиционирование;

  • дополнительные опции при создании целевой таблицы в Greenplum;

  • поддержка push-down операторов:

    • отсекание колонок;

    • push-down фильтров.

  • извлечение дополнительных метаданных из Greenplum:

    • схема распределения данных;

    • статистика.

  • оптимизация выполнения count-выражений в запросе;

  • выполнение произвольного SQL через мастер Greenplum;

  • поддержка пакетного (batch) режима при загрузке в Spark.

Аналоги

На текущий момент существует Spark Greenplum-коннектор от Pivotal

Мы провели сравнительный анализ функциональных возможностей и производительности:

Таким образом, нам удалось создать высокопроизводительный двунаправленный Spark-ADB коннектор, который, в свою очередь, обладает рядом функциональных преимуществ относительно аналога от Pivotal. 

Подробное описание коннектора с примерами использования можно посмотреть в официальной документации.

Теги:
Хабы:
Всего голосов 3: ↑3 и ↓0+3
Комментарии4

Публикации

Информация

Сайт
arenadata.tech
Дата регистрации
Дата основания
2016
Численность
201–500 человек
Местоположение
Россия
Представитель
Arenadata

Истории