Pull to refresh

Как я реплицировал данные с помощью postgres_fdw и dbt

Level of difficultyMedium
Reading time5 min
Views3.8K

Привет, Хабр!

Так вышло, что по работе я столкнулся с необходимостью репликации некоторого (достаточно большого) количества таблиц из различных баз postgres в одну базу postgres для нужд Data Science. Казалось бы, что может быть проще, ведь в postgres есть механизм логической репликации. А логическая репликация между двумя постгресами вообще не требует никаких дополнительных инструментов.

К сожалению, от этого варианта пришлось отказаться — большое количество таблиц и операций вставки/обновления/удаления строк в каждой из баз данных недопустимо увеличивало бы размер WAL-журналов.

Некоторые инструменты репликации, как, например, meltano, поддерживают, помимо log-based репликации, инкрементную репликацию на основе ключей и полную репликацию. 

Инкрементная репликация обычно использует значение определенного столбца, например, метку времени или увеличивающееся целое число. Да и в принципе наличие поля lastmodified позволяет “в лоб” реплицировать данные за определенный промежуток времени. 

Однако, я столкнулся с тем, что не во всех нужных мне таблицах было такое поле, а в некоторых данные могли обновляться “задним” числом. 

Так я пришел к мысли использовать postgres_fdw для связи с данными на других серверах postgres и dbt для переноса данных в локальные таблицы. 

Подготовим данные

Предлагаю воспользоваться этим репозиторием. Он содержит csv-файл с небольшим набором данных транзакций. В принципе, для демонстрации этого достаточно. 

Отредактируем файл docker-compose.yaml и добавим туда еще один контейнер postgres, куда мы будем реплицировать наши данные:

docker-compose.yaml
version: '3'

services:
  postgresdb:
    container_name: postgresdb
    image: postgres:14.5
    restart: always
    environment:
      POSTGRES_USER: "postgres"
      POSTGRES_PASSWORD: "postgres"
    volumes:
      - ./data/init.sql:/docker-entrypoint-initdb.d/init.sql
      - ./data/transaction_data.csv:/var/lib/postgresql/files/transaction_data.csv
    ports:
      - "5432:5432"
    networks:
      - postgres

  postgresdb_replica:
    container_name: postgresdb_replica
    image: postgres:14.5
    restart: always
    environment:
      POSTGRES_USER: "postgres"
      POSTGRES_PASSWORD: "postgres"
    ports:
      - "5430:5432"
    networks:
      - postgres

networks:
  postgres:

Запускаем наши контейнеры:

docker compose -f “docker-compose.yaml” up -d –build

Теперь у нас есть два работающих экземпляра postgres.

Далее нам необходимо подключиться к базе данных контейнера postgresdb_replica и подготовиться к обращению к удаленным данным через postgres_fdw. Для начала узнаем IPAddress контейнера postgresdb:

docker inspect \
  -f '{{range.NetworkSettings.Networks}}{{.IPAddress}}{{end}}' postgresdb

Мой хост — 172.25.0.3.

Настройка postgres_fdw

Далее подключаемся к postgresdb_replica и выполняем команды:

-- создаем схему, куда будем складывать сторонние таблички
CREATE schema IF NOT EXISTS foreign_tables;

-- устанавливаем расширение postgres_fdw
CREATE EXTENSION postgres_fdw;

-- создаем сервер для подключения к сторонней базе данных
CREATE SERVER IF NOT EXISTS foreign_server
FOREIGN DATA WRAPPER postgres_fdw
OPTIONS (host ‘172.25.0.3’, port '5432', dbname 'postgres');

-- создаем сопоставление пользователей 
-- (рекомендуется заводить отдельные учетные записи для postgres_fdw с ролью, 
-- которая будет зависеть от того, хотите ли вы изменять исходные таблицы)
CREATE USER MAPPING IF NOT EXISTS FOR postgres
SERVER foreign_server
OPTIONS (user ‘postgres’, password ‘postgres’);

-- импортируем нашу табличку:
IMPORT FOREIGN SCHEMA public
LIMIT TO (transactions)
FROM SERVER foreign_server
INTO foreign_tables

Теперь для обращения к удаленным данным нам достаточно выполнить операцию:

SELECT * FROM foreign_tables.transactions

Настройка dbt

Для начала установим необходимые зависимости:

pip install dbt

DBT — это мощный инструмент для выполнения ELT процессов. Он позволяет разрабатывать, тестировать и запускать SQL скрипты, а также создавать модули и переиспользовать код, что помогает сократить время разработки и поддержки процессов ELT.

Детальное описание dbt выходит за рамки этой статьи, но я бы рекомендовал почитать эту статью, ну и, разумеется, документацию)

Мы можем инициализировать проект dbt, выполнив:

dbt init

Это создаст необходимую структуру папок, но для нашего игрушечного примера она немного избыточна, поэтому создадим директорию c такой структурой:

tutorial/
├── models
│   └── transactions.sql
├── dbt_project.yml
└── profiles.yml

В директории models/ мы разместим нашу sql модель. Модель в dbt — это набор SQL запросов, которые описывают, как преобразовать или агрегировать входные данные, чтобы создать структурированные таблицы для представления данных в нашей базе данных. Модели являются основным компонентом dbt проекта.

transactions.sql
{{
   config(
       schema='row',
       materialized='table'
   )
}}


select * from foreign_tables.transactions

Здесь мы сообщаем dbt, что хотим создать таблицу transactions (таблицы или представления создаются по имени файла) и материализовать ее именно, как таблицу. Про материализации можно прочитать здесь.

И добавляем простой запрос SELECT для репликации наших исходных данных.

Каждый проект dbt должен содержать файл dbt_project.yml — так dbt узнает, что каталог является проектом dbt. Список доступных конфигураций можно посмотреть здесь.

dbt_project.yml
name: 'tutorial'
version: '2.0.0'
config-version: 2

profile: 'tutorial'

model-paths: 
  - models


target-path: "target"
clean-targets:
  - "target"
  - "dbt_modules"

Также нам потребуется файл profiles.yml, который будет содержать сведения о подключении к нашей платформе данных. 

profiles.yml
config:
 send_anonymous_usage_stats: False
 use_colors: True
 partial_parse: True


tutorial:
 target: prod
 outputs:
   prod:
     type: postgres
     thread: 2
     host: localhost
     port: 5430
     user: postgres
     pass: postgres
     dbname: postgres
     schema: dbt

Мы готовы запустить наш маленький dbt проект:

dbt run –project-dir . – profiles-dir . -m transactions

Мы реплицировали таблицу из сторонней базы данных Postgres. Основная причина использовать dbt для репликации внешних таблиц в локальные — сложные запросы к стороннему серверу могут выполняться бесконечно, особенно для больших таблиц.

Но я столкнулся с еще одной проблемой — поскольку я реплицировал данные с реплик, у меня было ограничение подключения в 30 секунд и некоторые таблицы не успевали материализоваться. Поэтому я решил перейти на материализованные представления. 

DBT не поддерживает материализованные представления из коробки, поэтому в наш проект необходимо добавить пакет materialized-views.

Идем в репозиторий и копируем папку materialized-views в нашу директорию. Добавим в каталог models новый transactions_view.sql:

transactions_view.sql
{{
   config(
       schema='row',
       materialized='materialized_view',
   )
}}


select * from foreign_tables.transactions

Добавляем файл tutorials/packages.yml:

packages.yml
packages:
 - local: ./materialized-views

А также создадим каталог tutorial/macros и добавим туда файл *.sql:

*.sql
{% macro drop_relation(relation) -%}
 {{ return(dbt_labs_materialized_views.drop_relation(relation)) }}
{% endmacro %}


{% macro postgres__list_relations_without_caching(schema_relation) %}
 {{ return(dbt_labs_materialized_views.postgres__list_relations_without_caching(schema_relation)) }}
{% endmacro %}


{% macro postgres_get_relations() %}
 {{ return(dbt_labs_materialized_views.postgres_get_relations()) }}
{% endmacro %}

Проверим, что наша структура проекта соответствует следующей:

tutorial/
├── dbt_project.yml
├── macros
│   └── *.sql
├── materialized-views
│   ├── dbt_project.yml
│   ├── integration_tests
│   ├── macros
│   └── README.md
├── models
│   ├── transactions.sql
│   └── transactions_view.sql
├── packages.yml
└── profiles.yml

Запустим нашу модель, предварительно установив пакет materialized-views:

dbt deps --project-dir . --profiles-dir .
dbt run --project-dir . --profiles-dir . -m transactions_view

Отлично, наше представление материализовалось!

Повторный запуск команды будет аналогичен выполнению команды REFRESH:

Теперь мы можем интегрировать наш dbt проект в какой-нибудь оркестратор ELT пайплайнов, например, Dagster или AirFlow,чтобы запускать обновление наших материализаций по расписанию и постороить поверх этих материализованных представлений другие таблицы или инкрементные материализации.

Решение не идеальное, но для меня пока работает) Буду рад советам)

Tags:
Hubs:
Total votes 1: ↑1 and ↓0+1
Comments0

Articles