
Наполнение данными хранилища или озера, как правило, является первым большим шагом к доступности аналитической среды для основного функционала и работы конечных пользователей. От эффективной реализации этой задачи зависят стоимость и длительность всего проекта по созданию хранилища данных и сроки предоставления отдельных data-сервисов.
В этой публикации я поделюсь опытом реализации пакетной загрузки больших данных в аналитические хранилища и расскажу, когда следует выбрать именно пакетную загрузку, а когда – онлайн-подход. Отдельно раскрою, как многолетний опыт решения подобных задач был воплощен в промышленном инструменте репликации данных Data Ocean Flex Loader.
Что выбрать: Batch или Real-time?
За годы проектирования решений для интеграции и загрузки я выработал для себя следующий принцип: если пакетная загрузка данных реализуема – стоит выбрать ее и не гнаться за минимальным отставанием без реальной на то необходимости. Прибегать к real-time репликации стоит только в тех случаях, когда это оправдано с точки зрения затрат и уровня сервиса.
К требованиям пользователей всегда нужно относиться настороженно. Если спросить их напрямую, почти всегда можно услышать фразу: «Конечно мы хотим, чтобы все данные загружались в наше хранилище в реальном времени». На деле часто это оказывается неоправданной «хотелкой». Никаких рациональных причин, регламентных процессов, отчетов, требующих самых актуальных данных, на самом деле нет и не предвидится в ближайшем будущем. Ну а если где-то и есть такие процессы, то в них участвует небольшой перечень объектов, и требования можно распространить только на них.
Когда стоит рассматривать batch-интеграцию:
Не требуется поддерживать отставание от источника менее, чем в 15 минут;
Система-источник (СИ) может отдавать данные в пакетном режиме без ущерба для своей производительности. Извлечение данных может производиться с основной БД в любое время, либо в выделенное техническое окно, либо имеется Stand by сервер с возможностью чтения;
Скорость извлечения данных из системы-источника в каждой итерации не превышает допустимое отставание;
Требуется загрузка не только из СУБД как источника, но и из интеграционных файловых систем (плоские файлы) или по FTP-протоколу;
Необходима настройка массовой загрузки (базы\схемы целиком без ручной настройки каждого объекта), например, для инициализирующей загрузки или при регулярном копировании схем и баз целиком;
Требуется выполнение базовых проверок качества данных (дубли, null-значения в ключах) как части процесса загрузки;
Требуется режим, который не обеспечивается другими инструментами, или рассматривается сценарий гетерогенной репликации, например, S3<->HDFS, Greenplum<->HDFS, HDFS<->HDFS и так далее.
Противопоказания для режима пакетной репликации и предпосылки использования real-time загрузки и change data capture подхода:
Необходимо поддерживать отставание от источника менее, чем 15 минут;
СИ не способна отдавать данные в пакетном режиме c нужным SLA (ограничение производительности или критичные задачи, на которые влияет извлечение данных в пакетном режиме, отсутствие реплики на чтение и другие причины);
В СУБД отсутствуют механизмы эффективного пакетного экспорта данных (в том числе в файлы);
Необходима инкрементальная выгрузка объекта, но нет возможности выделять логический инкремент;
Присутствуют нежурналируемые физические удаления строк;
СИ поддерживается инструментами класса Change Data Capture (CDC), и есть возможность работать с журналами изменений напрямую. Работа CDC не вызывает дополнительной нагрузки на СУБД источника;
Необходимы преобразования данных в потоке передачи от источника в приемник, например, онлайн-обогащение, look up'ы с условиями;
Требуется загрузка данных в приемники, которые не поддерживают пакетный импорт данных.
Однако нужно помнить, что на стороне приемника могут быть свои ограничения. Приемник может быть не толерантен к онлайн-изменениям или даже простой онлайн-вставке данных, и тогда придется комбинировать оба подхода различными инструментами. Часто даже при выборе в пользу онлайн необходима первоначальная инициирующая пакетная загрузка, либо корректирующая периодическая batch-репликация для устранения расхождений. Это так называемый лямбда-подход в архитектуре, при котором данные загружаются в реальном времени, но регулярно происходит их перезапись за определенный интервал в пакетном режиме. Часто пакетная перезапись используется также для корректировки качества данных.
С теорией разобрались, перейдем к основной части. Далее пойдет рассказ об инструменте пакетной репликации данных Data Ocean Flex Loader. Материал может быть интересен не только тем, кто интересуется инструментом, но и тем, кто самостоятельно разрабатывал или планирует разрабатывать решения для batch-загрузки.
Немного истории
Впервые задачей массовой достоверной гетерогенной репликации из нескольких СУБД в одну целевую без использования специализированных промышленных ETL-инструментов мне пришлось заняться 10 лет назад. Требования заказчиком были следующие:
Синхронизация структуры объектов c трансформацией типов без потери качества;
Перенос схем целиком;
Работа в инкрементальном режиме.
В качестве целевой системы предполагалась Vertica. Пилотный прототип решения был удачным, но проект не состоялся, так как заказчик отказался от Vertica в целевой архитектуре и сменил приоритеты. Идея пролежала в столе пару лет, пока не стали появляться проекты по созданию озер данных на базе экосистемы Hadoop. Тогда то и получилось реализовать все задуманное и проверить идеи на серьезной практике. Некоторые решения из проектов тех лет до сих пор носят терабайты данных каждый день.
За прошедшее с того момента время среди open source проектов и коммерческих продуктов так и не появилось что-то подходящее под данную задачу. Разве что вышел Airbyte и стал доступен в on-prem. Что-то было ориентированно чисто на облачный западный рынок, что-то не подходило под типовые реалии и требования клиента. На российском рынке под видом импортозамещения все начали заниматься банальной сборкой open source с минимальными функциональными изменениями или часто вообще без таковых.
Так возникла идея наконец-то собрать готовое промышленное решение, которое в будущем могло бы стать функциональной частью продуктовой линейки для работы с данными Data Ocean. Data Ocean Flex Loader не является пересборкой или адаптацией какого-либо open source проекта. Это инструмент, спроектированный и разработанный «с нуля», соответствующий обозначенным выше требованиям и основанный на проверенных годами проектной работы решениях.
Самостоятельная разработка или выбор готового решения
Непростой выбор, с которым сталкивается владелец системы или потенциальный заказчик, – стоит ли идти в самостоятельную разработку решения по наполнению хранилища данных или выбрать готовый инструмент. Разумеется, при условии, что готовый инструмент удовлетворяет всем техническим и функциональным требованиям.
Прежде всего стоит руководствоваться сроками разработки подходящего решения, ведь даже для простой реализации загрузки без инкрементального режима, без контроля изменения метаданных и проверок качества данных, потребуется квалифицированная команда и достаточно много времени. Хорошим примером для индикативной оценки может послужить опыт самостоятельной внутренней разработки, усилий и времени, на нее потраченных, представленный в материале Миграция Big Data на практике: как мы готовили напильники.
Data Ocean Flex Loader как готовый инструмент позволяет начать репликацию данных в хранилище сразу после установки. Фактически, как только появляется инфраструктура ХД, вы сразу предоставляете данные для анализа пользователям и сервисам. Свое, самое лучшее, решение, учитывающее все детали ваших уникальных систем-источников, вы можете разрабатывать уже после. Выбор остается только за вами.
Функциональные возможности Data Ocean Flex Loader
При работе в среде Big Data, как правило, придерживаются парадигмы обработки данных в той среде, где они хранятся, – так называемый подход ELT (Extract, Load, Transform). На практике это означает, что инструмент для репликации, реализующий буквы E и L, как раз должен выполнять извлечение данных из источника и загружать их в приемник. Это и должно быть основной базой функционала.
Data Ocean Flex Loader поддерживает следующие режимы извлечения данных из источника:
Полной режим извлечения – Snapshot;
Отбор логического инкремента:
Выгрузка инкремента из источника по любой скалярной детерминированной функции;
Извлечение временного окна с ограничением «снизу» и «сверху»;
Захват диапазона секций из секционированного источника, в том числе с возможностью автоматического определения диапазона или списка конкретных секций самим инструментом;
Выгрузка данных на основе «ручных» условий отбора.
Для каждой итерации выгрузки инструмент формирует запросы на extract на основе последнего успешного сеанса работы с объектом. Дополнительно во Flex Loader реализована возможность запуска Pre-SQL и Post-SQL обработки, что бывает полезно, когда данные для отбора нужно подготовить до извлечения, а после – выполнить «гигиеническую» чистку.
Извлечение данных может осуществляться в многопоточном режиме, при котором на системе-источнике одновременно создается несколько сессий обмена данными, каждая из которых читает свой диапазон. Определение диапазона и количества «поднимаемых» сессий происходит автоматически. Предварительно Data Ocean Flex Loader выполняет сэмплирование источника для оценки общего объема, расчета количества сессий с учетом ресурсов, выделенных движку, которым будет осуществляться выборка данных (то есть если ресурсов Spark, или PXF, или другого инструмента интеграции не хватает для полного захвата, то FL разобьет данные на интервалы по принципу равномерного распределения). Также FL следит, чтобы общее число сессий не превышало ограничение, заданное конфигурацией, с целью исключить ситуацию, когда «за-DDoS-или» систему-источник. Если система-источник распределенная, то инструмент может выполнить балансировку запросов по узлам по принципу round robin. Также для каждого источника можно определить расписание «окон доступности» обмена данными.
В нормальном ландшафте загружаются десятки или даже сотни объектов одновременно. Поэтому при формировании заданий учитываются приоритеты. Ведь ресурсы системы-источника на выгрузку могут быть ограничены, а число объектов и объем извлекаемых данных может многократно их превышать. Да и систем-источников может быть несколько. Flex Loader имеет внутренний планировщик, который формирует очереди заданий в соответствии с расставленными администратором приоритетами между источниками и системами.
Применение данных в целевом хранилище
Data Ocean Flex Loader создавался не просто как инструмент, который может принести данные в ваше хранилище, а как средство быстрого создания первичного слоя хранения или Operational Data Store (ODS), поэтому доставка данных до целевого слоя производится в соответствии с выбранным сценарием.
Доступные сценарии:
Простая вставка insert append only;
SCD Type 1 (SCD1) режим, при котором новые строки вставляются, а изменившиеся – обновляются;
SCD1 c обработкой физических удалений на источнике в диапазоне захваченных данных. FL выполняет сравнение источника и приемника и поиск удаленных строк. Далее в приемнике, в зависимости от настройки, либо удаляет эти строки, либо размечает флагом логического удаления;
Режим сохранения истории изменений SCD4, при котором в приемнике на одну таблицу-источник создается две таблицы:
SCD1, отражающая текущее состояние источника;
HIST-сателлит, где сохраняется вся история изменений.
Режим SCD4 особенно полезен, если система-источник не сохраняет историю изменений, либо периодически ее удаляет на своей стороне. Историю необходимо иметь для ретроспективной работы с данными или восстановления истории в ХД в аналитических слоях и витринах.
Процесс извлечения и применения данных может быть синхронным или асинхронным на случай, если несколько сеансов извлечения хочется «схлопнуть» в один сеанс применения.
Архитектура решения
Data Ocean Flex Loader по своей сути является инструментом типа framework. Его принцип работы состоит в том, чтобы на основе внутренних метаданных, созданных встроенными методами, генерировать исполняемый код во всех системах, участвующих в обмене данными. При этом сгенерированный код отправляется на исполнение другим фреймворкам, СУБД, процессинговым движкам. Решению остается оркестрировать все процессы кодогенерации и контролировать исполнение этих задач. В таком архитектурном подходе самому инструменту требуется совсем немного системных ресурсов. Основную работу выполняют за него другие. Рассмотрим на нескольких примерах, чтобы было понятнее.
Пример 1. «Загрузка данных из реляционной СУБД в Greenplum»
Извлечение данных из системы-источника может осуществляться:
Через фреймворк PXF;
Через фреймворк Spark;
PXF или Spark извлекают данные и доставляют до Greenplum, где данные применяются к целевой таблице-приемнику в соответствии с выбранным сценарием. Инструмент выполняет генерацию исполняемого кода и для извлечения, и для применения данных на SQL-диалекте Greenplum с нужными параметрами и управляет его выполнением;
После применения данных, если необходимо, происходит запуск операции vacuum и собирается статистика.

Spark, который используется как транспорт данных между источником и приемником, может быть запущен в standalone-режиме, в среде YARN Hadoop или в Kubernetes-окружении, например, если вы наполняете данными Lakehouse-платформу Data Ocean Nova.
Пример 2 «Загрузка данных из Teradata в Lakehouse (объектное хранилище S3)»
Извлечение данных происходит через Teradata Parallel Transporter (TPT). FL формирует TPT-задания, управляет их запуском и работой, контролирует запись данных в S3 Storage;
По завершении записи в объектное хранилище FL применяет данные в соответствии с выбранным сценарием обновления на одном из поддерживаемых Lakehouse-системой движков: Spark или Impala. Инструмент выполняет генерацию исполняемого кода на Spark или SQL-диалекте Impala с нужными параметрами и управляет его выполнением;
После применения выполняется сбор статистики над целевыми объектами.
При записи в целевую среду HDFS или S3, помимо традиционных файловых форматов, поддерживается табличный формат Apache Iceberg.
Сам по себе Data Ocean Flex Loader является приложением, работающим в среде контейнеризации Kubernetes, что относит его к классе cloud-ready. В качестве внутреннего хранилища метаданных используется СУБД Postgres.
Поддерживаемые источники
В настоящий момент с инструментом поставляются следующие коннекторы:
Oracle;
MS SQL Server;
Postgres;
SAP IQ;
SAP ASE;
MySQL;
SAP HANA;
Teradata;
Greenplum;
MariaDB;
sFTP.

В большинстве случав добавление нового коннектора для РСУБД или MPP БД не требует больших усилий. Для этого достаточно написать запросы к словарю данных источника в соответствии с требованиями инструмента, добавить конвертацию типов данных, подобрать правильный драйвер. Если извлечение данных из нового источника невозможно реализовать чтением данных стандартным интерфейсом, то необходимо будет добавить модуль генерации кода для использования стороннего фреймворка.
Межкластерная репликация
С помощью межкластерной репликации данных можно не только организовать быстрое перемещение данных между разными ролевыми кластерами системы, но и организовать отказоустойчивое плечо в инфраструктуре, особенно если DR-кластер расположен в отдельном центре обработки данных. При этом в приемнике всегда сохраняется возможность работать с данными на контуре, на который постоянно выполняется накат изменений.
Между объектными хранилищами или Hadoop-системами осуществляется файловый обмен с инвалидацией метаданных метакаталога. Для такого обмена также доступно инкрементальное обновление изменений, при этом если в кластере-источнике имеется множество небольших файлов, то в процессе переноса на резервное плечо они переупакуются до целевого размера. При использовании табличного формата Iceberg инвалидируется iceberg snapshot на стороне приемника.
Для Hadoop-систем, не использующих формат iceberg, но подразумевающих обеспечение требований ACID, во Flex Loader встроен сервис изолирования изменений, основанный на переключении HDFS snapshot’ов. Это гарантирует непрерывность пользовательской транзакции и изолированность изменений на принимающей стороне.
Для межкластерной репликации доступны следующие целевые системы и направления:
Greenplum <-> S3;
Greenplum <-> HDFS;
HDFS <-> S3;
S3 <-> S3;
HDFS <-> HDFS;
Greenplum <-> Greenplum.

Межкластерная репликация может быть организована двумя сценариями. Первый гарантирует доставку данных от системы-источника сразу на два и более независимых контура. Данные извлекаются из СИ, записываются на оба кластера целевой системы, и по завершении записи применяются на оба кластера в соответствии с выбранным сценарием. Такой подход гарантирует зафиксированное консистентное состояние первичных данных источника в случае переключения с промышленного кластера на резервный. Система-источник при этом опрашивается всегда один раз.

Второй режим предполагает копирование производных данных между кластерами. Копирование может быть как по расписанию, так и через вызов API. На практике это работает так: ETL-инструмент обновил данные расчетной витрины и выполнил вызов API на копирование изменений целевого объекта на резервный контур.
Использование обоих режимов межкластерной репликации Data Ocean Flex Loader по сути реализует «из коробки» принцип организации отказоустойчивых систем, который принято называть «Double ETL». При этом принципе:
Система-источник опрашивается один раз;
Данные пишутся сразу на два плеча;
В prod-системе производится расчет производных данных (аналитические слои, витрины);
Изменения производных данных синхронизируются на резервное плечо;
Резервное плечо всегда синхронно с основным промышленным как по первичным данным, так и по производным;
Отставание зависит только от ограничений канала передачи данных между ЦОДами;
При замене ролей между PROM- и DR-контуром Flex Loader может также изменить направление репликации.
Контроль качества данных
Все внутренние процессы создания производных данных внутри хранилища или пользовательские разработки должны оперировать данными, которым они могут доверять с точки зрения качества. Встроенные проверки качества данных на этапе загрузки – самый лучший способ заработать это доверие. В Data Ocean Flex Loader реализованы следующие проверки:
Количественные сверки между источником и приемником
Проверка становится особенно актуальна, когда вы пользуетесь режимом логического инкремента, но в ваш источник могут «долетать» строки задним числом. В этом случае Flex Loader может автоматически перезахватить диапазон данных из источника заново с учетом долетевших строк;Уникальность первичного ключа
Не все целевые системы-приемники имеют встроенные ограничения и проверки уникальности, и не всегда первичный ключ может быть задан на стороне приемника. Бывает, что система-источник изначально не имеет уникального ключа, и в этом случае на стороне приемника предусмотрен режим дедупликации;Проверка обязательного заполнения полей
Проверяется наличие NULL-значений по перечню полей, являющихся обязательными для заполнения;
Проверка мутации схемы источника
Выполняется проверка схождения DDL-источника и приемника, и в зависимости от типа изменений и настроек инструмента возможны три сценария:Наследуются изменения из источника в приемник;
Игнорируются изменения с уведомлением в журналах;
Выполняется остановка процессов загрузки, если изменения требуют осознанного вмешательства администратора.
Возможности интеграции
В большинстве случаев наполнение первоначального слоя хранения (Raw Data или ODS) – это только первый шаг pipeline’а обработки и трансформации при внедрении хранилища, за которым следуют многочисленные процессы, связанные зависимостями. Как правило, в ландшафте ХД имеется оркестратор, который управляет очередностью всех шагов загрузки и обработки данных от момента извлечения из источника до материализации в целевом слое хранилища или выгрузки производных данных в систему-приемник. Для интеграции c внешними оркестраторами и планировщиками можно использовать два готовых документированных вида интерфейса: процедурный FL API и REST API.
В практическом плане реализуются следующие сценарии использования FL:
Запуск расчета витрины данных или области детального слоя
Для реализации необходимы несколько таблиц. При этом данные для расчета из систем-источников нужны самые свежие;Внешний оркестратор процессов через API создает ticket во Flex Loader
Модель «сходи в системы-источники и обнови мне Х таблиц». После получения положительного ответа о завершении задания оркестратор событийно запускает расчет производных данных по цепочке обработки.
Графический интерфейс
Data Ocean Flex Loader имеет интуитивный пользовательский графический интерфейс. Основная идея, с которой мы брались за реализацию GUI, – сделать функцию создания и управления загрузкой данных доступной для бизнес-пользователей, а уже потом – помочь администраторам создавать, настраивать, планировать, запускать и сопровождать процессы репликации данных. Наличие графического интерфейса позволяет снизить требования к квалификации персонала при работе с системой.

Все взаимодействие графического интерфейса с back-частью инструмента происходит через REST API, поэтому выбор способа взаимодействия всегда остается за конечными пользователями: управлять через REST-сервисы, работать с GUI, посредством встроенного native API, через командную строку или же пользоваться всеми методами сразу в зависимости от ситуации.

В графическом интерфейсе реализована ролевая модель работы.
План развития
Data Ocean Flex Loader имеет роадмап развития, который формируется и корректируется в соответствии с запросами пользователей и трендами рынка. В настоящий момент мы реализуем модуль экспорта, чтобы доставлять производные данные из хранилища в системы-потребители. Flex Loader продолжает расширять список поддерживаемых систем-источников для интеграции. В ближайшее время добавится Kafka.
Мы расширяем перечень целевых движков Big Data, которые отвечают за применение данных согласно выбранному сценарию. В течение 2025 года планируем добавить StarRocks, который уже вошел в состав Lakehouse-платформы Data Ocean Nova, и уже под него адаптировать сценарии эффективного применения данных в целевую схему. Мы планируем добавить новые режимы захвата источников с большей вариативностью для снижения нагрузки при извлечении данных. Также в планах включить возможность установки расширений на стороне системы-источника с целью унификации общения инструмента с источниками в процессе работы через стандартный инструментальный API.
Прямо сейчас мы улучшаем возможности интеграции с Teradata с использованием функционала Native Object Store. Команда продукта уже тестирует функционал multi apply тиражирования, при котором данные из системы-источника извлекаются один раз, но применяться могут к нескольким разнородным системам-приемникам. Например, извлекли свои данные из Oracle и затем доставили их и применили сразу к трем разным системам: Hadoop, Greenplum, S3 lakehouse.
В 2025 году в графическом интерфейсе появится поддержка мультитентаности, при которой в случае использования нескольких инсталляций Flex Loader будет доступна возможность в одном GUI переключаться между экземплярами.
****
Инструмент загрузки данных Data Ocean Flex Loader и универсальная lakehouse-платформа больших данных Data Ocean Nova разработаны компанией Data Sapience (входит в группу компаний GlowByte).