Мы является телеком-оператором с давней и богатой историей, а также с огромным и разнообразным системным ландшафтом. В силу разных причин самой используемой БД, до недавнего времени, у нас была Oracle. Конечно, в каких-то объемах присутствуют и многие другие БД (MSSQL, MySQL, ...), но все таки Oracle занимает (занимал) особое место.
В определенный момент был взят курс на импортозамещение (далее в тексте ИМЗ) и мы пришли (не сразу) к тому, что стандартом в организации стал Pangolin Platform V от Сбертеха.
Также массово стартовали проекты по переводу систем на:
Приятно, что нам удалось встать в первых рядах этих процессов (у себя в организации), планируем и дальше развивать эти компетенции и реализовывать успешные проекты.
PS не по теме: последнее время читаю много холиваров на тему привлечения ИИ для написания "всего-чего-угодно", а в нашем случае - и для статей на хабре. Не пробовал, но осуждаю ). Поэтому готов ответственно заявить - в рамках подготовки этой статьи ни один ИИ не пострадал (и даже не был в курсе происходящего), поэтому где-то текст может быть не идеальным, но зато - живым и авторским.
Взгляд с технической стороны
Предполагается (руководством), что ИМЗ проекты, по возможности должны делаться так (упрощенно):
Меняется инфраструктура - как правило это перевод с отдельно-стоящих bare-metal серверов (зачастую стоящих в филиалах) на централизованный ЦОД с виртуальными машинами
Меняется OC - серверным стандартом у нас сейчас является RedOS
Меняется СУБД на Pangolin (в основном, если мы не говорим о специфических решениях). Здесь одна из самых больших сложностей - переписывание процедурного кода либо на pl/pgSQL, либо перенос его на уровень приложений. Сложность поменьше - перенос данных.
Меняется код приложений. Либо минимально - чтобы заменить вызовы одной СУБД на другую, либо в разной степени значительности (вплоть до замены стеков, фреймворков, языков)
Учитываются современные требования к информационной безопасности (протоколы, шифрование, хранение данных, антивирусная защита и т.д.)
На этом шаге - куча документации, приемо-сдаточные мероприятия, ввод в эксплуатацию
Профит, обновленная система работает
Legacy hell (о технической сложности)
Что не так с предыдущим разделом? Вроде бы сделай себе эти 7 шагов и живи себе спокойно.
На практике все сильно иначе:
Отсутствие (частичное или полное) документации. Не всегда все описано, сиди и разбирайся как хочешь
Не актуальная документация. Частая история.
Отсутствие исходников. Ну, это все-таки не слишком часто, но встречалось и такое.
Хаотичное хранение исходников. Да да, все эти ваши модные git'ы придумали уже потом, а для исходников БД и до сих пор единого подхода нет
Отсутствие экспертизы. Системы нередко старые, "отцы основатели" либо уволились давно, либо вообще на пенсии. И опять таки - сиди и разбирайся сам.
Старые фреймворки. Понятно же, что редко когда что то написано на "ванильном" языке - фреймворки наше все и они бывают разные. С точки зрения миграции - повышенную сложность представляют из себя системы, написанные на Oracle Forms/Apex.
Старые языки программирования. Сложно в наше время найти разработчика, например, на Perl, а команду - и вообще невозможно. с PHP ненамного легче. Даже такие повсеместно используемые прикладные истории, как VBA уже тоже отходят на второй план.
Взгляд реалиста
Подводя итог подвышесказанным — редко когда удается смигрировать систему «четко и по графику» — мир изменился, изменились люди и подходы — не в последнюю очередь.
Поэтому в той или иной степени, переводя систему на новые рельсы все пытаются:
Провести рефакторинг, хотя бы минимальный (когда если не сейчас?), «переархитектурить» самые «кривые» места.
Упростить БД, вынести всю или часть логики на слой приложений. Как бы я к этому не относился и не любил «бэдэшный код» — этот тренд нельзя не признать.
Перейти на интересные (комфортные, актуальные, «модные») языки программирования — современная Java, Kotlin, Python (и даже Go и может быть и Rust), на фронте — React, Flutter или Compose.
Перейти на «нормальные» сервисы или микросервисы, либо, хотя бы, на модульное приложение — не в последнюю очередь для простоты и прозрачности.
Предмет разговора
Эта статья в основном посвящена миграции непосредственно данных (не процедурного кода), поэтому акцент будет ��делан именно на эту часть.
Очень сложные (большие, высоконагруженные) системы
Есть особый "класс" систем, я назвал бы их "предприятиеобразующие". Для нас это классические системы телекома (погрузиться в это можно здесь - eTOM):
Биллинги (автоматизированные системы расчета)
Системы техучета
CRM
и т.д.
Как правило это очень сложные, высоконагруженные системы, соответственно и подходы в случае импортозамещения систем всегда уникальны.
Скрытый текст
Нередко про такие системы (вернее про их миграцию) рассказывают на конференциях pgConf, в память врезались некоторые из них:
Система, для которой написали тр��нслятор SQL запросов с использованием абстрактного синтаксического дерева на ANTLR
Всевозможные варианты с "догонянием" изменений в БД через инструменты CDC (такие как Debezium и подобные)
Варианты с дублированием потоков "в два места" из доработанных версий приложений
И другие
Но мы сегодня поговорим, все таки не о них (помните же да, анекдот про блох?)
"Обычные", но не менее нужные бизнесу системы
Вот о них мы и поговорим. Классифицировал я их так условно
хотя по нашей классификации - это системы класса "Business Operational"
ИС, выход из строя которых затрагивает только внутренние процессы Общества и не приводит к потерям в среднесрочном периоде. В долгосрочном периоде создаются существенные неудобства пользователям
Чем они характерны?:
Как правило не охватывают «все бизнес процессы компании», а реализуют определенный ряд задач
У них не такое уж больше число пользователей, так как системы «нишевые». Условно — от 1 до 200 пользователей
Объемы БД — не космические, хотя бывает так, что много занимают исторические сведения
Не высоконагруженные (или нагруженные не 24/7, а с определенной периодичностью), системы, при эксплуатации которых возможно организация «технологических окон"
Узнали такие системы? По моему опыту, их как раз большинство.
Как мы будем их мигрировать
Я не случайно разделил системы на две "корзины", так как для систем из второй категории:
Обычно срок на разработку ПО и миграции - сравнительно небольшой
Команда проекта - также компактная
Исходя из этого - хочется избежать избыточно сложных решений и сделать задачу одновременно и просто и эффективно. Но нюансов то в них совсем не меньше!
История первой миграции
Первая миграция для меня была во многом испытанием, выработкой подходов. Мне выдали проект, напутствие было кратким, разве что не перекрестили.
Самой главной сложностью было то, что функциональность (а за ней кодовая база системы, включая структуры БД) активно развивалась командой линейного развития (ни о каких фризах и синхронизациях договориться не получилось), а команда миграции была постоянно догоняющей. Почти тогда в сознании (а потом и в реализации) закрепился первый "постулат" миграции:
Миграция должна быть автоматической. Вообще. Желательно в один шаг - в нужный момент нажал кнопочку, подождал - и все, имеешь актуальную копию БД, только уже не в Oracle, а в Postgres. Если не получается "одной кнопочкой", то хотя бы с внятным перечнем шагов и контролем их выполнения.
Подумаем, как этого можно добиться, но сначала все таки проговорим про "неавтоматическую миграцию":
Предположим у нас неограниченное во времени технологическое окно. Или оно ограничено, но есть (человеческий, машинный) ресурс произвести перенос за это время.
Сценарий в этом случае простой:
Выключаем исходную БД (можно не сразу, если мигрируем "на горячую") - технически переводим БД в Read Only режим
Переносим данные в Postgres (с нужными преобразованиями, маппингами типов и т.д.)
Переконфигурируем систему, запускаемся уже на Postgres. При этом второй шаг 2 - либо ручной, либо во многом ручной (например при помощи скриптов или вообще csv-файлов). Здесь же запишем вторую мысль - в этом проекте не было необходимости усложнять миграцию процессами синхронизации данных "налету", миграцию можно было провести в одно или несколько технологических окон.
Что мне тогда пришло в голову, чтобы процесс автоматизировать (варианты):
Классичеcкий ETL. Ну да, а почему нет? Очень много его использовали - помните такой - квадратики, стрелочки, все красиво и графически. IBM DataStage, Talend Open Studio и много всяких разных
Так выглядит IBM Datastage. Картинка из интернета. Примерно также - Talend Open Studio, картинка из интернета Более современные "около"-ETL - инструменты, основанные на коде, например Apache Airlow, мы его изучили, он нам понравился, мы периодически его используем
Новая волна - Apache Airlow. Вы пишете код (по соответствующим правилам) на python, а эту красоту он сам уже рисует. Картинка из интернета А может попроще? Тогда мне на глаза попался старый добрый
dblinkOracleFDW и он мне очень понравился (настолько, что пришлось залезть и поправить его исходники). Напишем большую большую простыню кода (хранимую процедуру или несколько) по автоматическому переносу данныхИли изучим как работает ora2pg?
На самом деле он тоже помог, но не много по другому:
С помощью него удавалось сравнительно неплохо мигрировать схему данных - т.е. скрипт создания объектов, а после чего, получая схему данных снова и снова - находить разницу в объектах и оперативно понимать, что успела наработать команда развития за этот период, после чего подстоиться под них (даже не заходя в их git)
Третья мысль - мне нужна возможность настройки процесса миграции (а не только возможность делать полные копии таблиц), которую я назвал бы "правилами миграции". Чтобы:
Подвигать таблицы между схемами, переименовать их (т.е. разное название приемника и потребителя)
Где-то преобразовать типы данных, иногда не автоматически, а по своей логике
Перенести не все столбцы, а часть. Поменять наименования и т.д.
И самое важно - миграция "порциями", (частями, секциями и т.д.) - так, чтобы можно было выполнить частичную миграцию, а потом продолжить "с того же места".
Душа склонялась к выбору простого и выразительного инструмента OracleFDW, но некоторые вещи все таки смущали:
"не переиспольуемость". Ну т.е. напишу я простыню кода из операторов
truncate table / insert into table select from oracle_table
. А еще кучу проверок на сравнение количеств строк "до" и "после", на сравнение таблиц через "except". Работать конечно будет, но и особо не изящно - по сути один boiler-plate кодЭволюция этой простыни будет такой - ее надо будет постоянно дописывать, переписывать, в итоге она вырастет в "god-script"
Забегая вперед скажу, что все таки в качестве базового инструмента был выбран как раз OracleFDW, но само описание мигратора - в следующей главе.
Полный рассказ об этой миграции - в статье, в ней же упоминается мигратор, который я описываю здесь (но, конечно, не так подробно).
Описание мигратора
Итак, решено! Берем FDW и ... И пишем инструмент, который позволит нам настраивать и контролировать процессы миграции данных. Он состоит из:
Нескольких схем, для логического разделения (сначала была одна, потом две, сейчас уже 4) - схемы создаются (временно) в целевой БД, используются там же, по окончанию работ по миграции (а лучше непосредственно перед выводом системы в промышленную эксплуатацию) могут быть безболезненно удалены
Настроечных таблиц, в которые, как раз, мы запишем наши "правила миграции"
Функций и процедур, которые возьмут "правила" из таблиц и "сделают все что им скажут"
Схемы
repl_data | Промежуточные данные. Сейчас хранится одна таблица, в которой хранится подсчет числа строк в секциях таблиц (секционированных, конечно) |
repl_log | Протокол миграции и представления для его просмотра |
repl_meta | Метаданные, т.е. всё, что не зависит от конкретной (мигрируемой) системы - процедуры и функции |
repl_settings | тоже метаданные, но уже наоборот все, что относится к данной конкретной системе |
Схема repl_settings
Начнем с самого важного - настройки миграции (для нашей конкретной системы)
repl_entity
Сущности, они же таблицы для миграции. Записывается наименование (со схемой), ссылка на "набор операторов" (он же statement_set
), ключевое поле (или несколько), поле секционирования (для секционированных таблиц), список полей в таблице (из них потом будет конструироваться select / insert
)
Пример заполнения:
INSERT INTO repl_settings.repl_entity (entity_name,entity_descr,statement_set_id,key_fields,partition_field,field_list) VALUES
('MY_SCHEMA.MY_TABLE',NULL,8,NULL,NULL,'{default}');
Здесь видно, что для списка полей использовано служебное значение '{default}
' - это значит, что перечень полей будет автоматически собран из списка полей таблицы-приемника.
db_reference
Информация о БД. Используется, так как протоколирование сделано на "автономных транзакциях" через расширение db_link.
Пример заполнения:
INSERT INTO repl_settings.db_reference (db_name,url) VALUES ('my_db','Закодировали в base64');
В url хранится строка: dbname=A host=localhost port=5432 user=B password=C
repl_process
Таблица справочник, в которой собраны виды репликационных процессов, рассмотрим их:
SEQ | Синхронизация значений последовательностей |
FK | Включение/выключение внешних ключей |
ENTITY_REPLICATION | Репликация сущностей. Собственно это основная задача |
FILL_PARTITION_COUNTS | Заполнение количества строк по ключам секций - для секционированных таблиц |
ENTITY_PARTITION_REPLICATION | Репликация одной секции одной таблицы |
SYNC_PARTITIONED_TABLE | Репликация секционированной таблицы (целиком, под капотом вызывает ENTITY_PARTITION_REPLICATION) |
repl_schemas
Информация о схемах
INSERT INTO repl_settings.repl_schemas (schema_group, src_schema, dst_schema, schema_name)
VALUES('группа_схем', 'схема_источник_fdw', 'схема_приемник', 'схема_источник_oracle');
Здесь:
схема_источник_fdw - это наименование схемы Oracle на стороне Postgres (которое импортировано через
IMPORT FOREIGN SCHEMA
схема_источник_oracle - оригинальное наименование схемы (на стороне Oracle)
repl_statement_sets
Одна из самых главных таблиц - наборы SQL операторов. Здесь мы задаем "набор" SQL-операторов, которые будут вызываться для сущности в разных ситуациях, их тоже рассмотрим:
Поле | Описание | Пример заполнения |
set_mnemo | Мнемоническое имя набора | default_partition_1_value_sync (синхронизация по умолчанию сущности, которая партиционирована по 1 значению) |
src_entity | Сущность источник | {src_SG_entity} (Здесь используется частичная подстановка SG - наименование группы схем) |
dst_entity | Сущность приемник | {dst_SG_entity} |
delete_stmt | Оператор удаления | {default} |
insert_stmt | Оператор вставки | {default} |
count_stmt | Оператор подсчета числа строк на приемнике (после вставки) | {default} |
count_src_stmt | Оператор подсчета числа строк на источнике | {default} |
repl_type | Тип репликации | PARTITION_SINGLE_VALUE |
do_truncate | Использовать truncate вместо delete (пока не работает) | true |
max_key_value_stmt | Оператор вычисления максимального значения ключевого поля | SQL оператор |
count_partition_stmt | Оператор вычисления числа записей в разрезе поля партиционирования | SQL оператор |
create_partition_stmt | Оператор создания секции | SQL оператор |
Как мы видим, в данной таблице описывается практически вся логика работы с сущностью, позднее мы рассмотрим ее работу на примере. Здесь также стоит обратить внимание на "подстановки" - т.е. все то, что записано в фигурных скобках, это одна из ключевых особенностей мигратора. Но пока опишем все остальные таблицы.
custom_statements
Таблица содержит любые операторы, которые могут быть впоследствии в любых подстановках.
repl_fk
Перечень внешних ключей. Таблица используется при включении / выключении внешних ключей (путем удаления / создания)
INSERT INTO repl_settings.repl_fk (num, fk_create_text, fk_enabled)
VALUES(1, 'ALTER TABLE schema.table ADD CONSTRAINT fk_name FOREIGN KEY (fk_field) REFERENCES ref_schema.ref_table(ref_field);', true);
Схема repl_meta
Мы проговорили про настройку миграции (для конкретной системы), сейчас рассмотрим схему, которая содержит метаданные - здесь только функции и процедуры.
p_do_sync_cycle
Процедура, которое осуществляет полный цикл миграции. Сейчас требует пересмотра и давно не использовалась (так как мигратор сущесвтенно дорабатывается с каждой мигрируемой системой), она делает действия:
Удаляет внешние ключи
Запускает полный цикл миграции для "группы схем"
Заново создает внешние ключи
Устанавливает значения последовательностей
p_do_full_sync
Спускаемся чуть ниже - цикл полной миграции. Он пробегает по всем сущностям и для каждой запускает миграцию, согласно ее типу миграции. В настоящее время тоже требует пересмотра, так как многое изменилось "под капотом".
r_fill_partition_counts
С этой функции уже начинается "самое мясо". Она сервисная - служит для подсчета количества записей по секциям в разрезе ключа партиционирования
r_sync
Функция служит для запуска репликации одной таблицы (при полной репликации) или одной секции партиционированной таблицы.
p_do_sync_partitioned_table - работа с секционированными таблицами
Поддерживаются 2 типа секционированных таблиц:
Секционирование по одному значению (порция данных)
Секционирвование по временному диапазону (месяц)
По потребности - будем реализовывать и другие типы.
Для секционированных таблиц логика немного сложнее, так как в них реализованы действия:
Подсчет числа строк по каждому ключу секционирования (в режимах "если еще не считали" или "полный пересчет")
Сравнивание числа строк на источнике и по протоколу синхронизации в разрезе каждой секции
Таким образом у нас появляется возможность - синхронизировать только те секции, которые мы еще не синхронизировали или те, в которых число строк изменилось
Под капотом также вызываем
r_sync
.
В последствии хочется расширить логику секционированых таблиц:
Запуск частями (% соотношение, например "одна пятая от общего объема данных")
Ограничения секций "по датам", например - данные не старше чем месяц назад.
Схема repl_log
В этой схеме содержатся протоколы работы. Самая главная таблица - repl_log_data
.
Пример данных в таблице:

Пару слов про {подстановки}
Подстановки несут одну из самых важных (с точки зрения удобства) функций - возможности переиспользования SQL операторов. Действительно, если мы в набор операторов в Insert
запишем "insert into {table_name}
", а затем свяжем этот набор операторов с определенной сущностью и научим вместо {table_name}
подставлять имя сущности - бинго, мы этот набор операторов можем использовать несколько раз и именно так и происходит, мы создаем:
Набор для простых небольших таблиц (справочников)
Набор для секционированных таблиц
Набор для больших таблиц, которые будем мигрировать "по кусочкам" (но которые не секционированы)
Набор для миграции таблиц по "стратегии merge" (или soft insert).
За разыменование подстановок отвечает функция repl_meta.r_get_substitution
Запуск мигратора
Пожалуй хватит теории, перейдем к практике, покажу запуск репликации "обычной" таблицы и секционированной. Начнем с "посложней" - партиционированная.
Нюанс про секции
На самом деле таблица (фактически) не обязана быть секционированной "по честному", можно синхронизировать обычную таблицу, "порезав ее на кусочки" (например - по месяцу). Для этого нужно просто не заполнять оператор создания секции.
Порционная загрузка
Итак, запускаем:
call repl.p_do_sync_partitioned_table(999, 'schm_grp', 'TRGT_SCHM.TRGT_TABLE', true);
И наблюдаем за выводом RAISE NOTICE
(он будет максимально подробный, так как установлен высокий уровень ведения логов - 999.
Здесь дополнительные комментарии (мои) отмечены с символов --.
Результат (впечатлительным лучше не смотреть)
-- Сначала инициируется подсчет количеств строк по ключу секционирования
2025.10.09 12:31:04 [begin] repl_meta.r_fill_partition_counts('999', 'schm_grp', 'TRGT_SCHM.TRGT_TABLE')
--В качестве SQL оператора у нас используется "custom" оператор с именем src_count_by_month_v_m,
разыменуем его:
before substitution: {src_count_by_month_v_m}
-- получили "реальный оператор", но он все еще содержит подстановки (partition_field_name, ...)
custom statement used: select DATE_TRUNC('month', c.{partition_field_name}) as partition_field_value,
count(*) as rec_count, null as add_info from src_ora_schema.v_m_{entity_short} c
group by DATE_TRUNC('month', c.{partition_field_name})
order by DATE_TRUNC('month', c.{partition_field_name})
-- После выполнения всех подстановок - получим итоговоый оператор
after substitution: select DATE_TRUNC('month', c.valid_from) as partition_field_value,
count(*) as rec_count, null as add_info from src_ora_schema.v_m_SRC_TABLE c
group by DATE_TRUNC('month', c.valid_from)
order by DATE_TRUNC('month', c.valid_from)
-- Выполним его, результат запишем в таблицу repl_data.repl_partition_rec_count
-- И обновим запись протокола, об этом действии:
update repl_log.repl_log_data
set status = 'DONE',
d_load_end = '2025-10-09 12:31:09.362861+03'::timestamp,
rec_count = 971312::int8,
mod_count = 0::int8,
del_count = 0::int8,
max_key_value = 0::int8,
repl_type = 'PARTITION_MONTH'::repl_meta.t_repl_type,
partition_key_value = 'null'
where id = 13279
2025.10.09 12:31:09 [end] repl_meta.r_fill_partition_counts('999', 'schm_grp', 'TRGT_SC
HM.TRGT_TABLE')
-- конец процесса
Здесь немного передохнем и пойдем дальше - обеспечим саму миграцию секций:
-- Поехали обрабатывать секции друг за другом
2025.10.09 12:31:09 Process partition: num = 1 , partition field value = 2025-07-01 00:00:00
-- Вызываем функцию синхронизации
2025.10.09 12:31:09 [begin] repl_meta.r_sync('999', 'schm_grp', 'TRGT_SCHM.TRGT_TABLE', '2025-07-01 00:00:00')
-- Удаляем данные если они были (на всякий случай)
before substitution: delete from {dst_entity} where {partition_field_name} >= cast('{partition_field_value}' as date) and {partition_field_name} < cast('{partition_field_value}' as date) + interval '1 month';
after substitution: delete from TRGT_SCHM.TRGT_TABLE where valid_from >= cast('2025-07-01 00:00:00' as date) and valid_from < cast('2025-07-01 00:00:00' as date) + interval '1 month';
before substitution: {nms_insert_partition_v_m}
-- Получаем Insert оператор
-- (обратите внимание на автоматический cast полей - это в дальнейшем поможет нам с json)
custom statement used: insert into {dst_entity} ({field_list}) select {field_list_casted}
from src_ora_schema.v_m_{entity_short} {partition_where};
after substitution: insert into TRGT_SCHM.TRGT_TABLE (valid_from, sw_id, tgno, opmode, connected_lines, semi_blocked_lines, trans_blocked_lines, ccu_sum_ofl_loss, cc_i, cc_o, ccs_with_answer_i, ccs_with_answer_o, tv_i, tv_o, avialable, file_id, status) select cast(valid_from as timestamp), cast(sw_id as int8), cast(tgno as varchar), cast(opmode as bpchar), cast(connected_lines as int8), cast(semi_blocked_lines as int8), cast(trans_blocked_lines as int8), cast(ccu_sum_ofl_loss as int8), cast(cc_i as int8), cast(cc_o as int8), cast(ccs_with_answer_i as int8), cast(ccs_with_answer_o as int8), cast(tv_i as int8), cast(tv_o as int8), cast(avialable as int8), cast(file_id as int8), cast(status as int8)
from src_ora_schema.v_m_SRC_TABLE where valid_from >= cast('2025-07-01 00:00:00' as date) and valid_from < cast('2025-07-01 00:00:00' as date) + interval '1 month';
-- Самое время посчитать количество строк (на приемнике) - получим оператор
before substitution: select count(*) from {dst_entity} where {partition_field_name} >= cast('{partition_field_value}' as date) and {partition_field_name} < cast('{partition_field_value}' as date) + interval '1 month';
after substitution: select count(*) from TRGT_SCHM.TRGT_TABLE where valid_from >= cast('2025-07-01 00:00:00' as date) and valid_from < cast('2025-07-01 00:00:00' as date) + interval '1 month';
before substitution: {nms_cnt_src_partition_v_m}
-- И на источнике - получим оператор
custom statement used: select count(*) from src_ora_schema.v_m_{entity_short} {partition_where};
after substitution: select count(*) from src_ora_schema.v_m_SRC_TABLE where valid_from >= cast('2025-07-01 00:00:00' as date) and valid_from < cast('2025-07-01 00:00:00' as date) + interval '1 month';
-- Получим оператор создания секции таблицы (это самописная, наша функция)
before substitution: {nms_create_partition_by_month}
custom statement used: select * from partn.f_add_partition_month('{dst_nms}', '{entity_short}', cast('{partition_field_value}' as date));
after substitution: select * from partn.f_add_partition_month('schm_grp', 'TRGT_TABLE', cast('2025-07-01 00:00:00' as date));
-- Вызовем функцию синхронизации одной секции
2025.10.09 12:31:10 [begin] repl_meta.r_sync_partition_single_value('999', '88', 'PARTITION_MONTH', '13280', 'delete from TRGT_SCHM.TRGT_TABLE where valid_from >= cast(''2025-07-01 00:00:00'' as date) and valid_from < cast(''2025-07-01 00:00:00'' as date) + interval ''1 month'';', 'insert into TRGT_SCHM.TRGT_TABLE (valid_from, sw_id, tgno, opmode, connected_lines, semi_blocked_lines, trans_blocked_lines, ccu_sum_ofl_loss, cc_i, cc_o, ccs_with_answer_i, ccs_with_answer_o, tv_i, tv_o, avialable, file_id, status) select cast(valid_from as timestamp), cast(sw_id as int8), cast(tgno as varchar), cast(opmode as bpchar), cast(connected_lines as int8), cast(semi_blocked_lines as int8), cast(trans_blocked_lines as int8), cast(ccu_sum_ofl_loss as int8), cast(cc_i as int8), cast(cc_o as int8), cast(ccs_with_answer_i as int8), cast(ccs_with_answer_o as int8), cast(tv_i as int8), cast(tv_o as int8), cast(avialable as int8), cast(file_id as int8), cast(status as int8)
from src_ora_schema.v_m_SRC_TABLE where valid_from >= cast(''2025-07-01 00:00:00'' as date) and valid_from < cast(''2025-07-01 00:00:00'' as date) + interval ''1 month'';', 'select count(*) from TRGT_SCHM.TRGT_TABLE where valid_from >= cast(''2025-07-01 00:00:00'' as date) and valid_from < cast(''2025-07-01 00:00:00'' as date) + interval ''1 month'';', 'select count(*) from src_ora_schema.v_m_SRC_TABLE where valid_from >= cast(''2025-07-01 00:00:00'' as date) and valid_from < cast(''2025-07-01 00:00:00'' as date) + interval ''1 month'';', '')
-- Собственно здесь начались действия, согласно полученных операторов выше
-- Создаем секцию
2025.10.09 12:31:10 Creating partition
2025.10.09 12:31:10 [l_create_partition_stmt] select * from partn.f_add_partition_month('schm_grp', 'TRGT_TABLE', cast('2025-07-01 00:00:00' as date));
successfully created
-- Считаем строки
2025.10.09 12:31:10 Source rows counting
2025.10.09 12:31:10 [l_count_src_stmt] select count(*) from src_ora_schema.v_m_SRC_TABLE where valid_from >= cast('2025-07-01 00:00:00' as date) and valid_from < cast('2025-07-01 00:00:00' as date) + interval '1 month';
-- Очищаем данные, если они были ранее
2025.10.09 12:31:10 Data clearance
2025.10.09 12:31:10 [l_delete_stmt] delete from TRGT_SCHM.TRGT_TABLE where valid_from >= cast('2025-07-01 00:00:00' as date) and valid_from < cast('2025-07-01 00:00:00' as date) + interval '1 month';
-- Вставляем данные
2025.10.09 12:31:10 Insert data
2025.10.09 12:31:10 insert into TRGT_SCHM.TRGT_TABLE (valid_from, sw_id, tgno, opmode, connected_lines, semi_blocked_lines, trans_blocked_lines, ccu_sum_ofl_loss, cc_i, cc_o, ccs_with_answer_i, ccs_with_answer_o, tv_i, tv_o, avialable, file_id, status) select cast(valid_from as timestamp), cast(sw_id as int8), cast(tgno as varchar), cast(opmode as bpchar), cast(connected_lines as int8), cast(semi_blocked_lines as int8), cast(trans_blocked_lines as int8), cast(ccu_sum_ofl_loss as int8), cast(cc_i as int8), cast(cc_o as int8), cast(ccs_with_answer_i as int8), cast(ccs_with_answer_o as int8), cast(tv_i as int8), cast(tv_o as int8), cast(avialable as int8), cast(file_id as int8), cast(status as int8)
from src_ora_schema.v_m_SRC_TABLE where valid_from >= cast('2025-07-01 00:00:00' as date) and valid_from < cast('2025-07-01 00:00:00' as date) + interval '1 month';
-- Снова считаем строки -после вставки
2025.10.09 12:31:12 Row counting...
2025.10.09 12:31:12 select count(*) from TRGT_SCHM.TRGT_TABLE where valid_from >= cast('2025-07-01 00:00:00' as date) and valid_from < cast('2025-07-01 00:00:00' as date) + interval '1 month';
2025.10.09 12:31:12 Rows counted: 117368
-- Пишем итоговую запись в протокол - по синхронизированной секции
update repl_log.repl_log_data
set status = 'DONE',
d_load_end = '2025-10-09 12:31:12.855629+03'::timestamp,
rec_count = 117368::int8,
mod_count = 0::int8,
del_count = 0::int8,
max_key_value = null::int8,
repl_type = 'PARTITION_MONTH'::repl_meta.t_repl_type,
partition_key_value = '2025-07-01 00:00:00'
where id = 13280
2025.10.09 12:31:12 [end] repl_meta.r_sync_partition_single_value('999', '88', 'PARTITION_MONTH', '13280', 'delete from TRGT_SCHM.TRGT_TABLE where valid_from >= cast(''2025-07-01 00:00:00'' as date) and valid_from < cast(''2025-07-01 00:00:00'' as date) + interval ''1 month'';', 'insert into TRGT_SCHM.TRGT_TABLE (valid_from, sw_id, tgno, opmode, connected_lines, semi_blocked_lines, trans_blocked_lines, ccu_sum_ofl_loss, cc_i, cc_o, ccs_with_answer_i, ccs_with_answer_o, tv_i, tv_o, avialable, file_id, status) select cast(valid_from as timestamp), cast(sw_id as int8), cast(tgno as varchar), cast(opmode as bpchar), cast(connected_lines as int8), cast(semi_blocked_lines as int8), cast(trans_blocked_lines as int8), cast(ccu_sum_ofl_loss as int8), cast(cc_i as int8), cast(cc_o as int8), cast(ccs_with_answer_i as int8), cast(ccs_with_answer_o as int8), cast(tv_i as int8), cast(tv_o as int8), cast(avialable as int8), cast(file_id as int8), cast(status as int8)
from src_ora_schema.v_m_SRC_TABLE where valid_from >= cast(''2025-07-01 00:00:00'' as date) and valid_from < cast(''2025-07-01 00:00:00'' as date) + interval ''1 month'';', 'select count(*) from TRGT_SCHM.TRGT_TABLE where valid_from >= cast(''2025-07-01 00:00:00'' as date) and valid_from < cast(''2025-07-01 00:00:00'' as date) + interval ''1 month'';', 'select count(*) from src_ora_schema.v_m_SRC_TABLE where valid_from >= cast(''2025-07-01 00:00:00'' as date) and valid_from < cast(''2025-07-01 00:00:00'' as date) + interval ''1 month'';', '')
.....
--И пишем итоговую запись в протокол - по всему процессу синхронизации
update repl_log.repl_log_data
set status = 'DONE',
d_load_end = '2025-10-09 12:31:33.283642+03'::timestamp,
rec_count = 971312::int8,
mod_count = 0::int8,
del_count = 0::int8,
max_key_value = null::int8,
repl_type = 'PARTITION_MONTH'::repl_meta.t_repl_type,
partition_key_value = 'null'
where id = 13278
Как мы видим - действий производится очень много. При этом анализируется протокол синхронизации, выполненный ранее и успешно смигрированные порции (секции) не будут мигрироваться, если разница в количестве строк не изменилась.
Небольшая статистика
Фактически, если удается (а обычно удается) разделить миграцию данных на некоторое количество "стратегий" (полностью, кусочками, по датам, по идентификаторам), то у вас получится несколько наборов операторов (statement_sets), которые вы будете использовать для всех сущностей.
Для справки - на текущей системе у нас около двух сотен таблиц, примерно 2/3 их них секционированные и с большим объемом данных.
Таблица statements_sets
в итоге содержит 10 записей ("ширина" таблицы = 16 полей).
Таким образом получилось весьма компактно и сравнительно несложно в настройке.
Въедливый читатель спросит - как же так?! а если данные менялись, вы же не можете быть уверены?! Не можем, но опять таки, повторюсь - знайте ваши данные. Возможны варианты:
Данные записываются в секции и никогда не изменяются (наш случай как раз такой - мы ежедневно собираем конфигурацию с оборудования и уже не меняем ее (следующая порция идет новым массивом данных, формируя "новый срез")
Данные могут меняться, но редко и есть понятие "закрытого" / "открытого" периода - можно ориентироваться на эти признаки
Другие варианты
Миграция обычной таблицы
А тут все проще, но давайте тоже посмотрим:
select * from repl_meta.r_sync('999', 'schm_grp', 'TRGT_SCHM.TRGT_TABLE', NULL);
Результат:
Результат
-- Запустились
2025.10.13 16:50:06 [begin] repl_meta.r_sync('999', 'schm_grp', 'TRGT_SCHM.TRGT_TABLE', NULL)
-- Подготовим оператор delete
before substitution: delete from {dst_entity};
after substitution: delete from TRGT_SCHM.TRGT_TABLE;
-- Подготовим оператор insert
before substitution: {sw_insert_v_m}
custom statement used: insert into {dst_entity} ({field_list}) select {field_list_casted}
from src_ora_schema.v_m_{entity_short};
after substitution: insert into TRGT_SCHM.TRGT_TABLE (id, name, remm) select cast(id as varchar), cast(name as varchar), cast(remm as varchar)
from src_ora_schema.v_m_SRC_TABLE;
-- Подготовим операторы подсчета строк
before substitution: select count(*) from {dst_entity};
after substitution: select count(*) from TRGT_SCHM.TRGT_TABLE;
before substitution: {sw_cnt_src_v_m}
custom statement used: select count(*) from src_ora_schema.v_m_{entity_short};
after substitution: select count(*) from src_ora_schema.v_m_SRC_TABLE;
-- Вызов функции полной синхронизации таблицы
2025.10.13 16:50:07 [begin] repl_meta.r_full_sync('999', '125', 'FULL', '16083', 'delete from TRGT_SCHM.TRGT_TABLE;', 'insert into TRGT_SCHM.TRGT_TABLE (id, name, remm) select cast(id as varchar), cast(name as varchar), cast(remm as varchar)
from src_ora_schema.v_m_SRC_TABLE;', 'select count(*) from TRGT_SCHM.TRGT_TABLE;', 'select count(*) from src_ora_schema.v_m_SRC_TABLE;', '')
2025.10.13 16:50:07 Source rows counting
2025.10.13 16:50:07 [l_count_src_stmt] select count(*) from src_ora_schema.v_m_SRC_TABLE;
2025.10.13 16:50:08 Data clearance
2025.10.13 16:50:08 [l_delete_stmt] delete from TRGT_SCHM.TRGT_TABLE;
2025.10.13 16:50:08 Insert data
2025.10.13 16:50:08 insert into TRGT_SCHM.TRGT_TABLE (id, name, remm) select cast(id as varchar), cast(name as varchar), cast(remm as varchar)
from src_ora_schema.v_m_SRC_TABLE;
-- Проверка числа строк
2025.10.13 16:50:08 Row counting...
2025.10.13 16:50:08 select count(*) from TRGT_SCHM.TRGT_TABLE;
2025.10.13 16:50:08 Rows counted: 10
-- Финализация записи в протоколе
update repl_log.repl_log_data
set status = 'DONE',
d_load_end = '2025-10-13 16:50:08.556928+03'::timestamp,
rec_count = 10::int8,
mod_count = 0::int8,
del_count = 0::int8,
max_key_value = null::int8,
repl_type = 'FULL'::repl_meta.t_repl_type,
partition_key_value = 'null'
where id = 16083
2025.10.13 16:50:08 [end] repl_meta.r_full_sync('999', '125', 'FULL', '16083', 'delete from TRGT_SCHM.TRGT_TABLE;', 'insert into TRGT_SCHM.TRGT_TABLE (id, name, remm) select cast(id as varchar), cast(name as varchar), cast(remm as varchar)
from src_ora_schema.v_m_SRC_TABLE;', 'select count(*) from TRGT_SCHM.TRGT_TABLE;', 'select count(*) from src_ora_schema.v_m_SRC_TABLE;', '')
Дополнительная функциональность, которая, однако также важна
Здесь обсудим дополнительные шаги, без которых миграция будет неполной, выше (в процедуре полного цикла) мы уже немного касались их
Отключение / включение внешних ключей
Идея здесь очень проста - в таблице repl_settings.repl_fk
мы ведем список внешних ключей в виде оператров:
ALTER TABLE sss.ttt ADD CONSTRAINT ccc_fk01 FOREIGN KEY (fff) REFERENCES ssss.tttt(ffff);
Включение ключей - просто выполнение этих операций друг задругом. Для "выключения" - меняем через регулярные выражения ADD CONSTRAINT
на DROP CONSTRAINT
Синхронизация последовательностей
Последовательности (сиквенсов) - очень важны, так как ключи в таблицах должны "подрастать" с правильных значений.
Идея не менее проста - с обоих сторон (Oracle и Postgres) мы можем получить (через словарь данных - information_schema.sequences
) значения сиквенсов, нам останется только сгенерировать операторы установки зачений. За это у нас отвечает функция repl_meta.r_set_seq_values()
.
Создание секций
Наверное все это делали так или иначе, но на всякий случай опубликую - мы это делаем
примерно так:
CREATE OR REPLACE FUNCTION partn.f_add_partition_month(p_schema_name text, p_table_name text, p_month date)
RETURNS boolean
LANGUAGE plpgsql
AS $function$
declare
l_cnt integer;
l_partn_table_name constant text :=
lower (
format (
'%1$s__%2$s_MONTH_%3$s',
p_schema_name,
p_table_name,
to_char(p_month, 'YYYYMM')
)
);
l_sql_create constant text :=
format (
'create table partn.%1$s partition of %2$s.%3$s for values from (%4$L) to (%5$L)',
l_partn_table_name,
p_schema_name,
p_table_name,
to_char(p_month, 'YYYY-MM-01'),
to_char(to_char(p_month, 'YYYY-MM-01')::date + interval '1 month', 'YYYY-MM-01')
);
begin
select count(1)
into l_cnt
from information_schema.tables t
where table_schema = 'partn'
and table_name = l_partn_table_name;
if l_cnt = 1 then
raise notice '%, %', l_partn_table_name, 'already exists';
end if;
if l_cnt = 0 then
begin
execute l_sql_create;
-- Это очень важно! так как выполнение идет из нескольких потоков и если 2 потока запросили создание партиции,
-- то создаст ее только 1 поток, а второй должен обработать "уже есть партиция"
raise notice '%', 'successfully created';
exception when sqlstate '42P07' then
null;
end;
end if;
return true;
end;
$function$
;
COMMENT ON FUNCTION partn.f_add_partition_month(text, text, date) IS 'Создание партиций под загрузки по дате - размером в 1 месяц';
История последней миграции
Настало время вспомнить мысль из предыдущих глав:
... нужна возможность настройки процесса миграции
Третья мысль - мне нужна возможность настройки процесса миграции (а не полные копии таблиц), которую я назвал бы правилами миграции. Чтобы:
Подвигать таблицы между схемами (т.е. разное название приемника и потребителя)
Где-то преобразовать типы данных, иногда не автоматически, а по своей логике
И подсветить несколько интересных моментов.
Вы думаете тема - хранить значения json в столбцах таблицы БД - свежая, модная и молодежная? Как бы не так! Отцы-основатели не парились и 20 лет назад - делая такое:
Поле "массив строк":
Описывается через тип:
create or replace TYPE MY_ARRAY_TYPE AS VARRAY(20) OF VARCHAR2(20);
Поле-массив KV-значений
Описывается так:
create or replace TYPE MY_KV_TYPE AS OBJECT (P_NAME VARCHAR2(10), P_VALUE VARCHAR2(40));
А затем так:
create or replace TYPE MY_KV_ARRAY AS VARRAY(20) OF MY_KV_TYPE;
Мы подумали и приняли командное решение - ну сейчас-то пришло время и то и другое хранить в json! А как мигрировать будем? Но у нас же есть мигратор!
И все получилось:
Создали представление на "базовую таблицу"
Настроили миграцию, чтобы она забирала "из представления". Если присмотритесь в коде подстановок выше - там как раз видна приставка "
v_m_
" - это они и есть.Под капотом у представления примерное такое:
CREATE OR REPLACE VIEW SSS.VVV AS
select
f1, f2, f3, ...,
(select '{' || listagg(ww2.column_value, ',') within group (order by rownum) || '}' as my_array_json
from table(w.my_array_typed_field) ww2) as my_array_json,
(select '[' || listagg('{"p_name": "' || ww.p_name || '", "p_value": "' || replace(ww.p_value, '"', '\"') || '"}', ',') within group (order by rownum) || ']' as my_kv_json_array
from table(w.my_kv_type) ww) as my_kv_json_array
from base_table;
Вместо заключения
Сейчас мигратор - наш локальный внутренний инструмент, который постоянно находится в "полуразобранном состоянии", дорабатывается (зачастую существенно) под каждую новую систему. Несмотря на это - у него "за плечами" уже больше 3 систем и в целом - все получается.
Буду рад, если в комментариях немного подискутируем про подходы и инструменты, которые используете вы, критике.
Кому интересно попробовать пройти такой же дорогой - есть планы опубликовать код мигратора в open source, также приходите в комментарии.