
Необходимость переноса данных из одной среды в другую — задача, с которой разработчики сталкиваются достаточно часто. Например, для отправки таблиц из прода в среды для тестирования. Вместе с тем, такая «перезаливка» таблиц нередко превращается в настоящий квест, по ходу которого нужно не только гарантировать сохранность данных, но и исключить ошибки, связанные с человеческим фактором. Поэтому лучшей практикой является автоматизация переноса.
Меня зовут Евгений Грибков. Я ведущий программист в центре технологий VK. В этой статье мы рассмотрим одно из возможных решений создания скрипта перезаливки заданных таблиц из одной БД в другую на примере MS SQL.
Типовой алгоритм перезаливки таблиц в базе данных
Обычно в ситуациях, когда нужно перезалить данные в таблицах БД из одной среды в другую, применяют шаблонный алгоритм восстановления базы:
делают резервную копию с прода;
восстанавливают бэкап на отдельной изолированной, промежуточной среде;
производят обезличивание или изменение персональных данных;
создают бэкап с полученной базы данных;
восстанавливают резервную копию в нужные среды (например, для разработки и тестирования).
Обычно весь описанный алгоритм автоматизирован и работает по расписанию (например, раз в сутки ночью или раз в неделю на выходных).
Но у такого подхода есть существенный недостаток: каждый раз восстанавливать всю базу данных во все среды — весьма длительный и очень дорогой в плане занимаемого места процесс. Поэтому в алгоритме нередко предусматривают чистки как исторических данных до определенного момента времени (например, всё, что старее одного календарного года), так и по определённым критериям. Таким образом достигается уменьшение объема БД в десятки, а то и в сотни раз.
Также надо учитывать, что в средах может быть несколько БД, в которые надо периодически догружать данные. Причем желательно, чтобы была возможность делать это в моменты, когда БД не используется активно — например, раз в неделю ночью или каждую ночь.
Соответственно, в таких кейсах важна автоматизация, которая дает возможность перезаливать таблицы БД из одной среды в другую без ручного контроля и глобального вмешательства разработчиков.
Вариант реализации скрипта
Теперь перейдем от теории к практике. Рассмотрим один из вариантов создания скрипта перезаливки заданных таблиц из одной БД в другую на примере MS SQL. При этом сразу оговоримся и примем условие, что обе БД на одном экземпляре СУБД — то есть, БД‑источник уже восстановлен в той же СУБД, в которой находится целевая БД.
Алгоритм работы такого скрипта будет следующим:
Отключить все ограничения.
Отключить все триггеры.
Для заданных таблиц сохранить все внешние ключи, после чего удалить их.
Произвести полную очистку заданных таблиц через команду
TRUNCATEс последующим их заполнением данными.Удалить битые данные.
Обновить статистики перезалитых выше таблиц.
Включить триггеры.
Восстановить внешние ключи.
Включить и перепроверить все ограничения.
Теперь приступим к реализации.
Для начала определим все необходимые переменные и временные таблицы
SET NOCOUNT OFF; DECLARE @DB VARCHAR(250) = QUOTENAME(DB_NAME()), @DBMaster VARCHAR(255) = 'БД-мастер', @ERROR VARCHAR(MAX); DECLARE @HistoryLimited bit = 1, @table_name nvarchar(255), @is_identity int = 0, @stm nvarchar(max) = '', @cols nvarchar(max) = '', @IsNOTInsert bit, @schema_name nvarchar(255), @col_name_identity nvarchar(255), @referencing_object nvarchar(255), @referenced_object nvarchar(255), @constraint_name nvarchar(255), @referencing_columns nvarchar(max), @referenced_columns nvarchar(max), @rules nvarchar(max), @key_cols nvarchar(max), @StartMoment DATETIME2, @FinishMoment DATETIME2, @delete_referential_action INT, @update_referential_action INT, @max_row_insert INT = 100000, @isClearTableFKs BIT = 1, @RowCount BIGINT = 1, @WhileDelCount INT = 0; ; DECLARE @cnt TABLE (cnt BIGINT NOT NULL); DROP TABLE IF EXISTS #tbl_res; CREATE TABLE #tbl_res ( SchName NVARCHAR(255) NOT NULL, TblName NVARCHAR(255) NOT NULL, StartMoment DATETIME2 NOT NULL, FinishMoment DATETIME2 NOT NULL, Cnt BIGINT NOT NULL, ErrorMsg NVARCHAR(MAX) NULL );
Здесь определены переменные, которые будут использоваться далее в скрипте, а также табличная переменная, в которой будет записан итог работы перезаливки данных с таймингом.
Далее отключаем все ограничения в БД. Это можно сделать с помощью следующей команды: EXEC sys.sp_msforeachtable "ALTER TABLE ? NOCHECK CONSTRAINT ALL"
Отключаем все триггеры БД
DECLARE r_cursor_trigg_off CURSOR LOCAL FAST_FORWARD READ_ONLY FOR SELECT FORMATMESSAGE('ALTER TABLE [%s].[%s] DISABLE TRIGGER [%s];' + CHAR(13) , SCHEMA_NAME(b.[schema_id]) , OBJECT_NAME(t.parent_id) , t.[Name]) AS stm FROM sys.triggers t LEFT JOIN sys.tables b ON b.object_id = t.parent_id WHERE t.is_disabled = 0 AND t.type_desc = 'SQL_TRIGGER' AND OBJECT_NAME(t.parent_id) IS NOT NULL ORDER BY SCHEMA_NAME(b.[schema_id]) ASC, OBJECT_NAME(t.parent_id) ASC; OPEN r_cursor_trigg_off; FETCH NEXT FROM r_cursor_trigg_off INTO @stm; WHILE @@FETCH_STATUS = 0 BEGIN PRINT @stm; BEGIN TRY BEGIN TRANSACTION; EXEC sys.sp_executesql @stmt = @stm; COMMIT; END TRY BEGIN CATCH SET @ERROR = ERROR_MESSAGE(); PRINT @ERROR; IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION END CATCH FETCH NEXT FROM r_cursor_trigg_off INTO @stm; END CLOSE r_cursor_trigg_off; DEALLOCATE r_cursor_trigg_off; SET @stm = ''; SELECT @stm += FORMATMESSAGE('DISABLE TRIGGER [%s] ON DATABASE;' + CHAR(13), t.[Name]) FROM sys.triggers t LEFT JOIN sys.tables b ON b.[object_id] = t.parent_id WHERE t.is_disabled = 0 AND t.[type_desc] = 'SQL_TRIGGER' AND OBJECT_NAME(t.parent_id) IS NULL; PRINT @stm; BEGIN TRY BEGIN TRANSACTION; EXEC sys.sp_executesql @stmt = @stm; COMMIT; END TRY BEGIN CATCH SET @ERROR = ERROR_MESSAGE(); PRINT @ERROR; IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION END CATCH
Далее собираем метаданные по таблицам, с которыми будем работать
DROP TABLE IF EXISTS #tbls; CREATE TABLE #tbls ( [name] NVARCHAR(255) NOT NULL, sch_name NVARCHAR(255) NOT NULL, IsNOTInsert BIT NOT NULL ); INSERT INTO #tbls ( [name], sch_name, IsNOTInsert ) SELECT t.[name], SCHEMA_NAME(t.[schema_id]) AS sch_name, --задается правило, по которому определяем --нужно ли после очистки наполнять данными таблицу или нет --по умолчанию нужно (0-да, 1-нет) 0 AS IsNOTInsert FROM sys.tables AS t --в фильтре задаем какие таблицы брать в расчет --(в нашем случае какие не брать в расчёт) WHERE t.[name] NOT LIKE 'unused%' AND t.[name] NOT LIKE 'removed%' AND t.[name] NOT LIKE 'migrated%' AND t.[name] NOT LIKE 'migration%' AND t.[name] NOT LIKE 'sysdiag%' AND t.[name] NOT LIKE 'test%' AND t.[name] NOT LIKE 'tmp%' AND t.[name] NOT LIKE '%_cache' AND t.[name] NOT IN ('FKs');
Теперь соберем все внешние ключи полученных таблиц, сохраним их в таблице dbo.FKs, затем — удалим
IF NOT EXISTS (SELECT 1 FROM sys.tables AS t WHERE t.[name]= 'FKs' AND t.[schema_id] = SCHEMA_ID('dbo')) BEGIN CREATE TABLE dbo.FKs ( referencing_object NVARCHAR(255) NOT NULL, constraint_column_id INT NOT NULL, referencing_column_name NVARCHAR(255) NOT NULL, referenced_object NVARCHAR(255) NOT NULL, referenced_column_name NVARCHAR(255) NOT NULL, constraint_name NVARCHAR(255) NOT NULL, delete_referential_action INT NOT NULL, update_referential_action INT NOT NULL ); END ELSE IF (@isClearTableFKs = 1) BEGIN TRUNCATE TABLE dbo.FKs; END INSERT INTO dbo.FKs ( referencing_object, constraint_column_id, referencing_column_name, referenced_object, referenced_column_name, constraint_name, delete_referential_action, update_referential_action ) SELECT CONCAT('[', SCHEMA_NAME(P.[schema_id]), '].[' , OBJECT_NAME(FK.parent_object_id), ']') AS referencing_object, FK.constraint_column_id, CONCAT('[' , COL_NAME(FK.parent_object_id, FK.parent_column_id) , ']') AS referencing_column_name, CONCAT('[' , SCHEMA_NAME(R.[schema_id]), '].[' , OBJECT_NAME(FK.referenced_object_id) , ']') AS referenced_object, CONCAT('[' , COL_NAME(FK.referenced_object_id, FK.referenced_column_id) , ']') AS referenced_column_name, CONCAT('[' , OBJECT_NAME(FK.constraint_object_id) , ']') AS constraint_name, FKK.delete_referential_action, FKK.update_referential_action FROM sys.foreign_key_columns AS FK INNER JOIN sys.foreign_keys AS FKK ON FKK.[object_id] = FK.constraint_object_id INNER JOIN sys.tables AS P ON P.[object_id] = FK.parent_object_id INNER JOIN sys.tables AS R ON R.[object_id] = FK.referenced_object_id WHERE NOT EXISTS (SELECT 1 FROM dbo.FKs AS t0 WHERE t0.constraint_name = CONCAT('[' , OBJECT_NAME(FK.constraint_object_id), ']')); DELETE FROM trg FROM dbo.FKs AS trg WHERE NOT EXISTS ( SELECT 1 FROM #tbls AS src WHERE trg.referencing_object = CONCAT('[' , src.sch_name, '].[', src.[name], ']') OR trg.referenced_object = CONCAT('[' , src.sch_name, '].[', src.[name], ']') ) DECLARE r_cursor_fk_drop CURSOR LOCAL FAST_FORWARD READ_ONLY FOR SELECT t.referencing_object, t.referenced_object, t.constraint_name FROM dbo.FKs AS t WHERE EXISTS (SELECT 1 FROM sys.foreign_key_columns AS FK WHERE t.constraint_name = CONCAT('[' , OBJECT_NAME(FK.constraint_object_id) , ']')) GROUP BY t.referencing_object, t.referenced_object, t.constraint_name; OPEN r_cursor_fk_drop; FETCH NEXT FROM r_cursor_fk_drop INTO @referencing_object, @referenced_object, @constraint_name WHILE @@FETCH_STATUS = 0 BEGIN SET @stm = CONCAT('ALTER TABLE ', @referencing_object , ' DROP CONSTRAINT ', @constraint_name, ';'); PRINT @stm; BEGIN TRY BEGIN TRANSACTION; EXEC sys.sp_executesql @stmt = @stm; COMMIT; END TRY BEGIN CATCH SET @ERROR = ERROR_MESSAGE(); PRINT @ERROR; IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION END CATCH FETCH NEXT FROM r_cursor_fk_drop INTO @referencing_object, @referenced_object, @constraint_name; END CLOSE r_cursor_fk_drop; DEALLOCATE r_cursor_fk_drop;
Следом перейдем к фрагменту кода, который отвечает за очистку и наполнение данными выбранных ранее таблиц
DECLARE r_cursor CURSOR LOCAL FAST_FORWARD READ_ONLY FOR SELECT t.[name], t.sch_name, t.IsNOTInsert FROM #tbls AS t ORDER BY t.[name] ASC; OPEN r_cursor; FETCH NEXT FROM r_cursor INTO @table_name, @schema_name, @IsNOTInsert; WHILE @@FETCH_STATUS = 0 BEGIN SET @cols = ''; SET @is_identity = 0; SET @col_name_identity = NULL; SET @stm = CONCAT('TRUNCATE TABLE ', @DB , '.[', @schema_name, '].[', @table_name, ']'); PRINT @stm; BEGIN TRY BEGIN TRANSACTION; EXEC sys.sp_executesql @stmt = @stm; COMMIT; END TRY BEGIN CATCH SET @ERROR = ERROR_MESSAGE(); PRINT @ERROR; IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION END CATCH IF (@IsNOTInsert = 0) BEGIN SELECT @cols = @cols + CASE WHEN @cols = '' THEN c.[name] ELSE ',' + c.name END, @is_identity = @is_identity + c.is_identity, @col_name_identity = CASE WHEN (c.is_identity = 1) THEN c.[name] ELSE @col_name_identity END FROM sys.tables t, sys.columns c WHERE t.[object_id] = c.[object_id] AND t.[name] = @table_name AND c.is_computed = 0; SET @stm = ''; IF @is_identity > 0 SET @stm = CONCAT('SET IDENTITY_INSERT ' , @DB, '.[', @schema_name, '].[', @table_name, '] ON'); SET @stm = CONCAT(@stm, ' INSERT INTO ', @DB , '.[', @schema_name, '].[', @table_name , '](', @cols, ') SELECT ', @cols , ' FROM [',@DBMaster,'].[' , @schema_name, '].[' , @table_name, '] WITH(NOLOCK)'); --здесь можно задать ограничение на наполнение данными IF @HistoryLimited = 1 BEGIN IF @table_name LIKE '%History' SET @stm = CONCAT(@stm , ' WHERE ChangeDateTime > DATEADD (month, -1, SYSDATETIME()) '); END SET @stm = CONCAT(@stm, ' OPTION(RECOMPILE)'); IF @is_identity > 0 SET @stm = CONCAT(@stm , ' SET IDENTITY_INSERT ', @DB , '.[', @schema_name, '].[', @table_name, '] OFF'); IF @is_identity > 0 SET @stm = CONCAT(@stm , ' DBCC CHECKIDENT ("', @table_name, '")'); SET @StartMoment = SYSDATETIME(); SET @ERROR = NULL; PRINT @stm; BEGIN TRY BEGIN TRANSACTION; EXEC sys.sp_executesql @stmt = @stm; COMMIT; END TRY BEGIN CATCH SET @ERROR = ERROR_MESSAGE(); PRINT @ERROR; IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION END CATCH SET @FinishMoment = SYSDATETIME(); SET @stm = CONCAT('SELECT COUNT_BIG(*) FROM ' , '[', @schema_name, '].[', @table_name, '] WITH (NOLOCK);'); DELETE FROM @cnt; INSERT INTO @cnt (cnt) EXEC sys.sp_executesql @stmt = @stm; INSERT INTO #tbl_res ( SchName, TblName, StartMoment, FinishMoment, Cnt, ErrorMsg ) SELECT @schema_name, @table_name, @StartMoment, @FinishMoment, COALESCE((SELECT SUM(cnt) FROM @cnt), 0) AS Cnt, @ERROR; END FETCH NEXT FROM r_cursor INTO @table_name, @schema_name, @IsNOTInsert; END CLOSE r_cursor; DEALLOCATE r_cursor;
Обычно после этого фрагмента производятся какие‑то еще необходимые манипуляции с данными. Например, добавляются нужные пользователи и роли с правами в соответствующие таблицы БД.
Затем производится удаление битых данных, то есть тех, по которым нет для записи из одной таблицы соответствующей записи в другой таблице по внешнему ключу
WHILE (@RowCount > 0) BEGIN SET @RowCount = 0; SET @WhileDelCount += 1; DECLARE r_cursor_fk_corr CURSOR LOCAL FAST_FORWARD READ_ONLY FOR SELECT t.referencing_object, t.referenced_object, t.constraint_name, STRING_AGG (CONCAT('(trg.', t.referencing_column_name , '=src.', t.referenced_column_name, ')'), ' AND ') WITHIN GROUP (ORDER BY t.constraint_column_id ASC) AS rules, STRING_AGG (CONCAT('(trg.', t.referencing_column_name , ' IS NOT NULL)'), ' AND ') WITHIN GROUP (ORDER BY t.constraint_column_id ASC) AS key_cols FROM dbo.FKs AS t GROUP BY t.referencing_object, t.referenced_object, t.constraint_name; OPEN r_cursor_fk_corr; FETCH NEXT FROM r_cursor_fk_corr INTO @referencing_object , @referenced_object , @constraint_name , @rules , @key_cols; WHILE @@FETCH_STATUS = 0 BEGIN SET @stm = CONCAT('DELETE FROM trg FROM ', @referencing_object ,' AS trg WHERE ', @key_cols , ' AND NOT EXISTS (SELECT 1 FROM ', @referenced_object, ' AS src WITH (NOLOCK) WHERE ', @rules, ');'); PRINT @stm; BEGIN TRY BEGIN TRANSACTION; EXEC sys.sp_executesql @stmt = @stm; SET @RowCount += @@ROWCOUNT; COMMIT; END TRY BEGIN CATCH SET @ERROR = ERROR_MESSAGE(); PRINT @ERROR; IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION END CATCH FETCH NEXT FROM r_cursor_fk_corr INTO @referencing_object , @referenced_object , @constraint_name , @rules , @key_cols; END CLOSE r_cursor_fk_corr; DEALLOCATE r_cursor_fk_corr; END PRINT CONCAT('WHILE DELETE COUNT: ', @WhileDelCount);
Удаление неконсистентных данных происходит до тех пор, пока такие данные обнаруживаются. Это нужно, чтобы исключить ситуацию, когда была удалена запись из одной таблицы, но при этом была потеряна соответствующая связь в другой уже ранее обработанной таблице.
Далее обновляем статистики для рассматриваемых выше таблиц
DECLARE r_cursor_stat CURSOR LOCAL FAST_FORWARD READ_ONLY FOR SELECT CONCAT('UPDATE STATISTICS ', @DB, '.[' , t.sch_name, '].[', t.[name], '] WITH FULLSCAN;') AS stm FROM #tbls AS t; OPEN r_cursor_stat; FETCH NEXT FROM r_cursor_stat INTO @stm; WHILE @@FETCH_STATUS = 0 BEGIN PRINT @stm; BEGIN TRY BEGIN TRANSACTION; EXEC sys.sp_executesql @stmt = @stm; COMMIT; END TRY BEGIN CATCH SET @ERROR = ERROR_MESSAGE(); PRINT @ERROR; IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION END CATCH FETCH NEXT FROM r_cursor_stat INTO @stm END CLOSE r_cursor_stat; DEALLOCATE r_cursor_stat;
Теперь необходимо включить триггеры в БД
DECLARE r_cursor_trigg_on CURSOR LOCAL FAST_FORWARD READ_ONLY FOR SELECT FORMATMESSAGE('ALTER TABLE [%s].[%s] ENABLE TRIGGER [%s];' + CHAR(13), SCHEMA_NAME(b.[schema_id]) , OBJECT_NAME(t.parent_id), t.[Name]) AS stm FROM sys.triggers t LEFT JOIN sys.tables b ON b.[object_id] = t.parent_id WHERE t.is_disabled = 1 AND t.[type_desc] = 'SQL_TRIGGER' AND OBJECT_NAME(t.parent_id) IS NOT NULL OPEN r_cursor_trigg_on; FETCH NEXT FROM r_cursor_trigg_on INTO @stm; WHILE @@FETCH_STATUS = 0 BEGIN PRINT @stm; BEGIN TRY BEGIN TRANSACTION; EXEC sys.sp_executesql @stmt = @stm; COMMIT; END TRY BEGIN CATCH SET @ERROR = ERROR_MESSAGE(); PRINT @ERROR; IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION END CATCH FETCH NEXT FROM r_cursor_trigg_on INTO @stm; END CLOSE r_cursor_trigg_on; DEALLOCATE r_cursor_trigg_on; SET @stm = ''; SELECT @stm += FORMATMESSAGE('ENABLE TRIGGER [%s] ON DATABASE;' + CHAR(13), t.[Name]) FROM sys.triggers t WHERE t.is_disabled = 0 AND t.[type_desc] = 'SQL_TRIGGER' AND OBJECT_NAME(t.parent_id) IS NULL; PRINT @stm; BEGIN TRY BEGIN TRANSACTION; EXEC sys.sp_executesql @stmt = @stm; COMMIT; END TRY BEGIN CATCH SET @ERROR = ERROR_MESSAGE(); PRINT @ERROR; IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION END CATCH
После этого восстанавливаем все ранее удаленные внешние ключи
DECLARE r_cursor_fk_recover CURSOR LOCAL FAST_FORWARD READ_ONLY FOR SELECT t.referencing_object, t.referenced_object, t.constraint_name, STRING_AGG (t.referencing_column_name, ',') WITHIN GROUP (ORDER BY t.constraint_column_id ASC) AS referencing_columns, STRING_AGG (t.referenced_column_name, ',') WITHIN GROUP (ORDER BY t.constraint_column_id ASC) AS referenced_columns, t.delete_referential_action, t.update_referential_action FROM dbo.FKs AS t WHERE NOT EXISTS (SELECT 1 FROM sys.foreign_key_columns AS FK WHERE t.constraint_name = CONCAT('[', OBJECT_NAME(FK.constraint_object_id) , ']')) GROUP BY t.referencing_object, t.referenced_object, t.constraint_name, t.delete_referential_action, t.update_referential_action; OPEN r_cursor_fk_recover; FETCH NEXT FROM r_cursor_fk_recover INTO @referencing_object , @referenced_object , @constraint_name , @referencing_columns , @referenced_columns , @delete_referential_action , @update_referential_action; WHILE @@FETCH_STATUS = 0 BEGIN SET @stm = CONCAT('ALTER TABLE ', @referencing_object ,' WITH CHECK ADD CONSTRAINT ', @constraint_name, ' FOREIGN KEY(', @referencing_columns, ') REFERENCES ' , @referenced_object, ' (', @referenced_columns, ') ' , CASE WHEN @delete_referential_action = 1 THEN 'ON DELETE CASCADE ' WHEN @delete_referential_action = 2 THEN 'ON DELETE SET NULL ' ELSE '' END , CASE WHEN @update_referential_action = 1 THEN 'ON UPDATE CASCADE ' WHEN @update_referential_action = 2 THEN 'ON UPDATE SET NULL ' ELSE '' END , '; ' , 'ALTER TABLE ', @referencing_object, ' CHECK CONSTRAINT ' , @constraint_name, '; '); PRINT @stm; BEGIN TRY BEGIN TRANSACTION; EXEC sys.sp_executesql @stmt = @stm; COMMIT; END TRY BEGIN CATCH SET @ERROR = ERROR_MESSAGE(); PRINT @ERROR; IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION END CATCH FETCH NEXT FROM r_cursor_fk_recover INTO @referencing_object , @referenced_object , @constraint_name , @referencing_columns , @referenced_columns , @delete_referential_action , @update_referential_action; END CLOSE r_cursor_fk_recover; DEALLOCATE r_cursor_fk_recover;
В конце запускаем проверку всех ограничений, используя следующую команду:
EXEC sys.sp_msforeachtable @commAND1="PRINT '?'", @commAND2="ALTER TABLE ? WITH CHECK CHECK CONSTRAINT ALL";
После делаем вывод статистики работы перезаливки данных в таблицы:
SELECT t.SchName, t.TblName, t.Cnt, DATEDIFF(millisecond, t.StartMoment, t.FinishMoment) AS DiffMSec, t.ErrorMsg FROM #tbl_res AS t ORDER BY t.SchName ASC, t.TblName ASC;
В итоге получаем полный скрипт
SET NOCOUNT OFF; DECLARE @DB VARCHAR(250) = QUOTENAME(DB_NAME()), @DBMaster VARCHAR(255) = 'БД-мастер', @ERROR VARCHAR(MAX); DECLARE @HistoryLimited bit = 1, @table_name nvarchar(255), @is_identity int = 0, @stm nvarchar(max) = '', @cols nvarchar(max) = '', @IsNOTInsert bit, @schema_name nvarchar(255), @col_name_identity nvarchar(255), @referencing_object nvarchar(255), @referenced_object nvarchar(255), @constraint_name nvarchar(255), @referencing_columns nvarchar(max), @referenced_columns nvarchar(max), @rules nvarchar(max), @key_cols nvarchar(max), @StartMoment DATETIME2, @FinishMoment DATETIME2, @delete_referential_action INT, @update_referential_action INT, @max_row_insert INT = 100000, @isClearTableFKs BIT = 1, @RowCount BIGINT = 1, @WhileDelCount INT = 0; ; DECLARE @cnt TABLE (cnt BIGINT NOT NULL); DROP TABLE IF EXISTS #tbl_res; CREATE TABLE #tbl_res ( SchName NVARCHAR(255) NOT NULL, TblName NVARCHAR(255) NOT NULL, StartMoment DATETIME2 NOT NULL, FinishMoment DATETIME2 NOT NULL, Cnt BIGINT NOT NULL, ErrorMsg NVARCHAR(MAX) NULL ); EXEC sys.sp_msforeachtable "ALTER TABLE ? NOCHECK CONSTRAINT ALL"; DECLARE r_cursor_trigg_off CURSOR LOCAL FAST_FORWARD READ_ONLY FOR SELECT FORMATMESSAGE('ALTER TABLE [%s].[%s] DISABLE TRIGGER [%s];' + CHAR(13), SCHEMA_NAME(b.[schema_id]), OBJECT_NAME(t.parent_id) , t.[Name]) AS stm FROM sys.triggers t LEFT JOIN sys.tables b ON b.object_id = t.parent_id WHERE t.is_disabled = 0 AND t.type_desc = 'SQL_TRIGGER' AND OBJECT_NAME(t.parent_id) IS NOT NULL ORDER BY SCHEMA_NAME(b.[schema_id]) ASC, OBJECT_NAME(t.parent_id) ASC; OPEN r_cursor_trigg_off; FETCH NEXT FROM r_cursor_trigg_off INTO @stm; WHILE @@FETCH_STATUS = 0 BEGIN PRINT @stm; BEGIN TRY BEGIN TRANSACTION; EXEC sys.sp_executesql @stmt = @stm; COMMIT; END TRY BEGIN CATCH SET @ERROR = ERROR_MESSAGE(); PRINT @ERROR; IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION END CATCH FETCH NEXT FROM r_cursor_trigg_off INTO @stm; END CLOSE r_cursor_trigg_off; DEALLOCATE r_cursor_trigg_off; SET @stm = ''; SELECT @stm += FORMATMESSAGE('DISABLE TRIGGER [%s] ON DATABASE;' + CHAR(13), t.[Name]) FROM sys.triggers t LEFT JOIN sys.tables b ON b.[object_id] = t.parent_id WHERE t.is_disabled = 0 AND t.[type_desc] = 'SQL_TRIGGER' AND OBJECT_NAME(t.parent_id) IS NULL; PRINT @stm; BEGIN TRY BEGIN TRANSACTION; EXEC sys.sp_executesql @stmt = @stm; COMMIT; END TRY BEGIN CATCH SET @ERROR = ERROR_MESSAGE(); PRINT @ERROR; IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION END CATCH DROP TABLE IF EXISTS #tbls; CREATE TABLE #tbls ( [name] NVARCHAR(255) NOT NULL, sch_name NVARCHAR(255) NOT NULL, IsNOTInsert BIT NOT NULL ); INSERT INTO #tbls ( [name], sch_name, IsNOTInsert ) SELECT t.[name], SCHEMA_NAME(t.[schema_id]) AS sch_name, --задаётся правило, по которому определяем --нужно ли после очистки наполнять данными таблицу или нет --по умолчанию нужно (0-да, 1-нет) 0 AS IsNOTInsert FROM sys.tables AS t --в фильтре задаем какие таблицы брать в расчет --(в нашем случае какие не брать в расчет) WHERE t.[name] NOT LIKE 'unused%' AND t.[name] NOT LIKE 'removed%' AND t.[name] NOT LIKE 'migrated%' AND t.[name] NOT LIKE 'migration%' AND t.[name] NOT LIKE 'sysdiag%' AND t.[name] NOT LIKE 'test%' AND t.[name] NOT LIKE 'tmp%' AND t.[name] NOT LIKE '%_cache' AND t.[name] NOT IN ('FKs'); IF NOT EXISTS (SELECT 1 FROM sys.tables AS t WHERE t.[name]= 'FKs' AND t.[schema_id] = SCHEMA_ID('dbo')) BEGIN CREATE TABLE dbo.FKs ( referencing_object NVARCHAR(255) NOT NULL, constraint_column_id INT NOT NULL, referencing_column_name NVARCHAR(255) NOT NULL, referenced_object NVARCHAR(255) NOT NULL, referenced_column_name NVARCHAR(255) NOT NULL, constraint_name NVARCHAR(255) NOT NULL, delete_referential_action INT NOT NULL, update_referential_action INT NOT NULL ); END ELSE IF (@isClearTableFKs = 1) BEGIN TRUNCATE TABLE dbo.FKs; END INSERT INTO dbo.FKs ( referencing_object, constraint_column_id, referencing_column_name, referenced_object, referenced_column_name, constraint_name, delete_referential_action, update_referential_action ) SELECT CONCAT('[', SCHEMA_NAME(P.[schema_id]), '].[' , OBJECT_NAME(FK.parent_object_id), ']') AS referencing_object, FK.constraint_column_id, CONCAT('[', COL_NAME(FK.parent_object_id , FK.parent_column_id), ']') AS referencing_column_name, CONCAT('[', SCHEMA_NAME(R.[schema_id]), '].[' , OBJECT_NAME(FK.referenced_object_id), ']') AS referenced_object, CONCAT('[', COL_NAME(FK.referenced_object_id , FK.referenced_column_id), ']') AS referenced_column_name, CONCAT('[', OBJECT_NAME(FK.constraint_object_id) , ']') AS constraint_name, FKK.delete_referential_action, FKK.update_referential_action FROM sys.foreign_key_columns AS FK INNER JOIN sys.foreign_keys AS FKK ON FKK.[object_id] = FK.constraint_object_id INNER JOIN sys.tables AS P ON P.[object_id] = FK.parent_object_id INNER JOIN sys.tables AS R ON R.[object_id] = FK.referenced_object_id WHERE NOT EXISTS (SELECT 1 FROM dbo.FKs AS t0 WHERE t0.constraint_name = CONCAT('[' , OBJECT_NAME(FK.constraint_object_id), ']')); DELETE FROM trg FROM dbo.FKs AS trg WHERE NOT EXISTS ( SELECT 1 FROM #tbls AS src WHERE trg.referencing_object = CONCAT('[', src.sch_name , '].[', src.[name], ']') OR trg.referenced_object = CONCAT('[', src.sch_name , '].[', src.[name], ']') ) DECLARE r_cursor_fk_drop CURSOR LOCAL FAST_FORWARD READ_ONLY FOR SELECT t.referencing_object, t.referenced_object, t.constraint_name FROM dbo.FKs AS t WHERE EXISTS (SELECT 1 FROM sys.foreign_key_columns AS FK WHERE t.constraint_name = CONCAT('[' , OBJECT_NAME(FK.constraint_object_id), ']')) GROUP BY t.referencing_object, t.referenced_object, t.constraint_name; OPEN r_cursor_fk_drop; FETCH NEXT FROM r_cursor_fk_drop INTO @referencing_object, @referenced_object, @constraint_name WHILE @@FETCH_STATUS = 0 BEGIN SET @stm = CONCAT('ALTER TABLE ', @referencing_object , ' DROP CONSTRAINT ', @constraint_name, ';'); PRINT @stm; BEGIN TRY BEGIN TRANSACTION; EXEC sys.sp_executesql @stmt = @stm; COMMIT; END TRY BEGIN CATCH SET @ERROR = ERROR_MESSAGE(); PRINT @ERROR; IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION END CATCH FETCH NEXT FROM r_cursor_fk_drop INTO @referencing_object, @referenced_object, @constraint_name; END CLOSE r_cursor_fk_drop; DEALLOCATE r_cursor_fk_drop; DECLARE r_cursor CURSOR LOCAL FAST_FORWARD READ_ONLY FOR SELECT t.[name], t.sch_name, t.IsNOTInsert FROM #tbls AS t ORDER BY t.[name] ASC; OPEN r_cursor; FETCH NEXT FROM r_cursor INTO @table_name, @schema_name, @IsNOTInsert; WHILE @@FETCH_STATUS = 0 BEGIN SET @cols = ''; SET @is_identity = 0; SET @col_name_identity = NULL; SET @stm = CONCAT('TRUNCATE TABLE ', @DB , '.[', @schema_name, '].[', @table_name, ']'); PRINT @stm; BEGIN TRY BEGIN TRANSACTION; EXEC sys.sp_executesql @stmt = @stm; COMMIT; END TRY BEGIN CATCH SET @ERROR = ERROR_MESSAGE(); PRINT @ERROR; IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION END CATCH IF (@IsNOTInsert = 0) BEGIN SELECT @cols = @cols + CASE WHEN @cols = '' THEN c.[name] ELSE ',' + c.name END, @is_identity = @is_identity + c.is_identity, @col_name_identity = CASE WHEN (c.is_identity = 1) THEN c.[name] ELSE @col_name_identity END FROM sys.tables t, sys.columns c WHERE t.[object_id] = c.[object_id] AND t.[name] = @table_name AND c.is_computed = 0; SET @stm = ''; IF @is_identity > 0 SET @stm = CONCAT('SET IDENTITY_INSERT ' , @DB, '.[', @schema_name, '].[', @table_name, '] ON'); SET @stm = CONCAT(@stm, ' INSERT INTO ', @DB, '.[' , @schema_name, '].[', @table_name , '](', @cols, ') SELECT ', @cols , ' FROM [',@DBMaster,'].[' , @schema_name, '].[' , @table_name, '] WITH(NOLOCK)'); --здесь можно задать ограничение на наполнение данными IF @HistoryLimited = 1 BEGIN IF @table_name LIKE '%History' SET @stm = CONCAT(@stm , ' WHERE ChangeDateTime > DATEADD (month, -1, SYSDATETIME()) '); END SET @stm = CONCAT(@stm, ' OPTION(RECOMPILE)'); IF @is_identity > 0 SET @stm = CONCAT(@stm, ' SET IDENTITY_INSERT ' , @DB, '.[', @schema_name, '].[', @table_name, '] OFF'); IF @is_identity > 0 SET @stm = CONCAT(@stm, ' DBCC CHECKIDENT ("' , @table_name, '")'); SET @StartMoment = SYSDATETIME(); SET @ERROR = NULL; PRINT @stm; BEGIN TRY BEGIN TRANSACTION; EXEC sys.sp_executesql @stmt = @stm; COMMIT; END TRY BEGIN CATCH SET @ERROR = ERROR_MESSAGE(); PRINT @ERROR; IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION END CATCH SET @FinishMoment = SYSDATETIME(); SET @stm = CONCAT('SELECT COUNT_BIG(*) FROM ', '[', @schema_name , '].[', @table_name, '] WITH (NOLOCK);'); DELETE FROM @cnt; INSERT INTO @cnt (cnt) EXEC sys.sp_executesql @stmt = @stm; INSERT INTO #tbl_res ( SchName, TblName, StartMoment, FinishMoment, Cnt, ErrorMsg ) SELECT @schema_name, @table_name, @StartMoment, @FinishMoment, COALESCE((SELECT SUM(cnt) FROM @cnt), 0) AS Cnt, @ERROR; END FETCH NEXT FROM r_cursor INTO @table_name, @schema_name, @IsNOTInsert; END CLOSE r_cursor; DEALLOCATE r_cursor; WHILE (@RowCount > 0) BEGIN SET @RowCount = 0; SET @WhileDelCount += 1; DECLARE r_cursor_fk_corr CURSOR LOCAL FAST_FORWARD READ_ONLY FOR SELECT t.referencing_object, t.referenced_object, t.constraint_name, STRING_AGG (CONCAT('(trg.', t.referencing_column_name , '=src.', t.referenced_column_name, ')'), ' AND ') WITHIN GROUP (ORDER BY t.constraint_column_id ASC) AS rules, STRING_AGG (CONCAT('(trg.', t.referencing_column_name , ' IS NOT NULL)'), ' AND ') WITHIN GROUP (ORDER BY t.constraint_column_id ASC) AS key_cols FROM dbo.FKs AS t GROUP BY t.referencing_object, t.referenced_object, t.constraint_name; OPEN r_cursor_fk_corr; FETCH NEXT FROM r_cursor_fk_corr INTO @referencing_object , @referenced_object , @constraint_name , @rules , @key_cols; WHILE @@FETCH_STATUS = 0 BEGIN SET @stm = CONCAT('DELETE FROM trg FROM ', @referencing_object ,' AS trg WHERE ', @key_cols, ' AND NOT EXISTS (SELECT 1 FROM ' , @referenced_object, ' AS src WITH (NOLOCK) WHERE ', @rules, ');'); PRINT @stm; BEGIN TRY BEGIN TRANSACTION; EXEC sys.sp_executesql @stmt = @stm; SET @RowCount += @@ROWCOUNT; COMMIT; END TRY BEGIN CATCH SET @ERROR = ERROR_MESSAGE(); PRINT @ERROR; IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION END CATCH FETCH NEXT FROM r_cursor_fk_corr INTO @referencing_object , @referenced_object , @constraint_name , @rules , @key_cols; END CLOSE r_cursor_fk_corr; DEALLOCATE r_cursor_fk_corr; END PRINT CONCAT('WHILE DELETE COUNT: ', @WhileDelCount); DECLARE r_cursor_stat CURSOR LOCAL FAST_FORWARD READ_ONLY FOR SELECT CONCAT('UPDATE STATISTICS ', @DB, '.[', t.sch_name, '].[' , t.[name], '] WITH FULLSCAN;') AS stm FROM #tbls AS t; OPEN r_cursor_stat; FETCH NEXT FROM r_cursor_stat INTO @stm; WHILE @@FETCH_STATUS = 0 BEGIN PRINT @stm; BEGIN TRY BEGIN TRANSACTION; EXEC sys.sp_executesql @stmt = @stm; COMMIT; END TRY BEGIN CATCH SET @ERROR = ERROR_MESSAGE(); PRINT @ERROR; IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION END CATCH FETCH NEXT FROM r_cursor_stat INTO @stm END CLOSE r_cursor_stat; DEALLOCATE r_cursor_stat; DECLARE r_cursor_trigg_on CURSOR LOCAL FAST_FORWARD READ_ONLY FOR SELECT FORMATMESSAGE('ALTER TABLE [%s].[%s] ENABLE TRIGGER [%s];' + CHAR(13), SCHEMA_NAME(b.[schema_id]), OBJECT_NAME(t.parent_id) , t.[Name]) AS stm FROM sys.triggers t LEFT JOIN sys.tables b ON b.[object_id] = t.parent_id WHERE t.is_disabled = 1 AND t.[type_desc] = 'SQL_TRIGGER' AND OBJECT_NAME(t.parent_id) IS NOT NULL OPEN r_cursor_trigg_on; FETCH NEXT FROM r_cursor_trigg_on INTO @stm; WHILE @@FETCH_STATUS = 0 BEGIN PRINT @stm; BEGIN TRY BEGIN TRANSACTION; EXEC sys.sp_executesql @stmt = @stm; COMMIT; END TRY BEGIN CATCH SET @ERROR = ERROR_MESSAGE(); PRINT @ERROR; IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION END CATCH FETCH NEXT FROM r_cursor_trigg_on INTO @stm; END CLOSE r_cursor_trigg_on; DEALLOCATE r_cursor_trigg_on; SET @stm = ''; SELECT @stm += FORMATMESSAGE('ENABLE TRIGGER [%s] ON DATABASE;' + CHAR(13), t.[Name]) FROM sys.triggers t WHERE t.is_disabled = 0 AND t.[type_desc] = 'SQL_TRIGGER' AND OBJECT_NAME(t.parent_id) IS NULL; PRINT @stm; BEGIN TRY BEGIN TRANSACTION; EXEC sys.sp_executesql @stmt = @stm; COMMIT; END TRY BEGIN CATCH SET @ERROR = ERROR_MESSAGE(); PRINT @ERROR; IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION END CATCH DECLARE r_cursor_fk_recover CURSOR LOCAL FAST_FORWARD READ_ONLY FOR SELECT t.referencing_object, t.referenced_object, t.constraint_name, STRING_AGG (t.referencing_column_name, ',') WITHIN GROUP (ORDER BY t.constraint_column_id ASC) AS referencing_columns, STRING_AGG (t.referenced_column_name, ',') WITHIN GROUP (ORDER BY t.constraint_column_id ASC) AS referenced_columns, t.delete_referential_action, t.update_referential_action FROM dbo.FKs AS t WHERE NOT EXISTS (SELECT 1 FROM sys.foreign_key_columns AS FK WHERE t.constraint_name = CONCAT('[' , OBJECT_NAME(FK.constraint_object_id), ']')) GROUP BY t.referencing_object, t.referenced_object, t.constraint_name, t.delete_referential_action, t.update_referential_action; OPEN r_cursor_fk_recover; FETCH NEXT FROM r_cursor_fk_recover INTO @referencing_object , @referenced_object , @constraint_name , @referencing_columns , @referenced_columns , @delete_referential_action , @update_referential_action; WHILE @@FETCH_STATUS = 0 BEGIN SET @stm = CONCAT('ALTER TABLE ', @referencing_object ,' WITH CHECK ADD CONSTRAINT ', @constraint_name, ' FOREIGN KEY(', @referencing_columns, ') REFERENCES ' , @referenced_object, ' (', @referenced_columns, ') ' , CASE WHEN @delete_referential_action = 1 THEN 'ON DELETE CASCADE ' WHEN @delete_referential_action = 2 THEN 'ON DELETE SET NULL ' ELSE '' END , CASE WHEN @update_referential_action = 1 THEN 'ON UPDATE CASCADE ' WHEN @update_referential_action = 2 THEN 'ON UPDATE SET NULL ' ELSE '' END , '; ' , 'ALTER TABLE ', @referencing_object, ' CHECK CONSTRAINT ' , @constraint_name, '; '); PRINT @stm; BEGIN TRY BEGIN TRANSACTION; EXEC sys.sp_executesql @stmt = @stm; COMMIT; END TRY BEGIN CATCH SET @ERROR = ERROR_MESSAGE(); PRINT @ERROR; IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION END CATCH FETCH NEXT FROM r_cursor_fk_recover INTO @referencing_object , @referenced_object , @constraint_name , @referencing_columns , @referenced_columns , @delete_referential_action , @update_referential_action; END CLOSE r_cursor_fk_recover; DEALLOCATE r_cursor_fk_recover; EXEC sys.sp_msforeachtable @commAND1="PRINT '?'" , @commAND2="ALTER TABLE ? WITH CHECK CHECK CONSTRAINT ALL"; SELECT t.SchName, t.TblName, t.Cnt, DATEDIFF(millisecond, t.StartMoment, t.FinishMoment) AS DiffMSec, t.ErrorMsg FROM #tbl_res AS t ORDER BY t.SchName ASC, t.TblName ASC;
Для проверки скрипта мы прогнали его на тестовой БД:

Скрипт выполнил перенос данных без ошибок.
Важно и то, что при объеме данных в БД‑источнике около 100 ГБ (размер таблиц в строках до 100 млн записей), время выполнения всего скрипта составило 18 — 19 минут, то есть достигается высокая скорость перезаливки таблиц.
Компоненты скрипта
Теперь немного о подробностях реализации. В приведённом скрипте используется целый стек системных объектов:
sys.sp_msforeachtable — недокументированная хранимая процедура в SQL Server, которая позволяет итеративно применять команду T‑SQL к каждой таблице в текущей базе данных;
sys.triggers — системный объект, который содержит информацию о триггерах в БД;
sys.tables — системный объект, который содержит информацию о таблицах в БД;
sys.sp_executesql — системная хранимая процедура для выполнения инструкции Transact‑SQL или пакета, в том числе, созданных динамически;
sys.foreign_key_columns — системный объект, который содержит информацию о составе внешних ключей;
sys.foreign_keys — системный объект, который содержит информацию о внешних ключах;
sys.columns — системный объект, содержащий информацию о колонках.
Вместо выводов
Реализация перезаливки таблиц в базе данных — довольно скрупулезная задача, в которой без автоматизации легко столкнуться с «подводными камнями» и нерациональным расходованием ресурсов. Поэтому автоматизация — must have для любой системы, где надо перегонять данные между средами. Предложенный скрипт — один из способов такой автоматизации.
Безусловно, описанное решение — не «серебряная пуля». Но наш опыт и проведенные тесты показали, что оно вполне эффективно и надежно справляется с переносом данных между средами.
