Суть CDC в том, что он «захватывает» все изменения в базе данных.
Существует два вида CDC:
На основании запросов
На основании журнала
Если говорить про захват изменений на основании запросов, то есть следующие минусы:
Не самый лучший вариант, потому что CDC будет по крону обращаться к БД и получать изменения; большая нагрузка на БД.
Нельзя увидеть удаленные и/или изменённые строки в таблице.
Не реализуемо, если в таблице нет вообще никаких дат и поэтому мы не сможем захватить изменения.
И следующие плюсы:
Быстрая реализация.
Обычно не требует специфических знаний и поэтому реализация не будет сложной.
И при сравнении с захватом изменений через журнал транзакций у нас следующая картина по минусам:
Сложно реализуемо
Необходима бОльшая инфраструктура для реализации данного вида CDC
Сложнее в поддерживании
Но в тоже время мы получаем следующие плюсы:
Мы можем отслеживать Все изменения в нашей БД: добавление, удаление и изменение записей
Это не нагружает БД, потому что мы читаем WAL-журнал
Можно реализовать real-time аналитику
Можно реализовать event-driven подход
Ниже будет рассказано о том, как реализовать CDC при использовании Debezium. Если вы не хотите повторять все операции самостоятельно, то можете воспользоваться моим репозиторием, в котором есть всё что используется в статье. А статью можете использовать как справочник.
Для начала создадим docker-compose.yaml при помощи которого сделаем всю инфраструктуру.
docker-compose.yaml
version: '2' services: zookeeper: image: quay.io/debezium/zookeeper:2.5 ports: - "2181:2181" - "2888:2888" - "3888:3888" kafka: image: quay.io/debezium/kafka:2.5 ports: - "9092:9092" links: - zookeeper environment: - ZOOKEEPER_CONNECT=zookeeper:2181 postgres: image: quay.io/debezium/example-postgres:2.5 ports: - "5432:5432" environment: - POSTGRES_USER=postgres - POSTGRES_PASSWORD=postgres connect: image: quay.io/debezium/connect:2.5 ports: - "8083:8083" links: - kafka - postgres environment: - BOOTSTRAP_SERVERS=kafka:9092 - GROUP_ID=1 - CONFIG_STORAGE_TOPIC=my_connect_configs - OFFSET_STORAGE_TOPIC=my_connect_offsets - STATUS_STORAGE_TOPIC=my_connect_statuses control-center: image: confluentinc/cp-enterprise-control-center:7.6.1 hostname: control-center ports: - "9021:9021" depends_on: - kafka environment: CONTROL_CENTER_BOOTSTRAP_SERVERS: "kafka:9092" CONTROL_CENTER_REPLICATION_FACTOR: "1" debezium-ui: image: quay.io/debezium/debezium-ui:2.5 platform: linux/x86_64 depends_on: - kafka - connect - zookeeper ports: - "8081:8080" environment: - KAFKA_CONNECT_URIS=http://connect:8083 ch_server: image: clickhouse/clickhouse-server:24.4.1 ports: - "8123:8123" environment: CLICKHOUSE_USER: click CLICKHOUSE_PASSWORD: click
Важно. Используемые образы в docker-compose.yaml — это образы от самого debezium. Поэтому часть шагов и инструкций уже были выполнены за вас. Но если вы хотите собрать всё самостоятельно, то воспользуйтесь документацией.
Затем создадим config register-postgres.json
register-postgres.json
{ "name": "inventory-connector", "config": { "connector.class": "io.debezium.connector.postgresql.PostgresConnector", "tasks.max": "1", "database.hostname": "postgres", "database.port": "5432", "database.user": "postgres", "database.password": "postgres", "database.dbname": "postgres", "topic.prefix": "dbserver1", "schema.include.list": "inventory" } }
После этого выполним запрос:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-postgres.json
Если перейти по адресу http://localhost:9021/ и зайти в Topics, то можно увидеть топики с указанием наших таблиц из БД.

Начиная с этого момента работает CDC. Все изменения, которые будут происходить в БД будут отправляться в Kafka.
Давайте для примера возьмем таблицу с пользователями и сэмулируем «рабочую» нагрузку.
Для этого я предлагаю использовать небольшой скрипт, который будет генерировать новые записи и изменять старые.
Но перед тем как написать скрипт нужно подготовить окружение; для этого выполните следующую команду:
python3.11 -m venv venv && \ source venv/bin/activate && \ pip install --upgrade pip && \ pip install -r requirements.txt
Создайте файл table_simulation.py и в него поместите следующий код:
table_simulation.py
import uuid import json import time import random import requests import pandas as pd from connectors_to_databases import PostgreSQL pg = PostgreSQL( host='localhost', port=5432, login='postgres', password='postgres', ) def insert_user(): r = requests.get(url='https://randomuser.me/api/') d = json.loads(r.text) dict_user = d['results'][0] d_ = { 'first_name': [dict_user['name']['first']], 'last_name': [dict_user['name']['last']], 'email': [dict_user['email']], } df = pd.DataFrame(d_) try: pg.insert_df( df=df, pg_table_schema='inventory', pg_table_name='customers', ) except Exception as ex: pass time.sleep(2) def update_user(): df_id = pg.execute_to_df( ''' SELECT id FROM inventory.customers ''' ) len_list = len(df_id.id) random_user = random.randint(0, len_list) try: pg.execute_script( f''' UPDATE inventory.customers SET first_name = '{str(uuid.uuid4())}', last_name = '{str(uuid.uuid4())}' WHERE id = {df_id.id[random_user]} ''' ) except Exception as ex: pass time.sleep(2) while True: if random.randint(1, 1000) % 2 == 0: print('INSERT new USER') insert_user() else: print('UPDATE current USER') update_user()
Запускаем наше виртуальное окружение
source venv/bin/activate
Запускам скрипт:
python table_simulation.py
После запуска скрипта необходимо перейти по адресу http://localhost:9021/ и зайти в топик dbserver1.inventory.customers
Там мы будем видеть все сообщения, которые поступают в Kafka и это как раз все наши изменения из WAL-журнала.
Ниже формат сообщения, которое приходит к нам в Kafka
Message
{ "schema": { "type": "struct", "fields": [ { "type": "struct", "fields": [ { "type": "int32", "optional": false, "default": 0, "field": "id" }, { "type": "string", "optional": false, "field": "first_name" }, { "type": "string", "optional": false, "field": "last_name" }, { "type": "string", "optional": false, "field": "email" } ], "optional": true, "name": "dbserver1.inventory.customers.Value", "field": "before" }, { "type": "struct", "fields": [ { "type": "int32", "optional": false, "default": 0, "field": "id" }, { "type": "string", "optional": false, "field": "first_name" }, { "type": "string", "optional": false, "field": "last_name" }, { "type": "string", "optional": false, "field": "email" } ], "optional": true, "name": "dbserver1.inventory.customers.Value", "field": "after" }, { "type": "struct", "fields": [ { "type": "string", "optional": false, "field": "version" }, { "type": "string", "optional": false, "field": "connector" }, { "type": "string", "optional": false, "field": "name" }, { "type": "int64", "optional": false, "field": "ts_ms" }, { "type": "string", "optional": true, "name": "io.debezium.data.Enum", "version": 1, "parameters": { "allowed": "true,last,false,incremental" }, "default": "false", "field": "snapshot" }, { "type": "string", "optional": false, "field": "db" }, { "type": "string", "optional": true, "field": "sequence" }, { "type": "string", "optional": false, "field": "schema" }, { "type": "string", "optional": false, "field": "table" }, { "type": "int64", "optional": true, "field": "txId" }, { "type": "int64", "optional": true, "field": "lsn" }, { "type": "int64", "optional": true, "field": "xmin" } ], "optional": false, "name": "io.debezium.connector.postgresql.Source", "field": "source" }, { "type": "string", "optional": false, "field": "op" }, { "type": "int64", "optional": true, "field": "ts_ms" }, { "type": "struct", "fields": [ { "type": "string", "optional": false, "field": "id" }, { "type": "int64", "optional": false, "field": "total_order" }, { "type": "int64", "optional": false, "field": "data_collection_order" } ], "optional": true, "name": "event.block", "version": 1, "field": "transaction" } ], "optional": false, "name": "dbserver1.inventory.customers.Envelope", "version": 1 }, "payload": { "before": { "id": 1003, "first_name": "Edward", "last_name": "Walker", "email": "ed@walker.com" }, "after": { "id": 1003, "first_name": "2df7fb35-4c33-4742-9e56-10073c54937e", "last_name": "07f6a1e6-6a0a-4d51-98c3-9e67ea49b3b5", "email": "ed@walker.com" }, "source": { "version": "2.5.4.Final", "connector": "postgresql", "name": "dbserver1", "ts_ms": 1715153932452, "snapshot": "false", "db": "postgres", "sequence": "[\"34498256\",\"34498256\"]", "schema": "inventory", "table": "customers", "txId": 779, "lsn": 34498256, "xmin": null }, "op": "u", "ts_ms": 1715153932842, "transaction": null } }
Важная здесь информация, которая позволяет нам отслеживать изменения — это вот этот кусок сообщения:
... "payload": { "before": { "id": 1003, "first_name": "Edward", "last_name": "Walker", "email": "ed@walker.com" }, "after": { "id": 1003, "first_name": "2df7fb35-4c33-4742-9e56-10073c54937e", "last_name": "07f6a1e6-6a0a-4d51-98c3-9e67ea49b3b5", "email": "ed@walker.com" }, "source": { "version": "2.5.4.Final", "connector": "postgresql", "name": "dbserver1", "ts_ms": 1715153932452, "snapshot": "false", "db": "postgres", "sequence": "[\"34498256\",\"34498256\"]", "schema": "inventory", "table": "customers", "txId": 779, "lsn": 34498256, "xmin": null }, ...
Данное сообщение нам показывает что было в БД до и что стало после изменения. Формат сообщения позволяет нам создавать SCD2 и отслеживать все изменения по ключу.
Всю обработку потока можно реализовать разными способами: Spark, Flink, etc
Но я покажу как просто и быстро можно прочитать топики Kafka через ClickHouse.
Для этого я ранее уже добавил ClickHouse в наш docker-compose.yaml.
После этого необходимо подключиться к ClickHouse через любой менеджер подключений или через cli, я буду подключаться через DBeaver.
Если вы захотите углубиться в тему kafka-engine в ClickHouse, то можно воспользоваться официальной документацией.
Ниже я покажу, как подключить топик dbserver1.inventory.customers к ClickHouse
Для начала нам необходимо создать consumer для Kafka:
DROP TABLE IF EXISTS customers_kafka; CREATE TABLE customers_kafka ( payload String, "schema" String ) ENGINE = Kafka SETTINGS kafka_broker_list = 'kafka', kafka_topic_list = 'dbserver1.inventory.customers', kafka_group_name = 'foo', kafka_format = 'JSON';
Затем создаем таблицу, в которую будут записываться данные из топика:
DROP TABLE IF EXISTS customers_kafka_mv; CREATE TABLE customers_kafka_mv ( payload String, "schema" String ) ENGINE = MergeTree() ORDER BY (payload);
В конце создаем мат.представление, которое будет уже записывать наши записи из топика в таблицу:
DROP TABLE IF EXISTS mv_customers_kafka; CREATE MATERIALIZED VIEW mv_customers_kafka TO customers_kafka_mv AS SELECT * FROM customers_kafkasql
И если запустить следующий скрипт:
SELECT count(*) FROM customers_kafka_mv
То мы увидим количество сообщений, которое было записано в данный топик Kafka. И также мы можем прочитать любое из сообщений.
Так как данные сохранятся в формате JSON, то для «разбора» полученных JSON нужно пользоваться встроенными методами в ClickHouse.
На этом всё. Реализация в статье — это просто один из примеров того, как это может выглядеть. При выборе технологий смотрите на потребности бизнеса, где есть компетенции и пр. В целом, вы можете и не использовать ClickHouse для чтения топиков, а написать свой сервис на Python или реализовать микро‑батчинг посредством Apache Airflow.
Резюме: Захват изменений можно использовать для разных целей и под каждую цель – свой инструмент.
Также если вам необходима консультация/менторство/мок-собеседование и другие вопросы по дата-инженерии, то вы можете обращаться ко мне. Все контакты указаны по ссылке.
