Третья статья цикла о построении CDC-пайплайна с нуля. Сегодня — самое интересное: захватываем изменения из PostgreSQL и отправляем в Kafka. И разбираемся, почему WAL может съесть весь диск, даже если данные не меняются.
Зачем я это пишу
Честно: в первую очередь это мои заметки. На работе я поддерживаю CDC-пайплайны — 7 баз данных, сотни миллионов записей, Debezium, Kafka. Но там всё настроено до меня, и разбираться в чужой конфигурации — не то же самое, что понимать почему оно работает именно так.
Во вторую очередь — портфолио. Если буду менять работу, эти статьи можно приложить к резюме.
И в третью — мотивация не бросить. Если хотя бы десять человек прочитают и кому-то пригодится — значит не зря потратил выходные.
Что уже есть
В предыдущих статьях я поднял:
Budget Parser — Telegram-бот, который через Claude Vision API парсит скриншоты банковских транзакций. Помните YM*DEEPHOST -313.95₽ и Красное&Белое -440.83₽? Сейчас увидим их в Kafka.
PostgreSQL с logical replication — wal_level=logical, публикация, пользователь для репликации.
Сейчас будем ловить изменения из PostgreSQL и отправлять в Kafka.
Целевая архитектура

Четыре LXC-контейнера в Proxmox:
PostgreSQL: 192.168.0.151
Kafka + ZooKeeper: 192.168.0.153
Debezium (Kafka Connect): 192.168.0.154
Kafka UI: 192.168.0.160
Часть 1: Kafka + ZooKeeper
Kafka уже умеет работать без ZooKeeper (режим KRaft), но я выбрал классическую связку — больше документ��ции и примеров troubleshooting.
Контейнер и установка
# CT 303, 4GB RAM, 20GB disk, IP 192.168.0.153 apt update && apt install -y openjdk-17-jre-headless wget wget https://downloads.apache.org/kafka/3.9.0/kafka_2.13-3.9.0.tgz tar -xzf kafka_2.13-3.9.0.tgz -C /opt/kafka/
4 ГБ RAM — минимум для Kafka + ZooKeeper. В production обычно 8-16 ГБ на брокер.
Конфигурация ZooKeeper
# /opt/kafka/config/zookeeper.properties dataDir=/var/lib/zookeeper clientPort=2181 maxClientCnxns=60 admin.enableServer=false
Конфигурация Kafka
# /opt/kafka/config/server.properties broker.id=0 listeners=PLAINTEXT://0.0.0.0:9092 advertised.listeners=PLAINTEXT://192.168.0.153:9092 zookeeper.connect=localhost:2181 log.dirs=/var/lib/kafka log.retention.hours=1
Грабля #1: advertised.listeners
Тут была моя первая ошибка, поскольку ранее я уже поднимал Kafka на другом адресе.
Как работает подключение:
Клиент подключается к bootstrap.servers
Kafka отвечает: «Вот список брокеров, подключайся по адресам из advertised.listeners»
Клиент использует эти адреса
Что было у меня:
advertised.listeners=PLAINTEXT://192.168.0.158:9092 # Старый IP!
Kafka слушала на правильном IP, но говорила клиентам подключаться к несуществующему адресу. Результат — Timed out waiting for a node assignment.
Правило: advertised.listeners = IP, доступный клиентам извне.
Systemd и проверка
# /etc/systemd/system/kafka.service [Service] Type=simple User=kafka Environment="KAFKA_HEAP_OPTS=-Xmx1G -Xms1G" ExecStart=/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties Requires=zookeeper.service After=zookeeper.service # Проверка echo ruok | nc localhost 2181 # imok /opt/kafka/bin/kafka-topics.sh --list --bootstrap-server localhost:9092
Часть 2: Debezium и Kafka Connect
Что такое Kafka Connect?
Debezium — это не standalone-приложение, а плагин для Kafka Connect.
Kafka Connect — фреймворк для интеграций:
Source Connectors — читают откуда-то → пиш��т в Kafka
Sink Connectors — читают из Kafka → пишут куда-то
Debezium — Source Connector, который читает WAL базы данных.
Отдельный контейнер
# CT 304, 2GB RAM, 10GB disk, IP 192.168.0.154 apt install -y openjdk-17-jre-headless wget curl jq # Kafka (нужен для Connect) wget https://downloads.apache.org/kafka/3.9.0/kafka_2.13-3.9.0.tgz tar -xzf kafka_2.13-3.9.0.tgz -C /opt/kafka/ # Debezium PostgreSQL Connector wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/3.0.0.Final/debezium-connector-postgres-3.0.0.Final-plugin.tar.gz mkdir -p /opt/kafka/plugins tar -xzf debezium-connector-postgres-*.tar.gz -C /opt/kafka/plugins/
Почему отдельно от Kafka? Изоляция. Можно рестартить коннектор, не трогая брокер.
Конфигурация Kafka Connect
# /opt/kafka/config/connect-distributed.properties bootstrap.servers=192.168.0.153:9092 group.id=debezium-connect-cluster key.converter=org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable=false value.converter=org.apache.kafka.connect.json.JsonConverter value.converter.schemas.enable=false config.storage.topic=connect-configs offset.storage.topic=connect-offsets status.storage.topic=connect-status config.storage.replication.factor=1 offset.storage.replication.factor=1 status.storage.replication.factor=1 rest.host.name=0.0.0.0 rest.port=8083 rest.advertised.host.name=192.168.0.154 plugin.path=/opt/kafka/plugins
Проверка
systemctl start kafka-connect curl -s http://localhost:8083/ | jq .version # "3.9.0" curl -s http://localhost:8083/connector-plugins | jq '.[].class' # "io.debezium.connector.postgresql.PostgresConnector"
Часть 3: Подключаем Debezium к PostgreSQL
Конфигурация коннектора
{ "name": "budget-postgres-connector", "config": { "connector.class": "io.debezium.connector.postgresql.PostgresConnector", "tasks.max": "1", "database.hostname": "192.168.0.151", "database.port": "5432", "database.user": "debezium", "database.password": "debezium_pass", "database.dbname": "budget_db", "topic.prefix": "budget", "schema.include.list": "finance", "table.include.list": "finance.parsed_transactions", "publication.name": "finance_publication", "slot.name": "debezium_slot", "plugin.name": "pgoutput", "snapshot.mode": "initial", "decimal.handling.mode": "string", "heartbeat.interval.ms": "10000", "heartbeat.action.query": "UPDATE finance.debezium_heartbeat SET last_heartbeat = NOW() WHERE id = 1" } }
Ключевые параметры
plugin.name — плагин декодирования WAL:
Плагин | Описание |
pgoutput | Встроен в PostgreSQL 10+, рекомендуется |
wal2json | Нужно устанавливать отдельно, выдаёт JSON |
decoderbufs | Protobuf, компактнее, но сложнее в отладке |
Выбирайте pgoutput, если PostgreSQL ≥ 10.
snapshot.mode — что делать при первом запуске:
Режим | Поведение |
initial | Снять snapshot всех данных, потом слушать изменения |
never | Только новые изменения (данные уже есть в DWH) |
schema_only | Только схема, без данных |
Для большой таблицы snapshot может занять часы. Планируйте окно обслуживания.
decimal.handling.mode: string — как представлять DECIMAL/NUMERIC:
Режим | Результат | Комментарий |
precise (default) | "8Yw=" | Base64, точно но нечитаемо |
string | "-37.00" | Строка, читаемо |
double | -37.0 | Число с плавающей точкой (потеря точности!) |
По умолчанию Debezium кодирует в Base64 для максимальной точности. Consumer может декодировать эти данные, но string нагляднее для отладки и большинства задач. Я выбрал string.
Heartbeat — защита от переполнения диска
Это была неочевидная для меня вещь, но одна из самых важных.
Проблема: Если в таблице нет изменений, Debezium не продвигает offset. Replication slot держит старую позицию в WAL. PostgreSQL не может удалить старые WAL-файлы. Диск заканчивается.
Коннектор показывает RUNNING, всё вроде хорошо, а директория /var/lib/postgresql/16/main/pg_wal/ ра��тёт и растёт.
Решение: Heartbeat. Каждые 10 секунд Debezium делает UPDATE в служебную таблицу → получает событие из WAL → продвигает offset.
-- На PostgreSQL создаём таблицу для heartbeat
CREATE TABLE finance.debezium_heartbeat ( id INTEGER PRIMARY KEY, last_heartbeat TIMESTAMP ); INSERT INTO finance.debezium_heartbeat VALUES (1, NOW()); GRANT SELECT, UPDATE ON finance.debezium_heartbeat TO debezium; -- Добавляем в publication ALTER PUBLICATION finance_publication ADD TABLE finance.debezium_heartbeat;
Регистрация коннектора
curl -X POST \ -H "Content-Type: application/json" \ -d @postgres-connector.json \ http://localhost:8083/connectors # Проверяем статус curl -s http://localhost:8083/connectors/budget-postgres-connector/status | jq . { "name": "budget-postgres-connector", "connector": { "state": "RUNNING" }, "tasks": [{ "id": 0, "state": "RUNNING" }] }
Часть 4: Смотрим результат
Kafka UI

Топики: CDC-события, heartbeat, служебные топики Kafka Connect

Видно события с таймстемпами — каждая транзакция из Budget Parser

budget-postgres-connector: RUNNING, 1 of 1 tasks
Реальные данные в Kafka
Вот что пришло при первом запуске (snapshot существующих данных, op: "r"):
{ "before": null, "after": { "id": 4, "transaction_date": 20461, "merchant": "YM*DEEPHOST", "amount": "-313.95", "transaction_type": "expense", "category": "Прочие списания", "is_verified": true }, "op": "r", "source": { "connector": "postgresql", "db": "budget_db", "schema": "finance", "table": "parsed_transactions", "lsn": 27105320 } }
Та самая транзакция YM*DEEPHOST из Budget Parser! Теперь она в Kafka.
Типы операций
op | Значение | Когда |
r | read (snapshot) | Первоначальное чтение данных |
c | create | INSERT |
u | update | UPDATE |
d | delete | DELETE |
Важно: формат дат
В JSON вы увидите transaction_date: 20461 — это не ошибка. Debezium представляет DATE как количество дней с 1970-01-01 (epoch days). Consumer должен конвертировать:
from datetime import date, timedelta transaction_date = date(1970, 1, 1) + timedelta(days=20461) # 2026-01-08
Аналогично для TIMESTAMP — микросекунды с epoch.
Часть 5: Kafka UI
Смотреть события в консоли неудобно. Kafka UI даёт веб-интерфейс:
# CT 306, IP 192.168.0.160 wget https://github.com/provectus/kafka-ui/releases/download/v0.7.2/kafka-ui-api-v0.7.2.jar \ -O /opt/kafka-ui/kafka-ui.jar cat > /opt/kafka-ui/application.yml << 'EOF' server: port: 8080 kafka: clusters: - name: homelab bootstrapServers: 192.168.0.153:9092 kafkaConnect: - name: debezium address: http://192.168.0.154:8083 EOF
Открываем http://192.168.0.160:8080 — видим топики, сообщения, статус коннекторов.
Грабли, на которые можно наступить
1. advertised.listeners с неправильным IP
Симптом: Timed out waiting for a node assignment
Причина: Kafka говорит клиентам подключаться к несуществующему IP.
Решение: advertised.listeners = реальный IP, доступный клиентам.
2. Base64 вместо чисел в amount
Симптом: "amount": "8Yw=" вместо "amount": "-37.00"
Причина: Debezium по умолчанию кодирует DECIMAL в Base64.
Решение: "decimal.handling.mode": "string" в конфиге коннектора.
3. WAL растёт без изменений в таблице
Симптом: Директория /var/lib/postgresql/16/main/pg_wal/ занимает всё больше места, коннектор показывает RUNNING.
Причина: Нет heartbeat → Debezium не продвигает offset → replication slot держит WAL.
Решение: Настроить heartbeat.interval.ms и heartbeat.action.query.
4. Kafka Connect не видит плагин Debezium
Симптом: connector-plugins возвращает пустой список или нет PostgresConnector.
Причина: Неправильный plugin.path или плагин не распакован.
Решение: Проверить путь в connect-distributed.properties, убедиться что JAR-файлы на месте.
5. Kafka UI требует авторизацию
Симптом: Окно логина при открытии UI.
Причина: Дефолтная конфигурация с включённой security.
Решение:
spring: security: enabled: false auth: type: DISABLED
6. Java OutOfMemoryError
Симптом: Kafka или Connect падает с OOM.
Причина: Недостаточно heap памяти.
Решение: KAFKA_HEAP_OPTS=-Xmx1G -Xms1G для Kafka, аналогично для Connect. Не давайте JVM больше 50% RAM контейнера.
Итоговая архитектура
Контейнер | IP | RAM | Назначение |
budget-parser | 192.168.0.150 | 1GB | Telegram-бот |
pg-source | 192.168.0.151 | 2GB | PostgreSQL |
kafka | 192.168.0.153 | 4GB | Kafka + ZooKeeper |
debezium | 192.168.0.154 | 2GB | Kafka Connect |
monitoring | 192.168.0.160 | 2GB | Kafka UI |
Итого: 11 ГБ RAM на весь CDC-пайплайн.
Полезные команды
# Статус коннектора curl -s http://192.168.0.154:8083/connectors/budget-postgres-connector/status | jq . # Рестарт коннектора curl -X POST http://192.168.0.154:8083/connectors/budget-postgres-connector/restart # Пауза curl -X PUT http://192.168.0.154:8083/connectors/budget-postgres-connector/pause # Удаление curl -X DELETE http://192.168.0.154:8083/connectors/budget-postgres-connector # Replication slot в PostgreSQL sudo -u postgres psql -d budget_db -c "SELECT slot_name, active FROM pg_replication_slots;" # Читать события из топика /opt/kafka/bin/kafka-console-consumer.sh \ --topic budget.finance.parsed_transactions \ --bootstrap-server 192.168.0.153:9092 \ --from-beginning
Что дальше
Данные текут из PostgreSQL в Kafka. Пока они просто накапливаются в топике с retention 1 час и исчезают.
Следующий этап — написать Consumer и построить слои хранения:
Kafka → Consumer → Avro (сырой слой) → Parquet (промежуточный слой) → Hive
Там будет много интересного: вычитка из Kafka, преобразование форматов, инкрементальная загрузка, сравнение с тем что уже есть в хранилище.
# | Статья | Статус |
1 | ✅ | |
2 | ✅ | |
3 | Kafka + Debezium | ✅ Эта статья |
4 | Consumer + HDFS/Hive | ⏳ Следующая |
5 | Мониторинг в Grafana | 📋 Планируется |
Ссылки
Вопросы — в комментарии. И расскажите про свой опыт с CDC: кто ещё настраивает это для себя или в production? Какие грабли встречались? Интересно узнать, что я не один такой.