
Всем привет, меня зовут Игорь Горбенко, и я системный аналитик в компании М2.
Отчёты, которые обновляются раз в сутки, хорошо подходят для стратегической аналитики. Но в какой-то момент бизнесу становится важно понимать, что происходит в течение дня, а не только по итогам ночной загрузки.
В М2 мы столкнулись с этим, когда от продуктовых команд и службы поддержки начали приходить запросы на внутридневную отчётность и почти real-time метрики. Наш основной подход — ежедневная батчевая загрузка данных — перестал закрывать такие сценарии, и нам понадобился другой способ работы с изменениями в продуктовых базах.
В этой статье я расскажу, как мы внедряли Change Data Capture (CDC) с использованием Apache Flink, какие задачи это помогло решить, с какими ограничениями мы столкнулись и почему CDC — полезный, но не универсальный инструмент.
CDC и Apache Flink: кратко о технологии и нашем подходе
Давайте начнем разбираться. Некоторые из вас наверняка знакомы с понятием CDC, Change Data Capture — техника захвата изменений в базах данных.
Для контекста стоит отметить Apache Flink — движок для загрузки и обработки батчей и стриминговых данных в реальном времени. В статье речь пойдет про Flink CDC — фреймворк с открытым исходным кодом для отслеживания изменений данных в базах данных в реальном времени.
В проектах нашего отдела в М2 основной метод загрузки — это ежедневное ночное
копирование продуктовых баз данных (PostgreSQL, MongoDB) в аналитическое хранилище на базе Apache Iceberg и последующая их обработка с помощью движка Trino.
И вот из разных продуктовых команд к нам стали приходить задачи — построить отчётность с обновлениями втечение дня (в том числе проект для службы поддержки). Поэтому у нас возникла необходимость получать и обрабатывать данные в около real-time режиме. Для реализации этого функционала наши инженеры внедрили новый вариант загрузки данных — CDC с использованием Apache Flink.
Flink CDC непрерывно отслеживает изменения в базе данных и преобразует их в поток событий, которые описывают операции вставки, обновления и удаления строк, которые мы потом преобразуем в аналитический вид и загружаем с помощью DBT — фреймворка для преобразования и загрузки данных.
Зачем нам понадобился CDC: реальные сценарии использования
Сценарий 1: Внутридневные потребители
Когда строятся аналитические хранилища данных, то конечный пользователь, как правило, — C-level сотрудник, который будет долго и внимательно смотреть на графики, а затем принимать важные для компании решения. Однако данные так же нужны и для сиюминутных решений: например, для мониторинга производительности и доступности сервисов или распределения нагрузки между работниками службы поддержки пользователей. Последняя задача была решена нами в этом году.
Например в сервисе поддержки HelpDesk менеджеры могут анализировать KPI сотрудников в течении дня.
Сценарий 2: Внутридневные метрики
Довольно часто для аналитиков важны изменения статусов объекта, чтобы считать метрики скорости перехода между определёнными этапами оказания услуги.
Соответственно, при одноразовой загрузке есть риск пропустить важные статусы. Поэтому важно отслеживать эти изменения в реальном времени или, по крайней мере, с высокой частотой.
Конкретно в нашем случае это были метрики скорости обработки обращения в техподдержку.
Когда оператор техподдержки освобождается, ему поступает обращение — он берет его на себя и далее закрывает тикет, когда его помощь завершена.
Нам было интересно, как долго тикет находится в статусе ожидания оператора и как быстро оператор может помочь клиенту.
Сценарий 3: Большие источники
Наш текущий метод загрузки данных хорошо подходит для небольших таблиц и коллекций (до 50 GB). И благодаря микросервисной архитектуре, большинство продуктов управляют маленькой базой данных, и по этой причине таблицы действительно загружаются быстро.
Однако у нас есть и крупные базы данных (100 GB и более), разбор которых занимает несколько часов. CDC позволяет кратно уменьшить скорость сборки моделей, за счёт того, что обрабатываются только данные за последний день, которых очевидно кратно меньше, чем данных за всё время.
В итоге, благодаря внедрению CDC мы смогли в несколько раз ускорить обработку данных из ключевых продуктов компании и тем самым снизить нагрузку на наше хранилище.
А теперь о минусах

Я рассказал о сценариях использования технологии захвата данных, однако нужно понимать, что CDC — это не панацея, это инструмент для выполнения определённых задач. У этой технологии есть свои преимущества и недостатки.
CDC позволяет нам отслеживать все изменения в базе данных и гораздо быстрее собирать таблицы в Trino, по сравнению с таблицами, данные для которых сгружаются раз в день.
Однако использовать CDC гораздо труднее, из-за того, что вычисления на событиях требуют использования более сложного функционала DBT, а так же более глубокого понимания работы Trino. Это приводит к росту сроков выполнения задач, а также увеличению частоты ошибок, поэтому аналитикам необходимо тщательно следить за качеством получаемых данных.
Дополнительная сложность — в обработке удалений записей в таблицах. Правда в нашем конкретном случае удалось обойтись без неё, так как продуктовые базы данных вместо физического удаления записи из БД просто помечают её как удалённую, то есть формально происходит не операция delete, а операция update.
Ключевые риски использования CDC:
— Риск потери целостности данных из-за пропущенного промежуточного состояния. Если система упадёт, то будет невозможно восстановить статусы записей, которые обновлялись пока система была неактивна
— Риски увеличения сроков разработки
— Риски увеличения вероятности ошибок, которые приходят вместе с повышенной сложностью скриптов
Также стоит следить за оптимизацией скриптов, так как неправильный подход может снизить скорость обновления таблиц, что может привести к нарушению SLA. Данные CDC очень уязвимы к потерям, а ошибки планирования времени работы скриптов сборки или нарушение их порядка могут вести к серьёзным проблемам в отчётности, которые потом будет трудно найти и устранить.
Как это работает на практике: витрина для службы поддержки
Рассмотрим работу CDC на примере дата-продукта, который уже готов и активно используется — а именно витрины для службы поддержки М2.
В самом начале мы делаем снепшот источника, то есть загружаем текущее состояние БД, от которого дальше будут считаться изменения. Далее к нам постоянно в режиме реального времени поступают данные об изменения в БД, которые мы обрабатываем с периодичностью раз в час.
Обрабатывать эти события нам помогают встроенные шаблоны DBT: тип материализации incremental и стратегия delete+insert. Также, из инструментов DBT нам пригодилась функция is_incremental(), которая используется для разделения первого и последующих запусков модели. Это необходимо, потому что в первый раз нужно обработать все данные, а в последующие разы нужно загрузить только данные за последний день.
Иногда, чтобы обновить запись в таблице, нужны данные из этой же таблицы. Но если модель собирается впервые, то таблицы физически ещё нет, что вызовет ошибку. Для таких случаев также используется макрос is_incremental().
-- Пример использования DBT для обработки новых данных
with
metric_source as (
select *, event_time as valid_from
from {{ ref("stg__mongo_cdc__helpdesk_ticket__metric") }}
where
{%- if is_incremental() %}
date(processing_time) = date('{{ var("ingestion_date") }}')
{%- else %}
processing_time > (
select
min(processing_time) filter (where upper(is_snapshot) = 'TRUE')
from {{ ref("stg__mongo_cdc__helpdesk_ticket__metric") }}
) -- Время создания снепшота ticket
{%- endif %}
),По факту мы берём данные за весь прошедший день и оставляем только новые по уникальному ключу события. Имейте в виду, что за уникальным ключом надо внимательно следить. Идентификатор события есть только у событий, но нет у снепшотов. На ранних этапах разработки модели это приводило к созданию дубликатов записей.

На следующем этапе из новых и старых событий мы рассчитываем метрики: время ожидания, время ответа и прочее. Для каждой метрики мы высчитываем временной интервал, в течение которого она является актуальной, чтобы при желании можно было посмотреть на состояние тикетов за предыдущие периоды. Затем по каждой метрике мы берём актуальное состояние и выводим его в основную витрину, которая далее используется для построения дашборда.
На каждом новом этапе мы используем всё меньше данных, что позволяет нам быстро собирать таблицы и всегда укладываться в срок сборки.
В качестве архитектуры нашего Data Lake используем что-то похожее на DataBricks Medallion. Это разделение данных на три слоя: bronze, silver и golden. Сначала хранение необработанных данных, потом очистка и преобразование данных, а затем агрегация для конкретных бизнес-задач. Мы также используем промежуточный слой copper между bronze и silver для разложения JSON-документов в реляционные таблицы.

Заключение
В общем, CDC — это мощный инструмент, однако его использование оправдано только в тех случаях, когда преимущества перевешивают сложность реализации и поддержки. Собственно в наши планы на будущее входит создание чёткого процесса работы с CDC и разработка шаблонов для написания скриптов моделей, что даст возможность всем сотрудникам департамента аналитики строить модели без необходимости глубокого погружения в логику сбора данных.
Если у вас возникли какие‑то вопросы, мы будем рады ответить на них в комментариях!
