Я работаю ведущим разработчиком ПО в компании STM Labs и хочу рассказать вам о том, как мы переносим данные из Oracle в PostgreSQL с использованием open-source решений Airflow и NiFi. Эта статья написана по мотивам моего выступления на митапе «Мир Open Source: кейсы реального применения».
С необходимостью переноса данных из Oracle столкнулись многие российские компании: в июле 2022 года корпорация, создавшая этот продукт, ушла с российского рынка из-за санкций, как и множество других зарубежных IT-компаний. У пользователей из нашей страны больше нет поддержки вендора, а значит со временем система может перестать корректно работать.
Система хранения Oracle была очень популярна в России: данные в ней хранили и обрабатывали даже компании из государственного сектора. И всем нам предстояло быстро решить, куда и каким образом перенести огромные объёмы ценной информации, ничего не потеряв в процессе переноса.
Условия задачи
Для дальнейшего хранения данных наш клиент выбрал российскую систему PostgreSQL. Важно было найти СУБД, которая смогла бы обеспечить стабильную работу и имела бы поддержку от разработчика. PostgreSQL хорошо подошла под эти критерии.
Но каким образом выполнить перенос без потерь? Как реализовать обратную синхронизацию для систем, которые ещё не успели переехать? После анализа различных вариантов и предложений мы сошлись на системах Airflow и NiFi, причём использование последней было обязательным требованием самого заказчика.
Также от будущих пользователей системы поступил целый перечень пожеланий:
переливка всей таблицы (словари и мелкие таблицы);
переливка дельты по колонке времени (большие таблицы);
переливка дельты по транзакции, если нет колонки времени;
при переливке таблиц должна соблюдаться связь между ними, причем они не обязательно должны быть связаны ссылками — достаточно логической связи или связи бизнес-данных;
переливка master-detail таблиц: есть основная таблица с ключом и таблицы, которые дополняют записи данными. Дата обновления всегда проставляется в мастер-таблице;
и самое главное: если при переливке связанных таблиц произошла ошибка, никакие данные ни в коем случае не должны попасть в получатель. Если ошибка всё же произошла, данные должны откатиться к своему изначальному состоянию, а система должна оповестить пользователя о наличии проблемы.
Из чего состоит наша система переноса данных
По итогам работы над задачей нам удалось разработать систему, выполняющую все необходимые функции. Состоит она из следующих компонентов:
конфигуратор,
Snapshot Manager,
Aiflow,
NiFi,
NiFi Task Manager,
системы хранения PostgreSQL,
Redis.
Все эти компоненты связаны друг с другом следующим образом:
Давайте теперь разберём, как работает каждый компонент по отдельности.
Конфигуратор
Конфигуратор — это веб-приложение, которое создаёт конфигурации для Airflow, NiFi и других смежных систем на основе пользовательских настроек.
Войдя в конфигуратор, мы должны зарегистрировать базы данных источника и получателя. Необходимо указать:
• подсистему,
• кому принадлежит источник,
• системный код источника,
• наименование,
• настройки подключения к БД.
Затем в разделе схем необходимо зарегистрировать схемы, из которых можно переливать данные. Этот шаг необходим, чтобы не пришлось сканировать всю базу данных. Система просканирует источник и подтянет имена всех таблиц и полей с их типами данных.
Добавив в базу данных все возможные соединения, переходим к настройкам потока. Поток содержит в себе настройку переливки нескольких таблиц, а также возможность указать, в каком порядке делать миграцию данных.
Для каждой таблицы или запроса из источника нужно настроить таблицу-получатель:
Указать, что в источнике является ключом, и какой тип репликации необходим для данной таблицы. Например, если мы выбираем репликацию по времени, то необходимо указать, какая колонка отображает дату изменения записи. При этом в получателе не обязательно делать такую же колонку:
На второй вкладке нужно настроить получателя и маппинг полей. Если поля в источнике и получателе именуются одинаково, происходит автоматическая подстановка значений:
Когда модель будет создана и активирована, конфигуратор проведет автоматическую настройку смежных систем: создаст временные таблицы в получателе, положит новый DAG-файл для Airflow и при необходимости настроит новое подключение к БД в NiFi.
После этого пользователь сможет отслеживать ход работы своих задач прямо через интерфейс, где будут отображаться успешные задачи, а также ошибки, которые могут возникнуть в процессе миграции данных.
Snapshot Manager
Snapshot в PostgreSQL — это моментальный снимок базы данных или транзакции, который позволяет сохранить состояние базы данных в определённый момент времени и при необходимости вернуться к этой точке из разных подключений. Эта функция даёт гарантию консистентности данных.
Подобный функционал есть и в Oracle — называется Oracle Flashback Query. Он также позволяет вернуться к предыдущему состоянию данных, но работает иначе: для него не обязательно держать открытую транзакцию, мы можем получить даже удалённые данные на определённый момент времени.
Наш сервис умеет работать с обеими базами данных: для PG он открывает и держит соединение, для Oracle запрашивает номер последней транзакции.
Airflow
Airflow — это платформа для автоматизации рабочих процессов. Она позволяет создавать и исполнять рабочие процессы, состоящие из различных задач и этапов.
Возможности Airflow:
Оркестрация задач. Airflow может выполнять задачи в определённом порядке, учитывая связи между ними, а также ограничивать нагрузку на работу с источником или типизировать её с помощью виртуальных пулов.
Мониторинг и оповещение. Airflow предоставляет инструменты для мониторинга рабочих процессов и оповещения в случае возникновения каких-либо проблем.
Масштабируемость. Airflow легко масштабируется для обработки большого количества задач.
Гибкость. Airflow предлагает большую гибкость в настройке рабочих процессов и в интеграции с другими системами. Он позволяет запускать одноразовые контейнеры на подах в Kubernetes, создавать виртуальные пулы, делать приоритезацию очередей, ограничивать количество одновременных запусков и многое другое. При желании вы можете написать свои задачи любой сложности на Python, когда создаёте конфигурацию для дагов.
Как мы используем Airflow в своей системе
Наши задачи на данный момент достаточно однотипны. Рассмотрим схему одной из них:
Мы видим первый блок INIT. Его задача — отправить информацию о запуске задачи в конфигуратор.
Далее идет группа, в которой параллельно подключены блоки, выполняющие чистку временных таблиц в получателе.
За ними следует блок snapshot_create, отвечающий за создание снимка и получения идентификатора для читателей.
Блок создания снимка связан пунктиром с его закрытием. В Airflow есть механизм закрытия ресурсов. Закрытие выполнится, даже если задача будет завершена с ошибкой.
Далее идёт группа, в которой также параллельно выполняются потоки на каждую переливаемую таблицу. Каждый блок переливки состоит из нескольких компонентов:
• получение последнего CDC-значения, которое мы уже переливали. Если такого нет, то получение начального стандартного значения;
• отправка запроса в NiFi через микросервис NiFi Task Manager для запуска переливки;
• ожидание завершения. Данный блок опрашивает NiFi Task Manager, на какой стадии находится задача в NiFi;
• после завершения мы получаем новый CDC, который будет записан для следующих запусков.Как только все группы переливок будут завершены, начнётся блок слияния данных merge. В блоке выполняются запросы update или insert в таблицы, которые указаны как получатели.
После успешного слияния мы записываем новые CDC-значения для следующих запусков.
В конце этого процесса нужно отчитаться конфигуратору об успешном завершения.
Если в процессе выполнения задачи произойдёт какая-либо ошибка, обработчик ошибок, стоящий над DAG-ом, оповестит о ней конфигуратор:
Минусы и проблемы
На данный момент проблемы с Airlfow у нас возникают только в процессе синхронизации статусов с конфигуратором. Хоть мы и обложили всё обработчиками, Airflow может просто прервать саму задачу, убив ее. Такое может произойти, если поток выключили, или случайно удалили конфигурационный файл, или сделали ручное завершение всей задачи в Airflow через принудительное проставление статуса задаче.
NiFi
NiFi — это ETL- инструмент для управления потоками данных, который состоит из множества мелких очередей и процессоров, обрабатывающих эти очереди.
Возможности NiFi:
• NiFi позволяет управлять потоками данных, выполняя такие задачи как преобразование форматов, сжатие и декомпрессия, а также перемещение данных между системами;
• NiFi легко масштабируется и может обрабатывать большие объемы данных;
• NiFi легко интегрировать в уже существующую инфраструктуру;
• а если нужных для этого инструментов нет, то вы всегда можете написать свои Java-компоненты.
Сложности, с которыми мы столкнулись
Единица NiFi — flowfile. В нем хранятся как сами данные, так и атрибуты. Процессоры берут такие файлы в работу и на выходе либо обновляют атрибуты, либо пересоздают новый flowfile с новыми данными. На один входящий файл система может создать сотни выходящих — например, поделить результат SQL-запроса на батчи. И нам необходимо понять, когда все полученные батчи будут залиты в получатель.
Другие минусы NiFi:
• тяжело проводить debug,
• развертывание потока происходит только через UI,
• почти невозможно обновить поток в работающем потоке.
Как мы работаем с NiFi
Работа с конкретной БД реализуется за счет добавления соответствующего JDBC-драйвера. Мы отправляем запросы к API-интерфейсу для создания новых соединений в системе. Сам поток запускается http-запросом, где указываются все настройки.
Любое простое действие, которое обычно занимает пару строк, в NiFi будет выглядеть так:
В базовой реализации мы видим 3 блока:
Первый блок нужен для проверки полученной конфигурации задачи. Здесь у нас простые базовые процессоры, которые проверяют наличие параметров и записывают их в атрибуты.
Следующий блок отвечает за создание записи в кэше Redis. На основе этой записи будет делаться прерывание задачи, если где-то случится ошибка или поступит сигнал извне. Также в кэше будет храниться количество прочитанных и записанных записей.
Далее поток разделяется на 2 ветки: первая (которая уходит в левую сторону) отправляет успешный ответ о создании задачи, вторая (уходящая вниз) направлена на асинхронное выполнение переливки с попутным перескакиванием на одну из нод NiFi.
Наконец, третий блок делает саму переливку данных и записывает информацию в кэш.
Выполнение задачи состоит из двух блоков — чтения и записи:
Первое, что делается как в чтении, так и в записи, — это проверка реализации. Переданный параметр идет к нужной реализации чтения.
Для чтения нами был написан собственный процессор на основе стандартного. Он доработан так, что может проверять кэш на статус по задаче. И если в какой-то момент задача прервется, он завершит свою работу ошибкой:
Далее идет блок, который записывает в кэш информацию о количестве прочитанных батчей. На этом чтение закончилось:
Блок записи выглядит почти так же: у него также есть собственный процессор, который перед записью будет делать проверку задачи в кэше. А после записи он будет отправлять в кэш информацию о количестве записанных батчей:
На этом работа NiFi закончена. Его основная задача — гонять данные, пока они не прервались.
NiFi Task Manager
Последний компонент системы — это микросервис, который синхронизирует работу Airflow и NiFi. Его основная задача — мониторить ход выполнения процессов через кэш Redis. Он получает количество прочитанных и записанных батчей, и на основе этой информации делает вывод, что задача завершилась. Именно через него идёт запуск и отмена задачи.
Итак, в процессе поиска способов переноса данных из Oracle в PostgreSQL у нас получился довольно мощный и гибкий комплекс, который мы продолжаем дорабатывать и улучшать. В дальнейшем мы планируем расширить его функционал: например, помимо баз данных, можно будет добавить новые типы источников и получателей, если это понадобится заказчику и пользователям системы. В любом случае уже сейчас эта разработка позволяет без потерь переносить данные из системы в систему.