Шестая статья цикла о построении CDC-пайплайна с нуля. Данные уже текут из PostgreSQL в Kafka, но дальше просто исчезают по retention. Сегодня пишем Consumer на Python, реализуем криптографическую верификацию сообщений и строим трёхслойную архитектуру данных.
Зачем я это пишу
Честно: в первую очередь это мои заметки. На работе я поддерживаю CDC-репликацию в банке, и одна деталь давно привлекала внимание: Debezium подписывает каждое сообщение, а Consumer проверяет подпись перед обработкой.
Зачем подписывать — понятно, безопасность. Но как именно это реализовано? Какой алгоритм? Что подписываем — весь JSON или только часть? Как Consumer проверяет? На эти вопросы документация отвечала размыто, а спросить у тех, кто настраивал, не получалось.
Во вторую очередь — портфолио. Кастомный SMT для Debezium я написал с помощью Claude и документации. Java — не мой язык, и без подсказок я бы не справился. Но сам факт, что получилось собрать и запустить, уже полезный опыт.
И в третью — мотивация. Это предпоследняя статья цикла, пайплайн почти готов.
Что уже есть
В предыдущих статьях я поднял:
Budget Parser — Telegram-бот, парсит банковские скриншоты через Claude Vision
PostgreSQL с logical replication — источник данных
Kafka + Debezium — захват изменений из WAL
HDFS + Hive — хранилище с SQL-доступом
Hue — веб-интерфейс для Hadoop
Данные текут от скриншота до Kafka, но дальше просто накапливаются в топике и исчезают по retention. Пора довести их до HDFS и замкнуть цепочку.
Целевая архитектура

Три слоя данных — это не прихоть, а необходимость:
Слой | Формат | Назначение |
Landing | Avro | Сырые данные как есть из Kafka. Если что-то пойдёт не так на следующих этапах, можно перезапустить обработку отсюда |
Bronze | Parquet | Те же данные в колоночном формате. Parquet эффективнее для аналитических запросов |
Silver | Parquet | Актуальное состояние. CDC-события (INSERT/UPDATE/DELETE) применены, видим текущую картину |
Часть 1: Криптографическая подпись сообщений
Зачем подписывать CDC-сообщения
Подпись гарантирует три вещи. Во-первых, сообщение отправлено именно Debezium, а не другим producer'ом. Во-вторых, сообщение не изменено в пути, то есть целостность данных сохранена. В-третьих, сообщение актуально, потому что timestamp защищает от повторной обработки старых сообщений.
В банковской среде это требование безопасности: данные должны быть верифицируемы на каждом этапе.
Custom SMT для Debezium
Kafka Connect поддерживает SMT (Single Message Transform) — плагины, которые обрабатывают каждое сообщение перед отправкой в топик. Готового SMT для подписи нет, пришлось писать свой на Java.
Java — не мой основной язык, поэтому разбирался с Kafka Connect API параллельно с написанием кода. Много проб и ошибок, но в итоге получилось.
Структура проекта:
debezium/smt-signature/ ├── pom.xml └── src/main/java/com/homelab/kafka/smt/ └── SignMessage.java
Ключевая логика SignMessage.java:
@Override public R apply(R record) { if (record.value() == null) { return record; // Tombstone — пропускаем } // Извлекаем стабильные поля из Debezium envelope String op = extractField(record, "op"); String tsMs = extractField(record, "ts_ms"); String lsn = extractSourceField(record, "lsn"); String id = extractAfterField(record, "id"); // Формируем payload для подписи String signaturePayload = String.format("%s|%s|%s|%s", op, tsMs, lsn, id); long timestamp = System.currentTimeMillis(); String messageToSign = signaturePayload + "|" + timestamp; String signature = calculateHmacSha256(messageToSign, secretKey); // Добавляем в headers Headers headers = record.headers(); headers.addString("X-Signature", signature); headers.addString("X-Signature-Timestamp", String.valueOf(timestamp)); headers.addString("X-Signature-Payload", signaturePayload); return record; }
Почему подписываем не весь JSON, а только поля?
Это был первый грабель. Изначально я пытался подписывать record.value().toString(), но Java отдаёт Struct-представление, которое отличается от финального JSON в Kafka. Подписи не сходились.
Решение — подписывать только стабильные поля из Debezium envelope:
Поле | Назначение |
op | Тип операции (c/u/d/r) |
ts_ms | Timestamp события |
lsn | Log Sequence Number из PostgreSQL |
id | Primary key записи |
Эти поля не меняются при сериализации и однозначно идентифицируют событие.
Сборка и установка:
# На контейнере debezium (192.168.0.154) cd /opt/smt-signature mvn clean package # Копируем JAR в плагины Kafka Connect cp target/smt-signature-1.0.0.jar /opt/kafka/connect-plugins/ # Перезапускаем Kafka Connect systemctl restart kafka-connect
Добавление SMT в конфигурацию коннектора:
{ "name": "budget-postgres-connector", "config": { "connector.class": "io.debezium.connector.postgresql.PostgresConnector", "database.hostname": "192.168.0.151", "database.port": "5432", "database.user": "debezium", "database.password": "<debezium-password>", "database.dbname": "budget_db", "topic.prefix": "budget", "table.include.list": "finance.parsed_transactions", "plugin.name": "pgoutput", "slot.name": "debezium_slot", "publication.name": "finance_publication", "heartbeat.interval.ms": "10000", "decimal.handling.mode": "string", "transforms": "sign", "transforms.sign.type": "com.homelab.kafka.smt.SignMessage", "transforms.sign.secret.key": "<base64-secret-key>" } }
Секретный ключ сгенерирован через openssl rand -base64 32. В production он должен храниться в Vault или другом секрет-менеджере, а не в конфиге коннектора.
Часть 2: Consumer на Python
Создание контейнера
Параметр | Значение |
CT ID | 308 |
Hostname | consumer |
IP | 192.168.0.156 |
RAM | 2 GB |
Disk | 16 GB |
OS | Ubuntu 24.04 |
Установка зависимостей
apt update && apt install -y python3 python3-pip python3-venv git cd /opt mkdir cdc-consumer && cd cdc-consumer python3 -m venv venv source venv/bin/activate pip install confluent-kafka fastavro hdfs requests pyarrow pyhive
Структура проекта
/opt/cdc-consumer/ ├── .env # Переменные окружения ├── venv/ # Python virtual environment └── src/ ├── consumer.py # Основной consumer: Kafka → Landing ├── transform.py # Transform job: Landing → Bronze ├── cdc_merge.py # CDC Merge: Bronze → Silver └── validation.py # Validation: PostgreSQL vs Hive
Код Consumer
consumer.py — основной цикл, который читает сообщения из Kafka, проверяет подпись и записывает в HDFS:
#!/usr/bin/env python3 """ CDC Consumer: Kafka -> HDFS Landing Layer """ import os import json import hmac import hashlib import base64 import logging from datetime import datetime from io import BytesIO from confluent_kafka import Consumer, KafkaError import fastavro from hdfs import InsecureClient # Configuration KAFKA_BOOTSTRAP_SERVERS = os.getenv('KAFKA_BOOTSTRAP_SERVERS', '192.168.0.153:9092') KAFKA_TOPIC = os.getenv('KAFKA_TOPIC', 'budget.finance.parsed_transactions') KAFKA_GROUP_ID = os.getenv('KAFKA_GROUP_ID', 'cdc-consumer-landing') HDFS_URL = os.getenv('HDFS_URL', 'http://192.168.0.229:9870') HDFS_USER = os.getenv('HDFS_USER', 'hadoop') HDFS_BASE_PATH = '/data/cdc/budget/landing/finance/parsed_transactions' SECRET_KEY = os.getenv('SECRET_KEY') SIGNATURE_MAX_AGE_MS = int(os.getenv('SIGNATURE_MAX_AGE_MS', '300000')) # 5 минут logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__)
Верификация подписи:
def verify_signature(signature_payload: str, timestamp: str, signature: str, secret_key: str) -> bool: """Verify HMAC-SHA256 signature.""" # Проверяем возраст сообщения try: msg_time = int(timestamp) age_ms = int(datetime.now().timestamp() * 1000) - msg_time if age_ms > SIGNATURE_MAX_AGE_MS: logger.warning(f"Message too old: {age_ms}ms > {SIGNATURE_MAX_AGE_MS}ms") return False except ValueError: return False # Вычисляем ожидаемую подпись message_to_sign = f"{signature_payload}|{timestamp}" expected = hmac.new( secret_key.encode('utf-8'), message_to_sign.encode('utf-8'), hashlib.sha256 ).digest() expected_b64 = base64.b64encode(expected).decode('utf-8') # Сравниваем безопасно (constant-time comparison) if not hmac.compare_digest(expected_b64, signature): logger.warning("Invalid signature") return False logger.info(f"Signature verified: {signature_payload}") return True
Важный момент — используем hmac.compare_digest() вместо обычного сравнения строк. Это защита от timing attack, когда атакующий может угадать подпись побайтово, измеряя время ответа.
Парсинг Debezium envelope:
def parse_debezium_message(msg_value: dict) -> dict: """Extract fields from Debezium envelope.""" after = msg_value.get('after') or {} source = msg_value.get('source') or {} return { 'id': after.get('id'), 'transaction_date': after.get('transaction_date'), 'merchant': after.get('merchant'), 'amount': str(after.get('amount')) if after.get('amount') else None, 'transaction_type': after.get('transaction_type'), 'category': after.get('category'), 'is_verified': after.get('is_verified', False), 'parsed_at': after.get('parsed_at'), '_op': msg_value.get('op'), '_ts_ms': msg_value.get('ts_ms'), '_source_lsn': source.get('lsn'), }
Запись в HDFS:
def write_to_hdfs(client, records: list, batch_id: str): """Write batch of records as Avro file to HDFS.""" if not records: return now = datetime.now() partition_path = (f"{HDFS_BASE_PATH}" f"/year={now.year}" f"/month={now.month:02d}" f"/day={now.day:02d}") filename = f"cdc_{batch_id}.avro" full_path = f"{partition_path}/{filename}" buffer = BytesIO() fastavro.writer(buffer, AVRO_SCHEMA, records) buffer.seek(0) client.makedirs(partition_path) client.write(full_path, buffer, overwrite=True) logger.info(f"Written {len(records)} records to {full_path}")
Данные партицируются по дате. Это стандартный паттерн для аналитических хранилищ: позволяет эффективно читать только нужный временной диапазон.
Запуск как systemd-сервис
Создаём файл /etc/systemd/system/cdc-consumer.service:
[Unit] Description=CDC Consumer - Kafka to HDFS Landing After=network.target [Service] Type=simple User=root WorkingDirectory=/opt/cdc-consumer EnvironmentFile=/opt/cdc-consumer/.env ExecStart=/opt/cdc-consumer/venv/bin/python src/consumer.py Restart=always RestartSec=10 [Install] WantedBy=multi-user.target systemctl daemon-reload systemctl enable cdc-consumer systemctl start cdc-consumer
Часть 3: Transform и CDC Merge
Transform: Landing → Bronze
Раз в час cron запускает скрипт, который читает Avro из Landing и конвертирует в Parquet:
def transform_date(client, year: int, month: int, day: int): """Transform all Avro files for a specific date to Parquet.""" landing_path = f"{LANDING_BASE}/year={year}/month={month:02d}/day={day:02d}" bronze_path = f"{BRONZE_BASE}/year={year}/month={month:02d}/day={day:02d}" # Читаем все Avro файлы all_records = [] for avro_file in list_avro_files(client, landing_path): records = read_avro_from_hdfs(client, f"{landing_path}/{avro_file}") all_records.extend(records) if not all_records: return 0 # Конвертируем в PyArrow Table и записываем Parquet table = records_to_arrow_table(all_records) write_parquet_to_hdfs(client, table, bronze_path) return len(all_records)
CDC Merge: Bronze → Silver
Главная логика — применение CDC-операций. Bronze содержит историю изменений, Silver — текущее состояние:
def apply_cdc_operations(current_records: dict, cdc_events: list) -> dict: """Apply CDC operations to current state.""" for event in sorted(cdc_events, key=lambda x: x['_ts_ms']): record_id = event['id'] op = event['_op'] if op == 'c': # CREATE (insert) current_records[record_id] = event elif op == 'u': # UPDATE current_records[record_id] = event elif op == 'd': # DELETE current_records.pop(record_id, None) elif op == 'r': # READ (snapshot) if record_id not in current_records: current_records[record_id] = event return current_records
Сортировка по tsms важна: события должны применяться в правильном порядке. Если UPDATE придёт раньше INSERT, результат будет некорректным.
Часть 4: Validation
Validation сравнивает данные в PostgreSQL (источник) и Hive (приёмник). Это финальная проверка, что весь пайплайн работает корректно:
def main(): logger.info("Starting Validation job") # Step 1: Get records from PostgreSQL pg_records = get_postgres_records() # Step 2: Get records from Hive hive_records = get_hive_records() # Step 3: Compare and report results = compare_records(pg_records, hive_records) print_report(results, len(pg_records), len(hive_records))

Часть 5: Автоматизация
Cron-расписание
# Transform: Landing -> Bronze (каждый час) 0 * * * * cd /opt/cdc-consumer && source venv/bin/activate && python src/transform.py >> /var/log/cdc-transform.log 2>&1 # CDC Merge: Bronze -> Silver (через 15 минут после transform) 15 * * * * cd /opt/cdc-consumer && source venv/bin/activate && python src/cdc_merge.py >> /var/log/cdc-merge.log 2>&1 # Validation: PostgreSQL vs Hive (каждые 6 часов) 30 */6 * * * cd /opt/cdc-consumer && source venv/bin/activate && python src/validation.py >> /var/log/cdc-validation.log 2>&1
Consumer работает постоянно как systemd-сервис и записывает данные в Landing по мере поступления. Периодические задачи (Transform, CDC Merge, Validation) запускаются по cron.
Грабли, на которые можно наступить
1. Подпись не сходится
Симптом: Consumer отклоняет все сообщения с Invalid signature.
Причина: Java record.value().toString() отдаёт Struct-представление, а не JSON.
Решение: Подписывать не весь payload, а только стабильные поля (op, ts_ms, lsn, id).
2. Hive падает с MapReduce error
Симптом: pyhive выбрасывает исключение при SELECT.
Причина: Hive пытается запустить MapReduce, но YARN не настроен.
Решение: Добавить в /opt/hive/conf/hive-site.xml:
<property> <name>hive.execution.engine</name> <value>mr</value> </property> <property> <name>mapreduce.framework.name</name> <value>local</value> </property>
3. WebHDFS Connection refused
Симптом: Consumer не может записать файлы в HDFS.
Причина: Consumer не может разрезолвить hostname hadoop.
Решение: Добавить в /etc/hosts на контейнере consumer:
192.168.0.229 hadoop
4. Message too old
Симптом: Consumer отклоняет сообщения с Message too old: XXXms > 300000ms.
Причина: Старые сообщения в Kafka, которые были отправлены до включения SMT.
Решение: Для первоначальной загрузки увеличить SIGNATURE_MAX_AGE_MS. В production держать строже.
Итоговая архитектура

Все компоненты пайплайна с IP-адресами и потреблением ресурсов
Контейнер | IP | RAM | Назначение |
budget-parser | 192.168.0.150 | 1 GB | Telegram-бот |
pg-source | 192.168.0.151 | 2 GB | PostgreSQL |
kafka | 192.168.0.153 | 4 GB | Kafka + ZooKeeper |
debezium | 192.168.0.154 | 2 GB | Kafka Connect + SMT |
hue | 192.168.0.155 | 1 GB | Веб-интерфейс |
consumer | 192.168.0.156 | 2 GB | Consumer + jobs |
kafka-ui | 192.168.0.160 | 1 GB | Мониторинг Kafka |
hadoop (VM) | 192.168.0.229 | 8 GB | HDFS + Hive |
Итого: ~21 GB RAM на весь CDC-пайплайн.
Полезные команды
# Статус Consumer systemctl status cdc-consumer journalctl -u cdc-consumer -f # Ручной запуск jobs cd /opt/cdc-consumer && source venv/bin/activate python src/transform.py python src/cdc_merge.py python src/validation.py # Проверка данных в HDFS hdfs dfs -ls -R /data/cdc/budget/ # Запрос в Hive beeline -u "jdbc:hive2://localhost:10000" -n hadoop \ -e "SELECT id, merchant, amount FROM cdc_budget.parsed_transactions LIMIT 10;"
Что дальше
Данные теперь текут от скриншота до Hive с криптографической верификацией на каждом этапе:
✅ Скриншот → Claude Vision → PostgreSQL
✅ PostgreSQL → Debezium → подпись (SMT) → Kafka
✅ Kafka → Consumer → проверка подписи → HDFS Landing
✅ Landing → Transform → Bronze
✅ Bronze → CDC Merge → Silver
✅ Silver → Hive → SQL-запросы
✅ Validation: PostgreSQL ↔ Hive
В следующей статье — мониторинг: Prometheus, Grafana, алерты на лаг репликации и падение Consumer.
# | Статья | Статус |
1 | ✅ | |
2 | ✅ | |
3 | ✅ | |
4 | ✅ | |
5 | ✅ | |
6 | Consumer + Transform + Validation | ✅ Эта статья |
7 | Мониторинг в Grafana | 📋 Планируется |
Ссылки
Вопросы и замечания — в комментарии. Особенно интересен опыт тех, кто реализовывал подпись CDC-сообщений в production: какой алгоритм выбрали, как храните секреты?
