Сценарии применения Greenplum PXF для интеграции с Data Lake, OLTP, Clickhouse
Привет! Меня зовут Артемий Козырь, и я Analytics Engineer в Wheely.
Популярность массивно-параллельных СУБД (MPP) для решения аналитических задач неукоснительно растет. Сегодня хотел бы поговорить о широко распространенной СУБД Greenplum и, в частности, о Platform Extension Framework (PXF) - расширении, с помощью которого открываются почти неограниченные возможности интеграции с множеством внешних систем и форматов данных.
В этой публикации Вас ждет:
Основные возможности PXF, конфигурация, способы оптимизации.
Организация Extract - Load с помощью PXF (Data Lake / OLTP).
Объединение локальных и внешних таблиц в запросах (Federated queries).
Запись данных во внешние системы (Clickhouse).
Базовая идея Greenplum PXF
Заключается в том, чтобы задействовать обработку данных на стороне внешних систем, тем самым обеспечивая пользователю возможность обращения ко всей истории данных вне зависимости от места хранения, но без дополнительных трат ресурсов и времени на полную репликацию.
Демонстрационный сценарий описывает источники, отражающие данные по продажам за период в несколько лет. Операционные данные (например, текущий месяц - hot data) хранятся в OLTP MySQL, в Greenplum хранятся данные для аналитической отчетности (например, последний год-два – warm data), а в AWS S3 архивируются данные за более ранние периоды (cold data).
Таким образом, запрос на вывод суммы продаж с группировкой помесячно может быть распределен на 3 системы и выполнен параллельно. При этом пользователь обращается к одной таблице.
Заявлена поддержка доступа к данным следующих систем:
Hadoop / Hive / HBase
AWS S3 / Google Cloud Storage / Azure Blob Storage / MinIO
Реляционные СУБД (через JDBC)
Network file systems
И следующих форматов хранения:
Text (plain, delimited, embedded line feeds)
JSON
Avro, AvroSequenceFile
SequenceFile, RCFile
ORC / Parquet
В рамках этой публикации я буду использовать управляемый сервис от Я.Облака – Yandex Managed Service for Greenplum®. Конфигурация кластера: 2 x Master + 2 x Segment хоста на базе s2.micro (2 vCPU, 100% vCPU rate, 8 GB RAM).
Managed Service for Greenplum уже включает расширение PXF и его базовую конфигурацию. В случае использования Self-hosted Greenplum, все шаги установки и конфигурации придется проделать самостоятельно:
Соблюдение ряда требований перед установкой.
Загрузка PXF Package.
Установка Package на хостах.
Инициализация и запуск сервиса PXF.
Пошаговые действия описаны в официальной документации, и я не буду останавливаться на этом подробно.
PXF - расширение для работы с внешними данными через EXTERNAL TABLEs
PXF реализует протокол для СУБД Greenplum, который можно использовать при создании внешних таблиц.
Синтаксис комадны CREATE EXTERNAL TABLE с протоколом pxf выглядит следующим образом:
CREATE [WRITABLE] EXTERNAL TABLE <table_name>
( <column_name> <data_type> [, ...] | LIKE <other_table> )
LOCATION('pxf://<path-to-data>?PROFILE=<profile_name>[&SERVER=<server_name>][&<custom-option>=<value>[...]]')
FORMAT '[TEXT|CSV|CUSTOM]' (<formatting-properties>)
;
Краткое пояснение к ключевым параметрам конфигурации:
С помощью PXF можно осуществлять как чтение из внешних источников, так и запись данных во внешние системы (ключевое слово WRITABLE). Также возможно объединение данных из разных источников в одном запросе (т.н. federated queries).
Отдельно хочу заострить внимание на поддержке Filter pushdown и Column Projection.
Filter pushdown позволяет применить ограничение на читаемые строки из выражения WHERE запроса SELECT на стороне источника данных, тем самым значительно снижая нагрузку и объем данных, передаваемых по сети. Например, это может быть использовано в обращениях к внешним СУБД, исключению партиций в Hive (partition pruning), чтению групп строк в колоночных форматах (ORC, Parquet).
Column Projection означает то, что только запрошенные колонки SELECT-запроса буду возвращены из внешних систем. Например, при запросе 5 колонок из 100 возможных в файлах формата Parquet в результате запроса вернутся (будут переданы по сети) только 5, что кратное уменьшает объем данных.
Интеграция с Data Lake (S3 / GCS / HDFS / MinIO)
Создадим EXTERNAL TABLE, указывающую на данные в объектном хранилище:
-- 1. Create EXTERNAL TABLE pointing to S3 (Text file)
DROP EXTERNAL TABLE IF EXISTS src_orders ;
CREATE EXTERNAL TABLE src_orders(
O_ORDERKEY BIGINT,
O_CUSTKEY INT,
O_ORDERSTATUS CHAR(1),
O_TOTALPRICE DECIMAL(15,2),
O_ORDERDATE DATE,
O_ORDERPRIORITY CHAR(15),
O_CLERK CHAR(15),
O_SHIPPRIORITY INTEGER,
O_COMMENT VARCHAR(79)
)
LOCATION ('pxf://otus-dwh/tpch-dbgen/orders.csv?PROFILE=s3:csv&accesskey=<>&secretkey=<>&endpoint=storage.yandexcloud.net'
)
FORMAT 'TEXT'
(DELIMITER '|')
;
Явно подчеркну важность корректной конфигурации для чтения файла, а именно:
Адрес в бакете –
otus-dwh/tpch-dbgen/orders.csv
Профиль для чтения формата файла –
PROFILE=s3:csv
Наличие ключей для доступа к бакету –
accesskey=<>&secretkey=<>
Указание endpoint для S3-подобных хранилищ, например
endpoint=storage.yandexcloud.net
Параметры для чтения конкретных форматов – для текста это
(DELIMITER '|')
В случае некорректной конфигурации, получить данные из внешней таблицы не удастся.
Всего в файле содержится ровно 15М строк.
-- 2. Count rows
SELECT count(1) FROM src_orders ; -- 15000000 ROWS
Теперь пробуем запустить аналитический запрос и посмотреть план его выполнения:
-- 3. Run OLAP query
EXPLAIN ANALYZE
SELECT
DATE_TRUNC('month', O_ORDERDATE) AS order_year
,count(1) AS num_orders
FROM src_orders
WHERE O_ORDERSTATUS = 'P'
GROUP BY 1
ORDER BY 1 ASC
;
На выполнение запроса потребовалось 21.5 секунды, львиная доля времени была затрачена на чтение данных в S3. Даже при наличии фильтра WHERE потребовалось полное чтение файла, так как текстовые файлы не поддерживают predicate pushdown.
Далее с помощью WRITEABLE таблицы запишем эти же данные обратно в S3, но уже в колоночном формате Parquet:
-- 4. Write data back to S3 in columnar format (Parquet)
DROP EXTERNAL TABLE IF EXISTS trg_orders ;
CREATE WRITABLE EXTERNAL TABLE trg_orders(
O_ORDERKEY BIGINT,
O_CUSTKEY INT,
O_ORDERSTATUS CHAR(1),
O_TOTALPRICE DECIMAL(15,2),
O_ORDERDATE DATE,
O_ORDERPRIORITY CHAR(15),
O_CLERK CHAR(15),
O_SHIPPRIORITY INTEGER,
O_COMMENT VARCHAR(79)
)
LOCATION ('pxf://otus-dwh/tpch-dbgen-parquet/orders?PROFILE=s3:parquet&accesskey=<>&secretkey=<>&endpoint=storage.yandexcloud.net'
)
FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export');
INSERT INTO trg_orders SELECT * FROM src_orders ORDER BY O_ORDERSTATUS, O_ORDERDATE
;
Запустим тот же самый OLAP-запрос к новой таблице и сравним результаты:
В этот раз запрос выполнился за 4 секунды (в 5 раз быстрее), с учетом predicate pushdown (WHERE O_ORDERSTATUS = 'P'
) и column projection (фактически читали 1 колонку - O_ORDERDATE
), поддерживаемых форматом Parquet.
Интеграция с OLTP СУБД (PostgreSQL)
В этом сценарии перейдем к работе с внешними OLTP-базами, которые динамично наполняются новыми данными и меняют уже имеющиеся. Предположим, что в нашем случае это статистические данные о рекламных кампаниях:
-- 1. Create EXTERNAL TABLE pointing to PostgreSQL table
DROP EXTERNAL TABLE IF EXISTS src_direct_ads_facts ;
CREATE EXTERNAL TABLE src_direct_ads_facts (
id serial ,
account_id int4 ,
dates_id int4 ,
sites_id int4 ,
traffic_id int4 ,
device varchar(16) ,
impressions_context int4 ,
impressions_search int4 ,
impressions int4 ,
clicks_context int4 ,
clicks_search int4 ,
clicks int4 ,
cost_context numeric(18, 2) ,
cost_search numeric(18, 2) ,
"cost" numeric(18, 2) ,
average_position numeric(18, 2) ,
average_position_clicks numeric(18, 2) ,
campaigns_id int4 ,
ads_id int4 ,
adgroups_id int4 ,
bounces int4
)
LOCATION ('pxf://public.direct_ads_facts?PROFILE=JDBC&JDBC_DRIVER=org.postgresql.Driver&DB_URL=jdbc:postgresql://<hostname>:6432/<database>&USER=<username>&PASS=<password>')
FORMAT 'CUSTOM' (FORMATTER='pxfwritable_import')
;
В случае JDBC видим уже несколько другой набор параметров:
Схема и имя таблицы –
public.direct_ads_facts
Профиль для чтения –
PROFILE=JDBC
Класс JDBC-драйвера –
JDBC_DRIVER=org.postgresql.Driver
DB URI –
DB_URL=jdbc:postgresql://<hostname>:6432/<database>&USER=<username>&PASS=<password>'
Будем считать, что необходимо регулярно получать данные из системы-источника в Greenplum. Сделать это можно двумя способами:
Копирование всей таблицы при каждом запросе.
Организация инкрементальной загрузки.
Первый способ обладает очевидными недостатками – чрезмерная нагрузка на источник и копирование одних и тех же данных каждый раз. Инкрементальный способ загрузки представляется гораздо более эффективным, но для него понадобятся способы выделения дельты (изменений), например, монотонно возрастающий ключ:
-- 1. Initialize
CREATE TABLE direct_ads_facts AS SELECT * FROM src_direct_ads_facts ;
-- 2. Incremental load
INSERT INTO direct_ads_facts SELECT * FROM src_direct_ads_facts
WHERE id > (SELECT MAX(id) FROM direct_ads_facts) ;
Использование федеративных запросов (Federated Queries)
Теперь представьте ситуацию, когда огромную таблицу фактов нужно склеить с маленьким, но часто меняющимся справочником. Реальный пример - это справочник названий рекламных кампаний, которые всегда имеют одинаковый идентификатор, но периодически меняют свои названия (label).
Для этого идеально подойдет PXF с возможностью обращения к источнику src_direct_campaigns
для получения самых актуальных значений из справочника:
-- 2.7. Create Data Mart
CREATE TABLE direct_search_facts AS
SELECT
cf.account_id AS account_id
, MD5(CONCAT(CAST(cf.id AS varchar), 'yandex.search')) AS id
, ga.caption AS caption
, gd.simple_date AS dt
, cf.device AS device
, 'yandex.search' AS SOURCE
, gt.medium AS medium
, CAST(cp.campaign_id AS Int) AS campaign_id
, cp.name AS campaign -- Always correct and up-to-date
, gt.campaign AS traffic_campaign
, gt.content AS CONTENT
, gt.keyword AS keyword
, gs.domain AS DOMAIN
, gt.landing_page AS landing_page
, cf.impressions_search AS impressions
, cf.clicks_search AS clicks
, cf.cost_search AS COST
FROM direct_ads_facts AS cf
LEFT JOIN general_accounts AS ga
ON ga.account_id = cf.account_id
LEFT JOIN general_dates AS gd
ON gd.id = cf.dates_id
LEFT JOIN src_direct_campaigns AS cp -- ! From PostgreSQL directly
ON cp.id = cf.campaigns_id
LEFT JOIN general_sites AS gs
ON gs.id = cf.sites_id
LEFT JOIN general_traffic AS gt
ON gt.id = cf.traffic_id
;
Запись данных в Clickhouse для сверхбыстрого чтения
Полученная широкая таблица-витрина содержит все данные, чтобы задавать многочисленные аналитические вопросы. Однако, порой даже Greenplum может не справляться с множеством сложных аналитических запросов, существенно теряя в скорости и интерактивности. На помощь приходит сверхбыстрый Clickhouse! Наша задача - переместить готовую витрину данных туда, где с ней можно будет работать в режиме пинг-понг, вне зависимости от объема данных.
Для начала на стороне приемника необходимо создать пустую таблицу (схему):
CREATE TABLE direct_search(
account_id Int8
, id String
, caption String
, dt String
, device String
, "source" String
, medium String
, campaign_id Int8
, campaign String
, traffic_campaign String
, "content" String
, keyword String
, "domain" String
, landing_page String
, impressions Int8
, clicks Int8
, "cost" Float32
)
ENGINE = MergeTree()
ORDER BY (dt)
;
После этого мы готовы к записи данных в Clickhouse с помощью PXF:
DROP EXTERNAL TABLE IF EXISTS trg_ch_direct_search ;
CREATE WRITABLE EXTERNAL TABLE trg_ch_direct_search(
account_id int4 ,
id varchar(128) ,
caption varchar(128) ,
dt varchar(128) ,
device varchar(16) ,
"source" varchar(1024) ,
medium varchar(1024) ,
campaign_id int4 ,
campaign varchar(1024) ,
traffic_campaign varchar(1024) ,
"content" varchar(1024) ,
keyword varchar(1024) ,
"domain" varchar(1024) ,
landing_page varchar(1024) ,
impressions int4 ,
clicks int4 ,
"cost" numeric(18, 2)
)
LOCATION ('pxf://direct_search?PROFILE=JDBC&JDBC_DRIVER=ru.yandex.clickhouse.ClickHouseDriver&DB_URL=jdbc:clickhouse://<hostname>:8123/default')
FORMAT 'CUSTOM' (FORMATTER='pxfwritable_export')
;
INSERT INTO trg_ch_direct_search
SELECT
account_id
,id
,caption
,dt
,device
,"source"
,medium
,campaign_id
,campaign
,traffic_campaign
,"content"
,keyword
,"domain"
,landing_page
,impressions
,clicks
,"cost"
FROM direct_search
;
Ничего сложного, несколько простых действий, немного терпения и готово. Теперь с этой витриной можно работать в Clickhouse, который, как известно, не тормозит!
Проблемы и трудности
В рамках исследования и подготовки материала я столкнулся с несколькими трудностями, на которые хотел бы обратить ваше внимание:
В случае работы с S3 необходимо указывать ключи доступа
accesskey=<>&secretkey=<>
даже для публично доступных buckets.Для работы PXF в Yandex Managed Service for Greenplum необходимо включить Egress NAT для подсети хостов.
С недавнего времени на сайте с документацией появилось релевантное сообщение:
Для подключения к внешним источникам необходимо включить NAT в интернет для подсети, в которой расположен кластер Managed Service for Greenplum®.
В противном случае вы будете получать ошибку с таймаутом SQL Error [08000]: ERROR: PXF server error : Could not obtain datasource for server default and PoolDescriptor
Отсутствие SSL-соединения для JDBC.
«PXF» не поддерживает «SSL-соединение» на уровне параметров драйвера ClickHouse-JDBC.
Workaround: Вы можете оставить в кластере ClickHouse один хост без публичного доступа и к нему обращаться из Greenplum.
Умение строить комплексные решения, отвечающие на запросы бизнеса
Это то, что хотят видеть нанимающие менеджеры. Специалисты широкого профиля, мультиинструменталисты, обладающие автономностью и способные самостоятельно решать задачи и создавать ценность для бизнеса нужны на рынке как никогда.
Именно эти аспекты я держал в уме, когда работал над программами курсов Analytics Engineer, Data Engineer, DataOps Engineer в OTUS.
Это не просто набор занятий по темам, а единая, связная история, в которой акцент делается на понимание потребностей заказчиков. На live-сессиях я и мои коллеги делимся своим опытом и реальными кейсами.
В комментарии поделитесь, с каким кейсом использования PXF удалось поработать, либо как планируете применять?