Третья статья цикла о построении CDC-пайплайна с нуля. Сегодня — самое интересное: захватываем изменения из PostgreSQL и отправляем в Kafka. И разбираемся, почему WAL может съесть весь диск, даже если данные не меняются.


Зачем я это пишу

Честно: в первую очередь это мои заметки. На работе я поддерживаю CDC-пайплайны — 7 баз данных, сотни миллионов записей, Debezium, Kafka. Но там всё настроено до меня, и разбираться в чужой конфигурации — не то же самое, что понимать почему оно работает именно так.

Во вторую очередь — портфолио. Если буду менять работу, эти статьи можно приложить к резюме.

И в третью — мотивация не бросить. Если хотя бы десять человек прочитают и кому-то пригодится — значит не зря потратил выходные.


Что уже есть

В предыдущих статьях я поднял:

  1. Budget Parser — Telegram-бот, который через Claude Vision API парсит скриншоты банковских транзакций. Помните YM*DEEPHOST -313.95₽ и Красное&Белое -440.83₽? Сейчас увидим их в Kafka.

  2. PostgreSQL с logical replication — wal_level=logical, публикация, пользователь для репликации.

Сейчас будем ловить изменения из PostgreSQL и отправлять в Kafka.


Целевая архитектура

Четыре LXC-контейнера в Proxmox:

  • PostgreSQL: 192.168.0.151

  • Kafka + ZooKeeper: 192.168.0.153

  • Debezium (Kafka Connect): 192.168.0.154

  • Kafka UI: 192.168.0.160


Часть 1: Kafka + ZooKeeper

Kafka уже умеет работать без ZooKeeper (режим KRaft), но я выбрал классическую связку — больше документ��ции и примеров troubleshooting.

Контейнер и установка

# CT 303, 4GB RAM, 20GB disk, IP 192.168.0.153

apt update && apt install -y openjdk-17-jre-headless wget

wget https://downloads.apache.org/kafka/3.9.0/kafka_2.13-3.9.0.tgz

tar -xzf kafka_2.13-3.9.0.tgz -C /opt/kafka/

4 ГБ RAM — минимум для Kafka + ZooKeeper. В production обычно 8-16 ГБ на брокер.

Конфигурация ZooKeeper

# /opt/kafka/config/zookeeper.properties

dataDir=/var/lib/zookeeper

clientPort=2181

maxClientCnxns=60

admin.enableServer=false

Конфигурация Kafka

# /opt/kafka/config/server.properties

broker.id=0

listeners=PLAINTEXT://0.0.0.0:9092

advertised.listeners=PLAINTEXT://192.168.0.153:9092

zookeeper.connect=localhost:2181

log.dirs=/var/lib/kafka

log.retention.hours=1

Грабля #1: advertised.listeners

Тут была моя первая ошибка, поскольку ранее я уже поднимал Kafka на другом адресе. 

Как работает подключение:

  1. Клиент подключается к bootstrap.servers

  2. Kafka отвечает: «Вот список брокеров, подключайся по адресам из advertised.listeners»

  3. Клиент использует эти адреса

Что было у меня:

advertised.listeners=PLAINTEXT://192.168.0.158:9092  # Старый IP!

Kafka слушала на правильном IP, но говорила клиентам подключаться к несуществующему адресу. Результат — Timed out waiting for a node assignment.

Правило: advertised.listeners = IP, доступный клиентам извне.

Systemd и проверка

# /etc/systemd/system/kafka.service

[Service]

Type=simple

User=kafka

Environment="KAFKA_HEAP_OPTS=-Xmx1G -Xms1G"

ExecStart=/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties

Requires=zookeeper.service

After=zookeeper.service

# Проверка

echo ruok | nc localhost 2181  # imok

/opt/kafka/bin/kafka-topics.sh --list --bootstrap-server localhost:9092

Часть 2: Debezium и Kafka Connect

Что такое Kafka Connect?

Debezium — это не standalone-приложение, а плагин для Kafka Connect.

Kafka Connect — фреймворк для интеграций:

  • Source Connectors — читают откуда-то → пиш��т в Kafka

  • Sink Connectors — читают из Kafka → пишут куда-то

Debezium — Source Connector, который читает WAL базы данных.

Отдельный контейнер

# CT 304, 2GB RAM, 10GB disk, IP 192.168.0.154

apt install -y openjdk-17-jre-headless wget curl jq

# Kafka (нужен для Connect)

wget https://downloads.apache.org/kafka/3.9.0/kafka_2.13-3.9.0.tgz

tar -xzf kafka_2.13-3.9.0.tgz -C /opt/kafka/

# Debezium PostgreSQL Connector

wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/3.0.0.Final/debezium-connector-postgres-3.0.0.Final-plugin.tar.gz

mkdir -p /opt/kafka/plugins

tar -xzf debezium-connector-postgres-*.tar.gz -C /opt/kafka/plugins/

Почему отдельно от Kafka? Изоляция. Можно рестартить коннектор, не трогая брокер.

Конфигурация Kafka Connect

# /opt/kafka/config/connect-distributed.properties

bootstrap.servers=192.168.0.153:9092

group.id=debezium-connect-cluster

key.converter=org.apache.kafka.connect.json.JsonConverter

key.converter.schemas.enable=false

value.converter=org.apache.kafka.connect.json.JsonConverter

value.converter.schemas.enable=false

config.storage.topic=connect-configs

offset.storage.topic=connect-offsets

status.storage.topic=connect-status

config.storage.replication.factor=1

offset.storage.replication.factor=1

status.storage.replication.factor=1

rest.host.name=0.0.0.0

rest.port=8083

rest.advertised.host.name=192.168.0.154

plugin.path=/opt/kafka/plugins

Проверка

systemctl start kafka-connect

curl -s http://localhost:8083/ | jq .version

# "3.9.0"

curl -s http://localhost:8083/connector-plugins | jq '.[].class'

# "io.debezium.connector.postgresql.PostgresConnector"

Часть 3: Подключаем Debezium к PostgreSQL

Конфигурация коннектора

{

  "name": "budget-postgres-connector",

  "config": {

    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",

    "tasks.max": "1",

    

    "database.hostname": "192.168.0.151",

    "database.port": "5432",

    "database.user": "debezium",

    "database.password": "debezium_pass",

    "database.dbname": "budget_db",

    

    "topic.prefix": "budget",

    "schema.include.list": "finance",

    "table.include.list": "finance.parsed_transactions",

    

    "publication.name": "finance_publication",

    "slot.name": "debezium_slot",

    "plugin.name": "pgoutput",

    

    "snapshot.mode": "initial",

    "decimal.handling.mode": "string",

    

    "heartbeat.interval.ms": "10000",

    "heartbeat.action.query": "UPDATE finance.debezium_heartbeat SET last_heartbeat = NOW() WHERE id = 1"

  }

}

Ключевые параметры

plugin.name — плагин декодирования WAL:

Плагин

Описание

pgoutput

Встроен в PostgreSQL 10+, рекомендуется

wal2json

Нужно устанавливать отдельно, выдаёт JSON

decoderbufs

Protobuf, компактнее, но сложнее в отладке

Выбирайте pgoutput, если PostgreSQL ≥ 10.

snapshot.mode — что делать при первом запуске:

Режим

Поведение

initial

Снять snapshot всех данных, потом слушать изменения

never

Только новые изменения (данные уже есть в DWH)

schema_only

Только схема, без данных

Для большой таблицы snapshot может занять часы. Планируйте окно обслуживания.

decimal.handling.mode: string — как представлять DECIMAL/NUMERIC:

Режим

Результат

Комментарий

precise (default)

"8Yw="

Base64, точно но нечитаемо

string

"-37.00"

Строка, читаемо

double

-37.0

Число с плавающей точкой (потеря точности!)

По умолчанию Debezium кодирует в Base64 для максимальной точности. Consumer может декодировать эти данные, но string нагляднее для отладки и большинства задач. Я выбрал string.

Heartbeat — защита от переполнения диска

Это была неочевидная для меня вещь, но одна из самых важных.

Проблема: Если в таблице нет изменений, Debezium не продвигает offset. Replication slot держит старую позицию в WAL. PostgreSQL не может удалить старые WAL-файлы. Диск заканчивается.

Коннектор показывает RUNNING, всё вроде хорошо, а директория /var/lib/postgresql/16/main/pg_wal/ ра��тёт и растёт.

Решение: Heartbeat. Каждые 10 секунд Debezium делает UPDATE в служебную таблицу → получает событие из WAL → продвигает offset.

-- На PostgreSQL создаём таблицу для heartbeat

CREATE TABLE finance.debezium_heartbeat (

    id INTEGER PRIMARY KEY,

    last_heartbeat TIMESTAMP

);

INSERT INTO finance.debezium_heartbeat VALUES (1, NOW());

GRANT SELECT, UPDATE ON finance.debezium_heartbeat TO debezium;

-- Добавляем в publication

ALTER PUBLICATION finance_publication ADD TABLE finance.debezium_heartbeat;

Регистрация коннектора

curl -X POST \

  -H "Content-Type: application/json" \

  -d @postgres-connector.json \

  http://localhost:8083/connectors

# Проверяем статус

curl -s http://localhost:8083/connectors/budget-postgres-connector/status | jq .

{

  "name": "budget-postgres-connector",

  "connector": { "state": "RUNNING" },

  "tasks": [{ "id": 0, "state": "RUNNING" }]

}

Часть 4: Смотрим результат

Kafka UI

Топики: CDC-события, heartbeat, служебные топики Kafka Connect

Видно события с таймстемпами — каждая транзакция из Budget Parser

budget-postgres-connector: RUNNING, 1 of 1 tasks

Реальные данные в Kafka

Вот что пришло при первом запуске (snapshot существующих данных, op: "r"):

{

  "before": null,

  "after": {

    "id": 4,

    "transaction_date": 20461,

    "merchant": "YM*DEEPHOST",

    "amount": "-313.95",

    "transaction_type": "expense",

    "category": "Прочие списания",

    "is_verified": true

  },

  "op": "r",

  "source": {

    "connector": "postgresql",

    "db": "budget_db",

    "schema": "finance",

    "table": "parsed_transactions",

    "lsn": 27105320

  }

}

Та самая транзакция YM*DEEPHOST из Budget Parser! Теперь она в Kafka.

Типы операций

op

Значение

Когда

r

read (snapshot)

Первоначальное чтение данных

c

create

INSERT

u

update

UPDATE

d

delete

DELETE

Важно: формат дат

В JSON вы увидите transaction_date: 20461 — это не ошибка. Debezium представляет DATE как количество дней с 1970-01-01 (epoch days). Consumer должен конвертировать:

from datetime import date, timedelta

transaction_date = date(1970, 1, 1) + timedelta(days=20461)

# 2026-01-08

Аналогично для TIMESTAMP — микросекунды с epoch.


Часть 5: Kafka UI

Смотреть события в консоли неудобно. Kafka UI даёт веб-интерфейс:

# CT 306, IP 192.168.0.160

wget https://github.com/provectus/kafka-ui/releases/download/v0.7.2/kafka-ui-api-v0.7.2.jar \

  -O /opt/kafka-ui/kafka-ui.jar

cat > /opt/kafka-ui/application.yml << 'EOF'

server:

  port: 8080

kafka:

  clusters:

    - name: homelab

      bootstrapServers: 192.168.0.153:9092

      kafkaConnect:

        - name: debezium

          address: http://192.168.0.154:8083

EOF

Открываем http://192.168.0.160:8080 — видим топики, сообщения, статус коннекторов.


Грабли, на которые можно наступить

1. advertised.listeners с неправильным IP

Симптом: Timed out waiting for a node assignment

Причина: Kafka говорит клиентам подключаться к несуществующему IP.

Решение: advertised.listeners = реальный IP, доступный клиентам.

2. Base64 вместо чисел в amount

Симптом: "amount": "8Yw=" вместо "amount": "-37.00"

Причина: Debezium по умолчанию кодирует DECIMAL в Base64.

Решение: "decimal.handling.mode": "string" в конфиге коннектора.

3. WAL растёт без изменений в таблице

Симптом: Директория /var/lib/postgresql/16/main/pg_wal/ занимает всё больше места, коннектор показывает RUNNING.

Причина: Нет heartbeat → Debezium не продвигает offset → replication slot держит WAL.

Решение: Настроить heartbeat.interval.ms и heartbeat.action.query.

4. Kafka Connect не видит плагин Debezium

Симптом: connector-plugins возвращает пустой список или нет PostgresConnector.

Причина: Неправильный plugin.path или плагин не распакован.

Решение: Проверить путь в connect-distributed.properties, убедиться что JAR-файлы на месте.

5. Kafka UI требует авторизацию

Симптом: Окно логина при открытии UI.

Причина: Дефолтная конфигурация с включённой security.

Решение:

spring:

  security:

    enabled: false

auth:

  type: DISABLED

6. Java OutOfMemoryError

Симптом: Kafka или Connect падает с OOM.

Причина: Недостаточно heap памяти.

Решение: KAFKA_HEAP_OPTS=-Xmx1G -Xms1G для Kafka, аналогично для Connect. Не давайте JVM больше 50% RAM контейнера.


Итоговая архитектура

Контейнер

IP

RAM

Назначение

budget-parser

192.168.0.150

1GB

Telegram-бот

pg-source

192.168.0.151

2GB

PostgreSQL

kafka

192.168.0.153

4GB

Kafka + ZooKeeper

debezium

192.168.0.154

2GB

Kafka Connect

monitoring

192.168.0.160

2GB

Kafka UI

Итого: 11 ГБ RAM на весь CDC-пайплайн.


Полезные команды

# Статус коннектора

curl -s http://192.168.0.154:8083/connectors/budget-postgres-connector/status | jq .

# Рестарт коннектора

curl -X POST http://192.168.0.154:8083/connectors/budget-postgres-connector/restart

# Пауза

curl -X PUT http://192.168.0.154:8083/connectors/budget-postgres-connector/pause

# Удаление

curl -X DELETE http://192.168.0.154:8083/connectors/budget-postgres-connector

# Replication slot в PostgreSQL

sudo -u postgres psql -d budget_db -c "SELECT slot_name, active FROM pg_replication_slots;"

# Читать события из топика

/opt/kafka/bin/kafka-console-consumer.sh \

  --topic budget.finance.parsed_transactions \

  --bootstrap-server 192.168.0.153:9092 \

  --from-beginning

Что дальше

Данные текут из PostgreSQL в Kafka. Пока они просто накапливаются в топике с retention 1 час и исчезают.

Следующий этап — написать Consumer и построить слои хранения:

Kafka → Consumer → Avro (сырой слой) → Parquet (промежуточный слой) → Hive

Там будет много интересного: вычитка из Kafka, преобразование форматов, инкрементальная загрузка, сравнение с тем что уже есть в хранилище.

#

Статья

Статус

1

Budget Parser: Telegram + Claude Vision

2

PostgreSQL для CDC

3

Kafka + Debezium

✅ Эта статья

4

Consumer + HDFS/Hive

⏳ Следующая

5

Мониторинг в Grafana

📋 Планируется


Ссылки


Вопросы — в комментарии. И расскажите про свой опыт с CDC: кто ещё настраивает это для себя или в production? Какие грабли встречались? Интересно узнать, что я не один такой.