Привет, Хабр! Меня зовут Павел Кузьмин, я работаю ведущим разработчиком в РСХБ-Интех. Однажды в своей работе мы столкнулись с острой необходимостью перенести БД объемом 1,4 ТБ (более 1,5 млрд строк) с MS SQL на PostgresSQL не более чем за 20 часов. Неожиданно для нас, все имеющиеся готовые варианты не подходили, поэтому мы решили взять библиотеку Npgsql на C# и написать свой код. В итоге созданное решение справилось с поставленной задачей за 13 часов. Рассказываем, как мы это сделали, и делимся кодом. Возможно, он вам пригодится в работе.
Что перепробовали
Перед тем, как заморачиваться написанием своего решения, мы попробовали несколько уже имеющихся вариантов. Но, к сожалению, ни один из них не подошел.
Вариант 1 (DBLink)
В первую очередь мы попробовали встроенные в MS SQL решения. Через DBLink, драйвер ODBC, можно подключить БД на PostgresSQL. По сути мы из MS SQL можем перегонять таблицы в PostgresSQL через insert into
.
Мы переносили стандартным методом. Процесс переноса занял бы около месяца, поэтому мы отказались от этого варианта.
INSERT INTO pg.table (column1, column2, column3)
select column1, column2, column3
from mssql.table
Вариант 2 (DBLink с использованием BULK INSERT)
С помощью DBLink, используя BULK insert into и вычитывая из заранее подготовленных файлов CSV. Это как раз механизм, придуманный для быстрого переноса данных. Мы пробовали два разных варианта реализации.
а) Экспорт данных через bcp приводил к созданию некорректно сформированного CSV. Были проблемы с датами и плавающей точкой. Кроме того, из-за изъянов в файлах было невозможно гарантировать корректность передачи.
б) Экспорт через DataGrip (софт от JetBrains) занимал 23 часа, а нужно было еще импорт делать.
Из-за того что объем файла получался очень большой, перенос csv на сервер занимал значительное время.
BULK INSERT pg.table
FROM 'path.csv'
WITH (
FIELDTERMINATOR = ',', -- Разделитель полей (здесь - запятая)
ROWTERMINATOR = '\n', -- Разделитель строк (перевод строки)
BATCHSIZE = 10000, -- Размер пакета (опционально, для оптимизации)
TABLOCK -- Эксклюзивная блокировка таблицы во время операции вставки
);
Вариант 3 (DataGrip)
Пробовали использовать отдельно DataGrip, подключившись к двум базам данных. Но переливка была бы по конкретной таблице, то есть сопровождение должно уметь пользоваться данной функцией и переносить каждую таблицу настраивая ручками. К тому же, с течением времени при больших объемах скорость вставки сильно падала меньше (5к в сек.), что приводило к длительной выгрузке. Плюс DataGrip является платным.
На конкретной таблице (правой кнопкой мыши) Import/Export -> Copy table to …
Вариант 4 (pg_dump
и pg_restore
)
Использование утилит pg_dump
и pg_restore
. Экспорт данных — долгий процесс, импорт был быстрым. Чисел нет, просто поняли, что в выходные не вписываемся. Требовались какие-то дополнительные телодвижения, но там только PostgreSql бекап восстанавливает, а у нас надо было с MSSQL.
К чему пришли
В итоге мы остановились на том, что нужно писать свое решение. Мы использовали библиотеку Npgsql на C#. Но и наш код тоже не сразу получился быстрым и хорошим.
В рамках экспериментов с самописным кодом мы пробовали пакетированную передачу данных. То есть устанавливали размер пакета самостоятельно. Но получалось, что мы очень сильно буксуем на больших объемах. Сначала все быстро идет, но потом мы начинаем искать точку, с которой мы остановились. С каждым разом поиски занимали все больше времени, поскольку нужно было пропустить все предыдущие записи. Процесс затягивался.
Пробовали еще вариант, когда прям сразу читаем и вставляем по одной конкретной записи. Вставка занимала много времени в совокупности. Поэтому начали идти в сторону массовой вставки, так как чтение нас устраивало.
Через ряд болей и страданий и изучение разной документации, мы пришли к тому, что у Npgsql есть такая возможность — Bulk copy program, BCP.
Мы подумали, что можно передавать данные через Npgsql, и начали знакомиться с NpgsqlBinaryImporter. Сначала прописывали вручную каждый столбец, который есть в базе данных. Но быстро поняли, что нам придется очень много писать, писать и еще раз писать. Зато этот вариант уже был рабочий и быстрее других. Оставалось дело за оптимизацией.
Мы настроили автоматическое сопоставление столбцов по порядковым номерам. Но это тоже накладывает определенные сложности, при которых столбцы имели разные порядковые номера в исходной и целевой таблице. Тут нам на помощь пришел select
с нужным написанием порядковых номеров. Мы брали данные из MSSQL в той последовательности, в которой они есть у нас в PostgreSQL, тем самым обходя такие кейсы.
По итогу в режиме пакетной бинарной передачи данных перенос нашей БД занял около 13 часов. И в целом бинарный формат при переносе данных оказался более эффективным по сравнению с текстовым, так как он быстрее и занимает меньше времени, а NpgsqlBinaryImporter стал главным помощником.
По итогу, два главных плюса:
Скорость: Двоичный формат передачи данных значительно быстрее, чем текстовый формат, особенно для больших объемов данных.
Эффективность: Двоичный формат более компактный, что сокращает объём трафика и время передачи.
Из минусов: нужно убедиться, чтобы в исходной и целевой таблицах столбцы были одного порядка. Иначе придется через select
выстроить правильную последовательность. В коде пример отражен.
Делимся кодом
using System.Data;
using System.Data.SqlClient;
using Npgsql;
//Введи ConnectionString для MSSQL, откуда будут копироваться данные: (Пример: Data Source=***;Initial Catalog=***;User ID=***;Password=***;)
string sourceConnectionString = "Data Source=***;Initial Catalog=***;User ID=***;Password=***;";
//Введи ConnectionString для PostgreSql, куда будут копироваться данные: (Пример: Host=***;Port=***;Username=***;Password=***;Database=***;SearchPath=***;)
string targetConnectionString = "Host=***;Port=***;Username=***;Password=***;Database=***;SearchPath=***;";
//Введи схему БД для выполнения пред/после скрипта: (Пример: accreditation)
string shemaString = "accreditation";
AppContext.SetSwitch("Npgsql.EnableLegacyTimestampBehavior", true);
//Скрипт подготовки
ExecuteScriptBefore();
string[] tableNames = new string[]
{
"table1",
"table2",
"table3"
};
IEnumerable<Task> tasks = tableNames.Select(TransferDataAsync);
await Task.WhenAll(tasks);
//Скрипт восстановления после подготовки
ExecuteScriptAfter();
Console.ReadLine();
async Task TransferDataAsync(string tableName)
{
try
{
await using (SqlConnection msSqlConnection = new SqlConnection(sourceConnectionString))
await using (NpgsqlConnection pgSqlConnection = new NpgsqlConnection(targetConnectionString))
{
await msSqlConnection.OpenAsync();
await pgSqlConnection.OpenAsync();
Console.WriteLine($"Начало переноса данных в таблицу: {tableName}.");
string selectQuery = null!;
if (tableName == "table3") //Тот самый случай когда столбцы в разных порядковых номерах в MSSQL и PostgreSql
{
selectQuery = $"SELECT column3, column1, column2 FROM {tableName}";
}
else
{
selectQuery = $"SELECT * FROM {tableName}";
}
await using SqlCommand sqlCommand = new SqlCommand(selectQuery, msSqlConnection);
{
sqlCommand.CommandTimeout = 900;
await using SqlDataReader? sqlDataReader = await sqlCommand.ExecuteReaderAsync(CommandBehavior.CloseConnection);
{
string tableNameLower = AddUnderscoreToCaps(tableName).ToLower();
await using (NpgsqlBinaryImporter writer =
pgSqlConnection.BeginBinaryImport($"COPY {tableNameLower} FROM STDIN (FORMAT BINARY)"))
{
writer.Timeout = TimeSpan.FromSeconds(900);
while (await sqlDataReader.ReadAsync())
{
object[] values = new object[sqlDataReader.FieldCount];
sqlDataReader.GetValues(values);
await writer.WriteRowAsync(CancellationToken.None, values);
}
Console.WriteLine($"Начинается вставка: {DateTime.Now} для таблицы {tableNameLower}.");
await writer.CompleteAsync();
Console.WriteLine($"Вставка закончена: {DateTime.Now} для таблицы {tableNameLower}.");
}
}
}
}
}
catch (PostgresException px)
{
Console.WriteLine($"Таблица {tableName}.");
Console.WriteLine("Exception data:");
Console.WriteLine($"Severity: {px.Data["Severity"]}");
Console.WriteLine($"SqlState: {px.Data["SqlState"]}");
Console.WriteLine($"MessageText: {px.Message}");
Console.WriteLine($"Where: {px.Data["Where"]}, line {px.Data["Line"]}, column {px.Data["Column"]}");
Console.WriteLine($"File: {px.Data["File"]}");
Console.WriteLine($"Line: {px.Data["Line"]}");
Console.WriteLine($"px: {px}");
throw;
}
catch (SqlException se)
{
Console.WriteLine($"Таблица {tableName}. Message: {se.Message}. se:{se}");
ExecuteScriptAfter();
throw;
}
catch (Exception ex)
{
Console.WriteLine($"Таблица {tableName}. Message: {ex.Message}. ex:{ex}");
throw;
}
}
void ExecuteScriptBefore()
{
// SQL-скрипт для выполнения
string script = $@"
-- Ваш SQL-скрипт здесь
--Удаляем FK ключи если есть
ALTER TABLE {shemaString}.table1 DROP CONSTRAINT fk_table1_id;
ALTER TABLE {shemaString}.table2 DROP CONSTRAINT fk_table2_id;
ALTER TABLE {shemaString}.table3 DROP CONSTRAINT fk_table3_id;
--Чистим таблицы в которые будем заливать данные
truncate {shemaString}.table1 cascade;
truncate {shemaString}.table2 cascade;
truncate {shemaString}.table3 cascade;
";
// Вызов метода выполнения скрипта
ExecuteScript(targetConnectionString, script, "Скрипт подготовки к миграции успешно выполнен.");
}
void ExecuteScriptAfter()
{
// SQL-скрипт для выполнения
string script = $@"
-- Ваш SQL-скрипт здесь
--Добавляем FK ключи если есть
ALTER TABLE {shemaString}.table1 ADD CONSTRAINT fk_table1_id FOREIGN KEY (id) REFERENCES {shemaString}.table1 (id);
ALTER TABLE {shemaString}.table2 ADD CONSTRAINT fk_table2_id FOREIGN KEY (id) REFERENCES {shemaString}.table2 (id);
ALTER TABLE {shemaString}.table3 ADD CONSTRAINT fk_table3_id FOREIGN KEY (id) REFERENCES {shemaString}.table3 (id);
";
// Вызов метода выполнения скрипта
ExecuteScript(targetConnectionString, script, "Сброшенные настройки БД восстановлены.");
}
void ExecuteScript(string connectionString, string script, string message)
{
using (NpgsqlConnection connection = new NpgsqlConnection(connectionString))
{
try
{
connection.Open();
using (NpgsqlCommand command = new NpgsqlCommand(script, connection))
{
command.CommandTimeout = 900;
// Выполнение скрипта
command.ExecuteNonQuery();
Console.WriteLine(message);
}
}
catch (Exception ex)
{
Console.WriteLine($"Ошибка выполнения скрипта: {ex.Message}. ex{ex}.");
}
}
}
string AddUnderscoreToCaps(string input)
{
return string.Join("", input.Select((c, i) => i > 0 && char.IsUpper(c) ? "_" + c.ToString() : c.ToString()));
}
На этом все. Спасибо за внимание!
Готов ответить на ваши вопросы.