Привет! Меня зовут Артемий Козырь, и я Analytics Engineer в Wheely.

Популярность массивно-параллельных СУБД (MPP) для решения аналитических задач неукоснительно растет. Сегодня хотел бы поговорить о широко распространенной СУБД Greenplum и, в частности, о Platform Extension Framework (PXF) - расширении, с помощью которого открываются почти неограниченные возможности интеграции с множеством внешних систем и форматов данных.

В этой публикации Вас ждет:

  • Основные возможности PXF, конфигурация, способы оптимизации.

  • Организация Extract - Load с помощью PXF (Data Lake / OLTP).

  • Объединение локальных и внешних таблиц в запросах (Federated queries).

  • Запись данных во внешние системы (Clickhouse).

Базовая идея Greenplum PXF

Заключается в том, чтобы задействовать обработку данных на стороне внешних систем, тем самым обеспечивая пользователю возможность обращения ко всей истории данных вне зависимости от места хранения, но без дополнительных трат ресурсов и времени на полную репликацию.

Демонстрационный сценарий описывает источники, отражающие данные по продажам за период в несколько лет. Операционные данные (например, текущий месяц - hot data) хранятся в OLTP MySQL, в Greenplum хранятся данные для аналитической отчетности (например, последний год-два – warm data), а в AWS S3 архивируются данные за более ранние периоды (cold data).

Таким образом, запрос на вывод суммы продаж с группировкой помесячно может быть распределен на 3 системы и выполнен параллельно. При этом пользователь обращается к одной таблице.

Заявлена поддержка доступа к данным следующих систем:

  • Hadoop / Hive / HBase

  • AWS S3 / Google Cloud Storage / Azure Blob Storage / MinIO

  • Реляционные СУБД (через JDBC)

  • Network file systems

И следующих форматов хранения:

  • Text (plain, delimited, embedded line feeds)

  • JSON

  • Avro, AvroSequenceFile

  • SequenceFile, RCFile

  • ORC / Parquet

В рамках этой публикации я буду использовать управляемый сервис от Я.Облака – Yandex Managed Service for Greenplum®. Конфигурация кластера: 2 x Master + 2 x Segment хоста на базе s2.micro (2 vCPU, 100% vCPU rate, 8 GB RAM).

Managed Service for Greenplum уже включает расширение PXF и его базовую конфигурацию. В случае использования Self-hosted Greenplum, все шаги установки и конфигурации придется проделать самостоятельно:

  • Соблюдение ряда требований перед установкой.

  • Загрузка PXF Package.

  • Установка Package на хостах.

  • Инициализация и запуск сервиса PXF.

Пошаговые действия описаны в официальной документации, и я не буду останавливаться на этом подробно.

PXF - расширение для работы с внешними данными через EXTERNAL TABLEs

PXF реализует протокол для СУБД Greenplum, который можно использовать при создании внешних таблиц.

Синтаксис комадны CREATE EXTERNAL TABLE с протоколом pxf выглядит следующим образом:

CREATE [WRITABLE] EXTERNAL TABLE <table_name>
( <column_name> <data_type> [, ...] | LIKE <other_table> )
LOCATION('pxf://<path-to-data>?PROFILE=<profile_name>[&SERVER=<server_name>][&<custom-option>=<value>[...]]')
FORMAT '[TEXT|CSV|CUSTOM]' (<formatting-properties>)
;

Краткое пояснение к ключевым параметрам конфигурации:

С помощью PXF можно осуществлять как чтение из внешних источников, так и запись данных во внешние системы (ключевое слово WRITABLE). Также возможно объединение данных из разных источников в одном запросе (т.н. federated queries).

Отдельно хочу заострить внимание на поддержке Filter pushdown и Column Projection.

Filter pushdown позволяет применить ограничение на читаемые строки из выражения WHERE запроса SELECT на стороне источника данных, тем самым значительно снижая нагрузку и объем данных, передаваемых по сети. Например, это может быть использовано в обращениях к внешним СУБД, исключению партиций в Hive (partition pruning), чтению групп строк в колоночных форматах (ORC, Parquet).

Column Projection означает то, что только запрошенные колонки SELECT-запроса буду возвращены из внешних систем. Например, при запросе 5 колонок из 100 возможных в файлах формата Parquet в результате запроса вернутся (будут переданы по сети) только 5, что кратное уменьшает объем данных.

Интеграция с Data Lake (S3 / GCS / HDFS / MinIO)

Создадим EXTERNAL TABLE, указывающую на данные в объектном хранилище:

-- 1. Create EXTERNAL TABLE pointing to S3 (Text file)
DROP EXTERNAL TABLE IF EXISTS src_orders ;
CREATE EXTERNAL TABLE src_orders(
O_ORDERKEY BIGINT,
O_CUSTKEY INT,
O_ORDERSTATUS CHAR(1),
O_TOTALPRICE DECIMAL(15,2),
O_ORDERDATE DATE,
O_ORDERPRIORITY CHAR(15),
O_CLERK  CHAR(15),
O_SHIPPRIORITY INTEGER,
O_COMMENT VARCHAR(79)
)
LOCATION ('pxf://otus-dwh/tpch-dbgen/orders.csv?PROFILE=s3:csv&accesskey=<>&secretkey=<>&endpoint=storage.yandexcloud.net'
)
FORMAT 'TEXT'
(DELIMITER '|')
;

Явно подчеркну важность корректной конфигурации для чтения файла, а именно:

  • Адрес в бакете – otus-dwh/tpch-dbgen/orders.csv

  • Профиль для чтения формата файла – PROFILE=s3:csv

  • Наличие ключей для доступа к бакету – accesskey=<>&secretkey=<>

  • Указание endpoint для S3-подобных хранилищ, например endpoint=storage.yandexcloud.net

  • Параметры для чтения конкретных форматов – для текста это (DELIMITER '|')

В случае некорректной конфигурации, получить данные из внешней таблицы не удастся.

Всего в файле содержится ровно 15М строк.

-- 2. Count rows
SELECT count(1) FROM src_orders ; -- 15000000 ROWS

Теперь пробуем запустить аналитический запрос и посмотреть план его выполнения:

-- 3. Run OLAP query
EXPLAIN ANALYZE
SELECT
    DATE_TRUNC('month', O_ORDERDATE) AS order_year
    ,count(1) AS num_orders
FROM src_orders
WHERE O_ORDERSTATUS = 'P'
GROUP BY 1
ORDER BY 1 ASC 
;

На выполнение запроса потребовалось 21.5 секунды, львиная доля времени была затрачена на чтение данных в S3. Даже при наличии фильтра WHERE потребовалось полное чтение файла, так как текстовые файлы не поддерживают predicate pushdown.

Далее с помощью WRITEABLE таблицы запишем эти же данные обратно в S3, но уже в колоночном формате Parquet:

-- 4. Write data back to S3 in columnar format (Parquet)
DROP EXTERNAL TABLE IF EXISTS trg_orders ;
CREATE WRITABLE EXTERNAL TABLE trg_orders(
    O_ORDERKEY BIGINT,
    O_CUSTKEY INT,
    O_ORDERSTATUS CHAR(1),
    O_TOTALPRICE DECIMAL(15,2),
    O_ORDERDATE DATE,
    O_ORDERPRIORITY CHAR(15), 
    O_CLERK  CHAR(15), 
    O_SHIPPRIORITY INTEGER,
    O_COMMENT VARCHAR(79)
)
LOCATION ('pxf://otus-dwh/tpch-dbgen-parquet/orders?PROFILE=s3:parquet&accesskey=<>&secretkey=<>&endpoint=storage.yandexcloud.net'    
)
FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export');
INSERT INTO trg_orders SELECT * FROM src_orders ORDER BY O_ORDERSTATUS, O_ORDERDATE
;

Запустим тот же самый OLAP-запрос к новой таблице и сравним результаты:

В этот раз запрос выполнился за 4 секунды (в 5 раз быстрее), с учетом predicate pushdown (WHERE O_ORDERSTATUS = 'P') и column projection (фактически читали 1 колонку - O_ORDERDATE), поддерживаемых форматом Parquet.

Интеграция с OLTP СУБД (PostgreSQL)

В этом сценарии перейдем к работе с внешними OLTP-базами, которые динамично наполняются новыми данными и меняют уже имеющиеся. Предположим, что в нашем случае это статистические данные о рекламных кампаниях:

-- 1. Create EXTERNAL TABLE pointing to PostgreSQL table
DROP EXTERNAL TABLE IF EXISTS src_direct_ads_facts ;
CREATE EXTERNAL TABLE src_direct_ads_facts (
    id serial ,
    account_id int4 ,
    dates_id int4 ,
    sites_id int4 ,
    traffic_id int4 ,
    device varchar(16) ,
    impressions_context int4 ,
    impressions_search int4 ,
    impressions int4 ,
    clicks_context int4 ,
    clicks_search int4 ,
    clicks int4 ,
    cost_context numeric(18, 2) ,
    cost_search numeric(18, 2) ,
    "cost" numeric(18, 2) ,
    average_position numeric(18, 2) ,
    average_position_clicks numeric(18, 2) ,
    campaigns_id int4 ,
    ads_id int4 ,
    adgroups_id int4 ,
    bounces int4
)
LOCATION ('pxf://public.direct_ads_facts?PROFILE=JDBC&JDBC_DRIVER=org.postgresql.Driver&DB_URL=jdbc:postgresql://<hostname>:6432/<database>&USER=<username>&PASS=<password>')
FORMAT 'CUSTOM' (FORMATTER='pxfwritable_import')
;

В случае JDBC видим уже несколько другой набор параметров:

  • Схема и имя таблицы – public.direct_ads_facts

  • Профиль для чтения – PROFILE=JDBC

  • Класс JDBC-драйвера – JDBC_DRIVER=org.postgresql.Driver

  • DB URI – DB_URL=jdbc:postgresql://<hostname>:6432/<database>&USER=<username>&PASS=<password>'

Будем считать, что необходимо регулярно получать данные из системы-источника в Greenplum. Сделать это можно двумя способами:

  • Копирование всей таблицы при каждом запросе.

  • Организация инкрементальной загрузки.

Первый способ обладает очевидными недостатками – чрезмерная нагрузка на источник и копирование одних и тех же данных каждый раз. Инкрементальный способ загрузки представляется гораздо более эффективным, но для него понадобятся способы выделения дельты (изменений), например, монотонно возрастающий ключ:

-- 1. Initialize
CREATE TABLE direct_ads_facts AS SELECT * FROM src_direct_ads_facts ;
-- 2. Incremental load
INSERT INTO direct_ads_facts SELECT * FROM src_direct_ads_facts
WHERE id > (SELECT MAX(id) FROM direct_ads_facts) ;

Использование федеративных запросов (Federated Queries)

Теперь представьте ситуацию, когда огромную таблицу фактов нужно склеить с маленьким, но часто меняющимся справочником. Реальный пример - это справочник названий рекламных кампаний, которые всегда имеют одинаковый идентификатор, но периодически меняют свои названия (label).

Для этого идеально подойдет PXF с возможностью обращения к источнику src_direct_campaigns для получения самых актуальных значений из справочника:

-- 2.7. Create Data Mart
CREATE TABLE direct_search_facts AS
SELECT    
      cf.account_id AS account_id
    , MD5(CONCAT(CAST(cf.id AS varchar), 'yandex.search')) AS id
    , ga.caption AS caption
    , gd.simple_date AS dt
    , cf.device AS device
    , 'yandex.search' AS SOURCE
    , gt.medium AS medium
    , CAST(cp.campaign_id AS Int) AS campaign_id
    , cp.name AS campaign -- Always correct and up-to-date
    , gt.campaign AS traffic_campaign
    , gt.content AS CONTENT
    , gt.keyword AS keyword
    , gs.domain AS DOMAIN
    , gt.landing_page AS landing_page
    , cf.impressions_search AS impressions
    , cf.clicks_search AS clicks
    , cf.cost_search AS COST
FROM direct_ads_facts AS cf
    LEFT JOIN general_accounts AS ga
            ON ga.account_id = cf.account_id
    LEFT JOIN general_dates AS gd
            ON gd.id = cf.dates_id
    LEFT JOIN src_direct_campaigns AS cp -- ! From PostgreSQL directly
            ON cp.id = cf.campaigns_id
    LEFT JOIN general_sites AS gs
            ON gs.id = cf.sites_id
    LEFT JOIN general_traffic AS gt
            ON gt.id = cf.traffic_id
;

Запись данных в Clickhouse для сверхбыстрого чтения

Полученная широкая таблица-витрина содержит все данные, чтобы задавать многочисленные аналитические вопросы. Однако, порой даже Greenplum может не справляться с множеством сложных аналитических запросов, существенно теряя в скорости и интерактивности. На помощь приходит сверхбыстрый Clickhouse! Наша задача - переместить готовую витрину данных туда, где с ней можно будет работать в режиме пинг-понг, вне зависимости от объема данных.

Для начала на стороне приемника необходимо создать пустую таблицу (схему):

CREATE TABLE direct_search(
	  account_id Int8
    , id String
	, caption String
	, dt String
	, device String
	, "source" String
	, medium String
	, campaign_id Int8
	, campaign String
	, traffic_campaign String
	, "content" String
	, keyword String
	, "domain" String
	, landing_page String
	, impressions Int8
	, clicks Int8
	, "cost" Float32
)
ENGINE = MergeTree()  
ORDER BY (dt)
;

После этого мы готовы к записи данных в Clickhouse с помощью PXF:

DROP EXTERNAL TABLE IF EXISTS trg_ch_direct_search ;

CREATE WRITABLE EXTERNAL TABLE trg_ch_direct_search(
account_id int4 ,
id varchar(128) ,
caption varchar(128) ,
dt varchar(128) ,
device varchar(16) ,
"source" varchar(1024) ,
medium varchar(1024) ,
campaign_id int4 ,
campaign varchar(1024) ,
traffic_campaign varchar(1024) ,
"content" varchar(1024) ,
keyword varchar(1024) ,
"domain" varchar(1024) ,
landing_page varchar(1024) ,
impressions int4 ,
clicks int4 ,
"cost" numeric(18, 2)
)
LOCATION ('pxf://direct_search?PROFILE=JDBC&JDBC_DRIVER=ru.yandex.clickhouse.ClickHouseDriver&DB_URL=jdbc:clickhouse://<hostname>:8123/default')
FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export')
;

INSERT INTO trg_ch_direct_search
SELECT
account_id
,id
,caption
,dt
,device
,"source"
,medium
,campaign_id
,campaign
,traffic_campaign
,"content"
,keyword
,"domain"
,landing_page
,impressions
,clicks
,"cost"
FROM direct_search
;

Ничего сложного, несколько простых действий, немного терпения и готово. Теперь с этой витриной можно работать в Clickhouse, который, как известно, не тормозит!

Проблемы и трудности

В рамках исследования и подготовки материала я столкнулся с несколькими трудностями, на которые хотел бы обратить ваше внимание:

  1. В случае работы с S3 необходимо указывать ключи доступа accesskey=<>&secretkey=<> даже для публично доступных buckets.

  2. Для работы PXF в Yandex Managed Service for Greenplum необходимо включить Egress NAT для подсети хостов.

С недавнего времени на сайте с документацией появилось релевантное сообщение:

Для подключения к внешним источникам необходимо включить NAT в интернет для подсети, в которой расположен кластер Managed Service for Greenplum®.

В противном случае вы будете получать ошибку с таймаутом SQL Error [08000]: ERROR: PXF server error : Could not obtain datasource for server default and PoolDescriptor

  1. Отсутствие SSL-соединения для JDBC.

«PXF» не поддерживает «SSL-соединение» на уровне параметров драйвера ClickHouse-JDBC.

Workaround: Вы можете оставить в кластере ClickHouse один хост без публичного доступа и к нему обращаться из Greenplum.

Умение строить комплексные решения, отвечающие на запросы бизнеса

Это то, что хотят видеть нанимающие менеджеры. Специалисты широкого профиля, мультиинструменталисты, обладающие автономностью и способные самостоятельно решать задачи и создавать ценность для бизнеса нужны на рынке как никогда.

Именно эти аспекты я держал в уме, когда работал над программами курсов Analytics EngineerData EngineerDataOps Engineer в OTUS.

Это не просто набор занятий по темам, а единая, связная история, в которой акцент делается на понимание потребностей заказчиков. На live-сессиях я и мои коллеги делимся своим опытом и реальными кейсами.

В комментарии поделитесь, с каким кейсом использования PXF удалось поработать, либо как планируете применять?