CDC на примитивах
Суть CDC в том, что он «захватывает» все изменения в базе данных.
Существует два вида CDC:
На основании запросов
На основании журнала
Если говорить про захват изменений на основании запросов, то есть следующие минусы:
Не самый лучший вариант, потому что CDC будет по крону обращаться к БД и получать изменения; большая нагрузка на БД.
Нельзя увидеть удаленные и/или изменённые строки в таблице.
Не реализуемо, если в таблице нет вообще никаких дат и поэтому мы не сможем захватить изменения.
И следующие плюсы:
Быстрая реализация.
Обычно не требует специфических знаний и поэтому реализация не будет сложной.
И при сравнении с захватом изменений через журнал транзакций у нас следующая картина по минусам:
Сложно реализуемо
Необходима бОльшая инфраструктура для реализации данного вида CDC
Сложнее в поддерживании
Но в тоже время мы получаем следующие плюсы:
Мы можем отслеживать Все изменения в нашей БД: добавление, удаление и изменение записей
Это не нагружает БД, потому что мы читаем WAL-журнал
Можно реализовать real-time аналитику
Можно реализовать event-driven подход
Ниже будет рассказано о том, как реализовать CDC при использовании Debezium. Если вы не хотите повторять все операции самостоятельно, то можете воспользоваться моим репозиторием, в котором есть всё что используется в статье. А статью можете использовать как справочник.
Для начала создадим docker-compose.yaml
при помощи которого сделаем всю инфраструктуру.
docker-compose.yaml
version: '2'
services:
zookeeper:
image: quay.io/debezium/zookeeper:2.5
ports:
- "2181:2181"
- "2888:2888"
- "3888:3888"
kafka:
image: quay.io/debezium/kafka:2.5
ports:
- "9092:9092"
links:
- zookeeper
environment:
- ZOOKEEPER_CONNECT=zookeeper:2181
postgres:
image: quay.io/debezium/example-postgres:2.5
ports:
- "5432:5432"
environment:
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgres
connect:
image: quay.io/debezium/connect:2.5
ports:
- "8083:8083"
links:
- kafka
- postgres
environment:
- BOOTSTRAP_SERVERS=kafka:9092
- GROUP_ID=1
- CONFIG_STORAGE_TOPIC=my_connect_configs
- OFFSET_STORAGE_TOPIC=my_connect_offsets
- STATUS_STORAGE_TOPIC=my_connect_statuses
control-center:
image: confluentinc/cp-enterprise-control-center:7.6.1
hostname: control-center
ports:
- "9021:9021"
depends_on:
- kafka
environment:
CONTROL_CENTER_BOOTSTRAP_SERVERS: "kafka:9092"
CONTROL_CENTER_REPLICATION_FACTOR: "1"
debezium-ui:
image: quay.io/debezium/debezium-ui:2.5
platform: linux/x86_64
depends_on:
- kafka
- connect
- zookeeper
ports:
- "8081:8080"
environment:
- KAFKA_CONNECT_URIS=http://connect:8083
ch_server:
image: clickhouse/clickhouse-server:24.4.1
ports:
- "8123:8123"
environment:
CLICKHOUSE_USER: click
CLICKHOUSE_PASSWORD: click
Важно. Используемые образы в docker-compose.yaml
— это образы от самого debezium. Поэтому часть шагов и инструкций уже были выполнены за вас. Но если вы хотите собрать всё самостоятельно, то воспользуйтесь документацией.
Затем создадим config register-postgres.json
register-postgres.json
{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname": "postgres",
"topic.prefix": "dbserver1",
"schema.include.list": "inventory"
}
}
После этого выполним запрос:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-postgres.json
Если перейти по адресу http://localhost:9021/ и зайти в Topics
, то можно увидеть топики с указанием наших таблиц из БД.
Начиная с этого момента работает CDC. Все изменения, которые будут происходить в БД будут отправляться в Kafka.
Давайте для примера возьмем таблицу с пользователями и сэмулируем «рабочую» нагрузку.
Для этого я предлагаю использовать небольшой скрипт, который будет генерировать новые записи и изменять старые.
Но перед тем как написать скрипт нужно подготовить окружение; для этого выполните следующую команду:
python3.11 -m venv venv && \
source venv/bin/activate && \
pip install --upgrade pip && \
pip install -r requirements.txt
Создайте файл table_simulation.py
и в него поместите следующий код:
table_simulation.py
import uuid
import json
import time
import random
import requests
import pandas as pd
from connectors_to_databases import PostgreSQL
pg = PostgreSQL(
host='localhost',
port=5432,
login='postgres',
password='postgres',
)
def insert_user():
r = requests.get(url='https://randomuser.me/api/')
d = json.loads(r.text)
dict_user = d['results'][0]
d_ = {
'first_name': [dict_user['name']['first']],
'last_name': [dict_user['name']['last']],
'email': [dict_user['email']],
}
df = pd.DataFrame(d_)
try:
pg.insert_df(
df=df,
pg_table_schema='inventory',
pg_table_name='customers',
)
except Exception as ex:
pass
time.sleep(2)
def update_user():
df_id = pg.execute_to_df(
'''
SELECT id FROM inventory.customers
'''
)
len_list = len(df_id.id)
random_user = random.randint(0, len_list)
try:
pg.execute_script(
f'''
UPDATE
inventory.customers
SET
first_name = '{str(uuid.uuid4())}',
last_name = '{str(uuid.uuid4())}'
WHERE
id = {df_id.id[random_user]}
'''
)
except Exception as ex:
pass
time.sleep(2)
while True:
if random.randint(1, 1000) % 2 == 0:
print('INSERT new USER')
insert_user()
else:
print('UPDATE current USER')
update_user()
Запускаем наше виртуальное окружение
source venv/bin/activate
Запускам скрипт:
python table_simulation.py
После запуска скрипта необходимо перейти по адресу http://localhost:9021/ и зайти в топик dbserver1.inventory.customers
Там мы будем видеть все сообщения, которые поступают в Kafka и это как раз все наши изменения из WAL-журнала.
Ниже формат сообщения, которое приходит к нам в Kafka
Message
{
"schema": {
"type": "struct",
"fields": [
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"default": 0,
"field": "id"
},
{
"type": "string",
"optional": false,
"field": "first_name"
},
{
"type": "string",
"optional": false,
"field": "last_name"
},
{
"type": "string",
"optional": false,
"field": "email"
}
],
"optional": true,
"name": "dbserver1.inventory.customers.Value",
"field": "before"
},
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"default": 0,
"field": "id"
},
{
"type": "string",
"optional": false,
"field": "first_name"
},
{
"type": "string",
"optional": false,
"field": "last_name"
},
{
"type": "string",
"optional": false,
"field": "email"
}
],
"optional": true,
"name": "dbserver1.inventory.customers.Value",
"field": "after"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "version"
},
{
"type": "string",
"optional": false,
"field": "connector"
},
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "int64",
"optional": false,
"field": "ts_ms"
},
{
"type": "string",
"optional": true,
"name": "io.debezium.data.Enum",
"version": 1,
"parameters": {
"allowed": "true,last,false,incremental"
},
"default": "false",
"field": "snapshot"
},
{
"type": "string",
"optional": false,
"field": "db"
},
{
"type": "string",
"optional": true,
"field": "sequence"
},
{
"type": "string",
"optional": false,
"field": "schema"
},
{
"type": "string",
"optional": false,
"field": "table"
},
{
"type": "int64",
"optional": true,
"field": "txId"
},
{
"type": "int64",
"optional": true,
"field": "lsn"
},
{
"type": "int64",
"optional": true,
"field": "xmin"
}
],
"optional": false,
"name": "io.debezium.connector.postgresql.Source",
"field": "source"
},
{
"type": "string",
"optional": false,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "id"
},
{
"type": "int64",
"optional": false,
"field": "total_order"
},
{
"type": "int64",
"optional": false,
"field": "data_collection_order"
}
],
"optional": true,
"name": "event.block",
"version": 1,
"field": "transaction"
}
],
"optional": false,
"name": "dbserver1.inventory.customers.Envelope",
"version": 1
},
"payload": {
"before": {
"id": 1003,
"first_name": "Edward",
"last_name": "Walker",
"email": "ed@walker.com"
},
"after": {
"id": 1003,
"first_name": "2df7fb35-4c33-4742-9e56-10073c54937e",
"last_name": "07f6a1e6-6a0a-4d51-98c3-9e67ea49b3b5",
"email": "ed@walker.com"
},
"source": {
"version": "2.5.4.Final",
"connector": "postgresql",
"name": "dbserver1",
"ts_ms": 1715153932452,
"snapshot": "false",
"db": "postgres",
"sequence": "[\"34498256\",\"34498256\"]",
"schema": "inventory",
"table": "customers",
"txId": 779,
"lsn": 34498256,
"xmin": null
},
"op": "u",
"ts_ms": 1715153932842,
"transaction": null
}
}
Важная здесь информация, которая позволяет нам отслеживать изменения — это вот этот кусок сообщения:
...
"payload": {
"before": {
"id": 1003,
"first_name": "Edward",
"last_name": "Walker",
"email": "ed@walker.com"
},
"after": {
"id": 1003,
"first_name": "2df7fb35-4c33-4742-9e56-10073c54937e",
"last_name": "07f6a1e6-6a0a-4d51-98c3-9e67ea49b3b5",
"email": "ed@walker.com"
},
"source": {
"version": "2.5.4.Final",
"connector": "postgresql",
"name": "dbserver1",
"ts_ms": 1715153932452,
"snapshot": "false",
"db": "postgres",
"sequence": "[\"34498256\",\"34498256\"]",
"schema": "inventory",
"table": "customers",
"txId": 779,
"lsn": 34498256,
"xmin": null
},
...
Данное сообщение нам показывает что было в БД до и что стало после изменения. Формат сообщения позволяет нам создавать SCD2 и отслеживать все изменения по ключу.
Всю обработку потока можно реализовать разными способами: Spark, Flink, etc
Но я покажу как просто и быстро можно прочитать топики Kafka через ClickHouse.
Для этого я ранее уже добавил ClickHouse в наш docker-compose.yaml
.
После этого необходимо подключиться к ClickHouse через любой менеджер подключений или через cli, я буду подключаться через DBeaver.
Если вы захотите углубиться в тему kafka-engine в ClickHouse, то можно воспользоваться официальной документацией.
Ниже я покажу, как подключить топик dbserver1.inventory.customers
к ClickHouse
Для начала нам необходимо создать consumer для Kafka:
DROP TABLE IF EXISTS customers_kafka;
CREATE TABLE customers_kafka
(
payload String,
"schema" String
)
ENGINE = Kafka
SETTINGS
kafka_broker_list = 'kafka',
kafka_topic_list = 'dbserver1.inventory.customers',
kafka_group_name = 'foo',
kafka_format = 'JSON';
Затем создаем таблицу, в которую будут записываться данные из топика:
DROP TABLE IF EXISTS customers_kafka_mv;
CREATE TABLE customers_kafka_mv
(
payload String,
"schema" String
)
ENGINE = MergeTree()
ORDER BY (payload);
В конце создаем мат.представление, которое будет уже записывать наши записи из топика в таблицу:
DROP TABLE IF EXISTS mv_customers_kafka;
CREATE MATERIALIZED VIEW mv_customers_kafka
TO customers_kafka_mv AS
SELECT * FROM customers_kafkasql
И если запустить следующий скрипт:
SELECT count(*) FROM customers_kafka_mv
То мы увидим количество сообщений, которое было записано в данный топик Kafka. И также мы можем прочитать любое из сообщений.
Так как данные сохранятся в формате JSON, то для «разбора» полученных JSON нужно пользоваться встроенными методами в ClickHouse.
На этом всё. Реализация в статье — это просто один из примеров того, как это может выглядеть. При выборе технологий смотрите на потребности бизнеса, где есть компетенции и пр. В целом, вы можете и не использовать ClickHouse для чтения топиков, а написать свой сервис на Python или реализовать микро‑батчинг посредством Apache Airflow.
Резюме: Захват изменений можно использовать для разных целей и под каждую цель – свой инструмент.
Также если вам необходима консультация/менторство/мок-собеседование и другие вопросы по дата-инженерии, то вы можете обращаться ко мне. Все контакты указаны по ссылке.