Привет, Хаброжители!

От выбора подхода к поглощению данных может зависеть успех или провал работы конвейера данных

Получив опыт создания конвейеров данных, которые ежедневно обрабатывают сотни миллионов записей, я понял, что именно на уровне поглощения данных решается успех или провал большинства проектов в области инженерии данных. Если здесь допустить ошибку, то придется месяцами бороться с проблемами, возникающими с производительностью, качеством данных и недовольством заинтересованных сторон. Если все сделать правильно, то ваш конвейер станет надежной основой для принятия важных бизнес-решений.

Сегодня я разберу три основных шаблона поглощения — пакетный, потоковый и захват изменений данных (CDC) — на материале реальных примеров, фрагментов кода и схем принятия ��ешений, чтобы помочь вам сделать правильный выбор.

Решение о поглощении данных: дело не в технологии, а в требованиях

Прежде чем перейти к рассмотрению паттернов, давайте развеем распространенное заблуждение: выбор между пакетной обработкой, потоковой обработкой и CDC заключается не в том, что «лучше», а в том, что решает вашу конкретную проблему.

Я видел, как разрабатываются сложные потоковые конвейеры для данных, которые обновляются раз в день, а другие команды боролись с пакетной обработкой, когда пользователям требовалась информация практически в режиме реального времени. Я также видел, как команды выполняли полное извлечение таблиц, когда CDC было бы в 100 раз эффективнее. Все эти ошибки могут дорого обходиться.

Паттерн 1: Пакетная обработка – надежный рабочий инструмент 📦

Что такое пакетная обработка?

Пакетная обработка собирает и обрабатывает данные через определенные промежутки времени — ежечасно, ежедневно, еженедельно или по запросу. Представьте себе грузовик, который забирает посылки в установленное время и доставляет их все сразу. Когда использовать пакетную обработку

Идеально для:

  • Загрузки и заполнения исторических данных

  • Отчетности и аналитики с требованиями к ежедневной/ежечасной актуальности

  • Рабочих нагрузок, чувствительных к стоимости (пакетная обработка обычно в 5–10 раз дешевле потоковой)

  • Источников данных, которые обновляются периодически (ежедневные отчеты о продажах, ежемесячные финансовые данные)

  • Сценариев с высокой пропускной способностью, где допустимы небольшие задержки

    Не подходит для:

  • Дашбордов, работающих в режиме реального времени (требуется задержка менее одной минуты)

  • Обнаружения мошенничества или оповещения о безопасности

  • Мониторинга датчиков IoT

  • Отслеживания поведения пользователей в режиме реального времени

Ключевые особенности пакетной обработки

1.      Срабатывания по времени: запуск по расписанию (cron-выражения, Airflow DAGs)

2.      Высокая пропускная способность: возможность эффективной обработки миллионов записей за один раз

3.      Простое восстановление: неудачные пакеты можно легко повторить или переработать

4.      Предсказуемые затраты: ресурсы запускаются, обрабатывают данные и выключаются

Пример из реальной жизни: ежедневный анализ заказов в электронной коммерции

Допустим, вы создаете аналитический конвейер для платформы электронной коммерции. Заинтересованные стороны нуждаются в ежедневных отчетах о заказах, доходах и поведении клиентов, но им не нужны обновления в режиме реального времени.

Архитектура:

Лучшие практики пакетной обработки

1.      Идемпотентность критически важна: ваши batch-задачи должны давать один и тот же результат при повторном запуске с теми же входными данными.

2.      Внедрение контрольных точек и восстановления

3.      Важность стратегии разбиения на разделы

4.      Мониторинг производительности пакетов


Паттерн 2: Потоковая обработка — движок реального времени

Когда использовать потоковую обработку?

Потоковая обработка непрерывно обрабатывает данные по мере их поступления с минимальной задержкой. Представьте себе конвейерную ленту, по которой данные постоянно движутся и сразу же обрабатываются.

Когда использовать потоковую обработку

Идеально для:

  • Дашбордов и мониторинга в реальном времени

  • Обнаружения мошенничества и предупреждений о безопасности

  • Данных датчиков IoT и телеметрии устройств

  • Отслеживания активности пользователей и персонализации

  • Архитектур, управляемых событиями

  • Захвата изменений данных (CDC) из баз данных

     ❌ Не подходит для:

  • Загрузки больших объемов исторических данных (используйте пакетную обработку для заполнения)

  • Проектов с ограниченным бюджетом и нестрогими требованиями к задержке

  • Источников данных, которые обновляются нечасто

Ключевые особенности потоковой обработки

1.      Событийно-ориентированный подход: обработка данных сразу после их поступления

2.      Низкая задержка: время обработки от долей секунды до долей минуты

3.      Отказоустойчивость: встроенная система контрольных точек и семантика «точно один раз»

4.      Требует обработки с учетом состояния: управление окнами, агрегациями и дедупликацией

Пример из реальной жизни: потоковая передача телеметрических данных с устройств IoT

Представьте, что вы обрабатываете телеметрические данные с миллионов устройств IoT, которые каждые 30 секунд отправляют показатели работоспособности. Вам необходимо обнаруживать аномалии, запускать оповещения и обновлять панели мониторинга в режиме реального времени.

Архитектура:

Лучшие практики потоковой обработки

1. Всегда используйте контрольные точки

Контрольная точка обеспечивает однократную обработку и восстановление после сбоев:

# Хорошо: указано местоположение контрольной точки
query = stream.writeStream \
    .option("checkpointLocation", "s3://checkpoints/my-stream/") \
    .start()
# Плохо: нет контрольной точки — при перезапуске будет потеряно состояние
query = stream.writeStream.start()  # ❌ Не делайте так!

2. Внедрение водяных знаков для запоздалых данных

Водяные знаки помогают управлять запоздалыми событиями и предотвращают неограниченный рост состояния:

# Обработка событий, поступающих с опозданием до 10 минут
stream_with_watermark = stream \
    .withWatermark("event_timestamp", "10 minutes")
# Агрегации с водяными знаками автоматически удаляют старое состояние
aggregated = stream_with_watermark \
    .groupBy(window("event_timestamp", "5 minutes"), "device_id") \
    .agg(avg("metric").alias("avg_metric"))

3. Выберите правильный интервал запуска

# Микропакет (наиболее распространенный): обработка каждые 30 секунд
.trigger(processingTime="30 seconds")
# Непрерывные (низкая задержка): задержка ~1 мс, но ограниченные операции
.trigger(continuous="1 second")# Доступно сейчас: однократная обработка доступных данных (как пакетная)
.trigger(availableNow=True)# По умолчанию: обработка как можно быстрее (не рекомендуется для продуктива)
.trigger()

4. Мониторинг работоспособности потока

5. Работа со эволюцией схемы


Паттерн 3: Захват изменений данных (CDC) — интеллектуальная синхронизация 🔄

Что такое CDC?

Захват изменений данных (CDC) фиксирует только изменения (вставки, обновления, удаления) из исходных систем, а не извлекает целые наборы данных. Это похоже на получение сравнения изменений, а не на повторное чтение всей книги.

Когда использовать CDC

  ✅ Идеально для:

  • Репликации баз данных в режиме, близком к реальному времени.

  • Минимизации нагрузки на исходные системы (особенно на продуктивные базы данных).

  • Отслеживания исторических изменений и ведения контрольных журналов

  • Синхронизации данных между операционными и аналитическими системами

  • Архитектуры с событийным источником данных

  • Сокращения затрат на передачу данных (перемещение только измененных данных)

     ❌ Не подходит для:

  • Первоначальной загрузки данных (используйте пакетную обработку для заполнения, затем переключитесь на CDC)

  • Источников без возможности отслеживания изменений

  • Простых файловых источников данных

  • Систем, в которых полное обновление дешевле, чем инкрементальное наполнение

Ключевые особенности CDC

1.      Получение данных на основе журналов: считывает журналы транзакций базы данных (binlog, WAL, redo logs)

2.      Инкрементальный характер: обрабатывает только изменения, а не полные таблицы

3.      Сохраняет историю исходной системы: фиксирует тип операции (INSERT/UPDATE/DELETE)

Низкое воздействие на исходную систему: минимальная нагрузка на производительность продуктивных баз данных

Как работает CDC: три подхода

1. CDC на основе журналов (наиболее эффективный) 📝

Прямое чтение журналов транзакций базы данных без запроса таблиц:

# Пример: Debezium читает бинарный журнал MySQL
{
  "before": null,  # Данные до изменения (null для INSERT)
  "after": {       # Данные после изменения
    "order_id": 12345,
    "customer_id": 789,
    "order_total": 149.99,
    "status": "completed"
  },
  "op": "c",      # Операция: c=создание, u=обновление, d=удаление
  "ts_ms": 1704067200000,
  "source": {
    "db": "ecommerce",
    "table": "orders"
  }
}

Инстркменты: Debezium, AWS DMS, Oracle GoldenGate, Maxwell, Striim

Плюсы:

Плюсы:

  • Отсутствие нагрузки на исходную базу данных

  • Захват всех изменений в режиме реального времени

  • Отсутствие необходимости в изменении схемы

    Минусы:

  • Требуется доступ к журналу базы данных и его хранение

  • Сложная настройка и мониторинг

  • Различные форматы журналов для разных баз данных

    2. CDC на основе триггеров 🎯

    Триггеры базы данных записывают изменения в отдельные таблицы отслеживания т:

    Плюсы:

  • Проще в реализации

  • Работает с любой базой данных, поддерживающей триггеры

  • Возможна настройка кастомной логики

    Минусы:

  • Влияние на производительность исходной базы данных

  • Накладные расходы на обслуживание триггеров

  • Возможность сбоев тригерров

3. CDC на основе запросов (отслеживание временных меток/версий) 🕒

Отслеживает изменения с помощью столбцов временных меток или версий:

# Отслеживание последней обработанной временной метки
LAST_WATERMARK = "2024-01-15 10:00:00"
# З
query = f"""
    SELECT *
    FROM orders
    WHERE updated_at > '{LAST_WATERMARK}'
    ORDER BY updated_at
"""# После обработки обновляет водяной знак
NEW_WATERMARK = max(df['updated_at'])

Плюсы:

  • Простота реализации

  • Не требует специальных функций базы данных

  • Работает с большинством источников данных

    Минусы:

  • Не может распознавать удаления (если не существует столбец soft-delete)

  • Требует наличия столбца updated_at во всех таблицах

  • Проблемы с разницей во времени в распределенных системах

  • Пропускает изменения, если updated_at не изменяется

Пример из реальной жизни: синхронизация заказов в электронной коммерции с помощью CDC

Сценарий: синхронизация данных о заказах из продуктивной базы PostgreSQL с аналитическим озером данных с минимальной задержкой и минимальным воздействием на исходную систему.

Архитектура:

Лучшие практики CDC

1. Работа с эволюцией схемы: системы CDC должны корректно обрабатывать изменения исходной схемы:

2. Обработка эволюции схемы: системы CDC должны корректно обрабатывать изменения исходной схемы:

3. Мониторинг задержки CDC: отслеживание отставания конвейера CDC от источника:

4. Обработка начальной загрузки + переход на CDC: начало с пакетной загрузки, затем переход на CDC:

5. Идемпотентная обработка CDC: обеспечение того, что повторная обработка одних и тех же событий CDC дает одинаковые результаты

Размышления о производительности CDC

Оптимизация исходной базы данных:

-- Для CDC на основе журналов убедитесь, что срок хранения журналов достаточен
-- MySQL
SET GLOBAL binlog_expire_logs_seconds = 604800;  -- 7 days
-- PostgreSQL
ALTER SYSTEM SET wal_keep_size = '10GB';

Пакетные события CDC:

# Обрабатывайте события CDC микропакетами для повышения эффективности
.trigger(processingTime="30 seconds")  # Instead of continuous
.option("maxOffsetsPerTrigger", 10000)  # Limit events per batch

Дедупликация событий CDC:

# Обработка дубликатов сообщений CDC (доставка «хотя бы один раз»)
deduplicated_cdc = parsed_cdc \
    .withWatermark("change_timestamp", "5 minutes") \
    .dropDuplicates(["order_id", "change_timestamp", "operation"])

Пакетная обработка, потоковая обработка и CDC: матрица принятия решений

Вот практичная схема для выбора между пакетной обработкой, потоковой обработкой и CDC:

Фактор

Batch

Streaming

CDC

Требование к задержке (latency)

Часы — дни

Секунды — минуты

Минуты — секунды

Объём данных

Высокий (терабайты за пакет)

Непрерывный поток (гигабайты в час)

Только изменения (мегабайты в час)

Нагрузка на источник данных

Высокая (полные сканирования)

Н/Д (источник — события)

Низкая (чтение логов)

Стоимость

Ниже (в 5–10 раз дешевле)

Выше (постоянно работающая инфраструктура)

Средняя (обработка логов)

Сложность

Проще (без состояния)

Сложно (со состоянием)

Умеренная (парсинг логов)

Типовые сценарии

Отчётность, обучение ML

Мониторинг, алерты

Синхронизация БД, аудит

Актуальность данных

По расписанию

Почти в реальном времени

Почти в реальном времени

Обработка удалений

Да (полное обновление)

Зависит от источника

Да (если на основе логов)

Первичная загрузка

Нативно поддерживается

Требуется добавление данных

Требуется сначала пакетная обработка

Дерево принятия решения

Гибридный паттерн: лучшее из двух миров

На практике в большинстве продуктивных систем используются несколько паттернов одновременно:

Паттерн 1: Пакетная обработка + CDC (Наиболее распространенный)

Начальная загрузка: пакетная обработка (исторические данные, 2+ года)

Постоянная синхронизация: CDC (только ежедневные изменения)

Аналитика: беспрепятственный запрос обоих через Delta Lake

Пример использования: синхронизация продуктивной базы данных с хранилищем данных

  • День 1: пакетная загрузка 500 ГБ исторических заказов

  • День 2+: CDC фиксирует только измененные записи (~500 МБ/день)

  • Результат: в 1000 раз меньший объем перемещаемых данных, минимальное воздействие на исходную систему

Паттерн 2: CDC + Потоковые агрегации

База данных → CDC → Kafka → Потоковые агрегации → Дашборды в реальном времени

Bronze/Silver/Gold Delta Tables

Пример использования: аналитика заказов в реальном времени

Пакетный уровень:   Исторические агрегации (ежедневно)
Потоковый уровень:  Обновления в реальном времени (последние 24 часа)
Уровень обслуживания: Объединение обоих представлений для запросов

Паттерн 3: Пакетная обработка + Потоковая обработка (Архитектура Lambda)

Batch Layer: Исторические агрегаты (ежедневные)
Stream Layer: Потоковый слой: Реальные обновления (за последние 24 часа)
Serving Layer: Объединение обоих представлений для запросов

Пример использования: рекомендательные движки

  • Пакетный: Обучение моделей на основе исторического поведения пользователей (еженедельно)

  • Потоковый: Захват кликов в реальном времени для немедленной персонализации

  • Обслуживание: Объединение пакетных прогнозов + сигналов в реальном времени

    Вывод: выбирайте с умом, внедряйте грамотно

    Выбор между пакетной, потоковой и CDC-обработкой не является бинарным — это спектр, основанный на ваших конкретных требованиях:

  • Если вы не уверены, начните с пакетной обработки. Она проще, дешевле и решает 80 % случаев использования.

  • Добавьте CDC при инкрементной синхронизации баз данных — это в 10–100 раз эффективнее пакетного ввода для рабочих нагрузок с большим количеством изменений.

  • Переходите к потоковой обработке только в том случае, если требования к задержке менее одной минуты оправдывают дополнительную сложность и затраты.

  • Используйте гибридные подходы для комплексных платформ данных, которым требуется историческая полнота и аналитика в реальном времени.

Ключевые выводы

Пакетная обработка: запланированная, высокопроизводительная, экономичная — идеально подходит для аналитики и отчетности
Потоковая обработка: непрерывная, с низкой задержкой, сложная — необходима для мониторинга в реальном времени
CDC-обработка: инкрементная, эффективная, практически в реальном времени — идеально подходит для синхронизации баз данных
Гибридные паттерны: сочетание всех трех для комплексных платформ данных. (наиболее распространенные в практике)
Всегда внедряйте: идемпотентность, контроль, мониторинг и проверку качества данных
Оптимизируйте под свои конкретные требования к задержке, стоимости и сложности.

Справочник по выбору паттернов

1. Синхронизация базы данных объемом 100 ГБ, которая изменяется на 1 % ежедневно → CDC (на основе журналов)

2.  Ежедневный отчет о продажах, допустимая задержка 24 часа → Пакетная обработка (по расписанию)

3. Данные датчиков IoT, требуются оповещения в режиме реального времени → Потоковая обработка (Kafka)

4. Финансовые транзакции, требуется аудиторский след → CDC (сохранение истории)

5.  Большой исторический набор данных для обучения ML → Пакетная обработка (однократная загрузка)

6.  Поток кликов пользователей для персонализации в реальном времени → Потоковая обработка (основанная на событиях)

7.  Репликация базы данных с несколькими таблицами → CDC (Debezium + Kafka)

8.  Данные API, обновляемые каждые 6 часов → Пакетная обработка (опрос API)

Помните: лучший паттерн поглощения — это тот, который соответствует вашим требованиям без излишней инженерной проработки. Я видел, как команды тратили месяцы на создание потоковых CDC-конвейеров для данных, которые обновляются раз в неделю, а другие боролись с ежедневными пакетными извлечениями, когда CDC могло бы сэкономить 95% времени и затрат на обработку.

 Начните с простого, измеряйте все и развивайте свою архитектуру по мере изменения требований.

Каков ваш опыт в использовании этих моделей поглощения? Сталкивались ли вы с трудностями при внедрении какой-либо из них? Поделитесь своими мыслями в комментариях!