Вступление
Это перевод статьи Yuria Magurdumova. Я новичок после окончания бакалавра, так что у меня знаний в программировании не хватило чтобы самому придумать эффективный метод ставки. Статья помогла мне решить задачу с быстрой вставкой с помощью SqlBulkCopy с IDataReader. Перевел ее с помощью гугла, чтобы была и на habre) Все исходники для скачивания в оригинальной статье.
SqlBulkCopy
можно использовать в трех вариантах: вставка данных, представленных в виде DataTable
, массива DataRow
или IDataReader
экземпляра. В этой статье я продемонстрирую две реализации IDataReader
интерфейса, которые используются в сочетании с SqlBulkCopy
высокопроизводительной вставкой в базу данных. Два других варианта похожи друг на друга и могут использоваться для относительно небольших объемов данных, поскольку они требуют, чтобы все записи были предварительно загружены в память перед их передачей SqlBulkCopy
. Напротив, этот IDataReader
подход более гибкий и позволяет работать с неограниченным количеством записей в «ленивом» режиме, что означает, что данные могут передаваться SqlBulkCopy
на лету так быстро, как сервер может их использовать. Это аналогично подходу IList<T>
vs.IEnumerable<T>
.
Использование Demo
Прилагаемый демонстрационный проект состоит из предварительно скомпилированного консольного приложения с файлом конфигурации и Data
подпапкой, содержащей CSV
файл образца . Перед запуском демонстрации обязательно настройте файл конфигурации, указав правильное соединение с string
именем " DefaultDb
". Другой параметр " MaxRecordCount
" равен 100,000
по умолчанию, что должно быть подходящим для этой демонстрации. Обратите внимание, что соединение string
может указывать на любую существующую базу данных. Все демонстрационные таблицы будут созданы автоматически, поэтому нет необходимости настраивать базу данных вручную.
После запуска демонстрации она появится в окне консоли с просьбой нажать Enter
перед инициализацией базы данных и перед выполнением каждого демонстрационного действия. Завершенная демонстрация на моей машине выглядит следующим образом:
В качестве первого шага приложение попытается инициализировать базу данных. Он создаст (или воссоздаст) три таблицы - по одной для каждого демонстрационного действия:
Contacts
таблица с колонкамиId
,FirstName
,LastName
иBirthDate
.DynamicData
таблица с колонкамиId
, 10integer
, 10string
, 10datetime
, and 10guid
.CsvData
таблица, имеющая ту же структуру, что иDynamicData.
Затем приложение выполнит три демонстрационных действия, измеряющих время для каждого действия:
Static Dataset Demo
демонстрирует,ObjectDataReader<T>
который позволяет обрабатывать экземпляры любогоPOCO
класса (Contact
в данном случае класса).Dynamic Dataset Demo
демонстрирует,DynamicDataReader<T>
который также реализуетIDataReader
, но позволяет пользователю решать, как извлекать данные из базового объекта сT
помощью определяемого пользователем лямбда-выражения. В этой демонстрации я используюIDictionary<string, object>
для представления данных.CSV Import Demo
используетCsvParser
класс и вышеупомянутыйDynamicDataReader<T>
для эффективной загрузки прикрепленного файла " Data \ CsvData.csv " в базу данных.
Данные для первых двух демонстраций генерируются случайным образом на лету с использованием вспомогательного класса RandomDataGenerator
. Другой вспомогательный класс TableSchemaProvider
используется для извлечения некоторых метаданных из SQL Server и выполнения некоторых служебных команд SQL.
ObjectDataReader
Как показано ниже, ObjectDataReader<T>
принимает IEnumerable<T>
в своем конструкторе, который представляет поток фактических данных, которые будут использоваться SqlBulkCopy
классом. Важно отметить , что GetOrdinal()
и GetValue()
методы не используют Reflection каждый раз , когда они нуждаются в свойствах доступа T
. Вместо этого они используют предварительно скомпилированные и кэшированные лямбда-выражения, которые играют роль средств доступа к свойствам и поиска. Эти предварительно скомпилированные лямбда-выражения во много раз быстрее, чем при использовании отражения.
public sealed class ObjectDataReader<TData> : IDataReader
{
private class PropertyAccessor
{
public List<Func<TData, object>> Accessors { get; set; }
public Dictionary<string, int> Lookup { get; set; }
}
private static readonly Lazy<PropertyAccessor> s_propertyAccessorCache =
new Lazy<PropertyAccessor>(() =>
{
var propertyAccessors = typeof(TData)
.GetProperties(BindingFlags.Instance | BindingFlags.Public)
.Where(p => p.CanRead)
.Select((p, i) => new
{
Index = i,
Property = p,
Accessor = CreatePropertyAccessor(p)
})
.ToArray();
return new PropertyAccessor
{
Accessors = propertyAccessors.Select(p => p.Accessor).ToList(),
Lookup = propertyAccessors.ToDictionary(
p => p.Property.Name, p => p.Index, StringComparer.OrdinalIgnoreCase)
};
});
private static Func<TData, object> CreatePropertyAccessor(PropertyInfo p)
{
var parameter = Expression.Parameter(typeof(TData), "input");
var propertyAccess = Expression.Property(parameter, p.GetGetMethod());
var castAsObject = Expression.TypeAs(propertyAccess, typeof(object));
var lamda = Expression.Lambda<Func<TData, object>>(castAsObject, parameter);
return lamda.Compile();
}
private IEnumerator<TData> m_dataEnumerator;
public ObjectDataReader(IEnumerable<TData> data)
{
m_dataEnumerator = data.GetEnumerator();
}
#region IDataReader Members
public void Close()
{
Dispose();
}
public int Depth => 1;
public DataTable GetSchemaTable()
{
return null;
}
public bool IsClosed => m_dataEnumerator == null;
public bool NextResult()
{
return false;
}
public bool Read()
{
if (IsClosed)
throw new ObjectDisposedException(GetType().Name);
return m_dataEnumerator.MoveNext();
}
public int RecordsAffected => -1;
#endregion
// IDisposable Members
#region IDataRecord Members
public int GetOrdinal(string name)
{
int ordinal;
if (!s_propertyAccessorCache.Value.Lookup.TryGetValue(name, out ordinal))
throw new InvalidOperationException("Unknown parameter name: " + name);
return ordinal;
}
public object GetValue(int i)
{
if (m_dataEnumerator == null)
throw new ObjectDisposedException(GetType().Name);
return s_propertyAccessorCache.Value.Accessors[i](m_dataEnumerator.Current);
}
public int FieldCount => s_propertyAccessorCache.Value.Accessors.Count;
// Not Implemented IDataRecord Members ...
#endregion
}
После того, как мы ObjectDataReader<T>
реализовали, мы можем подключить его SqlBulkCopy
следующим образом:
private static async Task RunStaticDatasetDemoAsync(SqlConnection connection, int count,
CancellationToken cancellationToken)
{
using (var bulkCopy = new SqlBulkCopy(connection))
{
bulkCopy.DestinationTableName = "Contacts";
bulkCopy.BatchSize = 1000;
bulkCopy.BulkCopyTimeout = (int) TimeSpan.FromMinutes(10).TotalSeconds;
bulkCopy.ColumnMappings.Add("Id", "Id");
bulkCopy.ColumnMappings.Add("FirstName", "FirstName");
bulkCopy.ColumnMappings.Add("LastName", "LastName");
bulkCopy.ColumnMappings.Add("BirthDate", "BirthDate");
using (var reader = new ObjectDataReader<Contact>(new RandomDataGenerator().GetContacts(count)))
await bulkCopy.WriteToServerAsync(reader, cancellationToken);
}
}
DynamicDataReader
Вы можете использовать, DynamicDataReader<T>
если нет статически определенного класса, представляющего данные. Лучший пример, который иллюстрирует цель, DynamicDataReader<T>
- это когда каждая запись вашей таблицы представлена в виде, Dictionary<string, object>
где ключи являются именами столбцов. Таким образом, если в словаре нет значения для данного столбца, Null
будет принято значение. И наоборот, все элементы в словаре, которые не связаны ни с одним столбцом в таблице, будут проигнорированы.
public sealed class DynamicDataReader<T> : IDataReader
{
private readonly IList<SchemaFieldDef> m_schema;
private readonly IDictionary<string, int> m_schemaMapping;
private readonly Func<T, string, object> m_selector;
private IEnumerator<T> m_dataEnumerator;
public DynamicDataReader(IList<SchemaFieldDef> schema, IEnumerable<T> data,
Func<T, string, object> selector)
{
m_schema = schema;
m_schemaMapping = m_schema
.Select((x, i) => new { x.FieldName, Index = i })
.ToDictionary(x => x.FieldName, x => x.Index);
m_selector = selector;
m_dataEnumerator = data.GetEnumerator();
}
#region IDataReader Members
public void Close()
{
Dispose();
}
public int Depth => 1;
public DataTable GetSchemaTable()
{
return null;
}
public bool IsClosed => m_dataEnumerator == null;
public bool NextResult()
{
return false;
}
public bool Read()
{
if (IsClosed)
throw new ObjectDisposedException(GetType().Name);
return m_dataEnumerator.MoveNext();
}
public int RecordsAffected => -1;
#endregion
// IDisposable Members
#region IDataRecord Members
public int FieldCount => m_schema.Count;
public int GetOrdinal(string name)
{
int ordinal;
if (!m_schemaMapping.TryGetValue(name, out ordinal))
throw new InvalidOperationException("Unknown parameter name: " + name);
return ordinal;
}
public object GetValue(int i)
{
if (m_dataEnumerator == null)
throw new ObjectDisposedException(GetType().Name);
var value = m_selector(m_dataEnumerator.Current, m_schema[i].FieldName);
if (value == null)
return DBNull.Value;
var strValue = value as string;
if (strValue != null)
{
if (strValue.Length > m_schema[i].Size && m_schema[i].Size > 0)
strValue = strValue.Substring(0, m_schema[i].Size);
if (m_schema[i].DataType == DbType.String)
return strValue;
return SchemaFieldDef.StringToTypedValue(strValue, m_schema[i].DataType) ?? DBNull.Value;
}
return value;
}
// Not Implemented IDataRecord Members
#endregion
}
DynamicDataReader<T>
передает SchemaFieldDef
класс, который описывает имя поля, размер и тип данных БД столбца таблицы. Только те столбцы, которые были переданы через constructor ( IList<SchemaFieldDef> schema
), будут участвовать во вставке данных. Два других параметра конструктора представляют сами данные ( IEnumerable<T> data
) и определенное пользователем лямбда-выражение ( Func<T, string, object> selector
) для доступа к свойствам. Как видите, selector
принимает экземпляр T
и string
имя поля и возвращает object
обратно, представляющее значение, связанное с этим именем поля. Обратите внимание , что object
тип данных «S может быть либо не- string
типа С # ( int
, decimal
, DateTime
, Guid
и т.д.) , что соответствует фактическому типу в базе данных ( int
, numeric
, datetime
, uniqueidentifier
и т.д.), или простоstring
. В последнем случае DynamicDataReader
будет пытаться автоматически преобразовать string
значение в соответствующий тип данных с помощью SchemaFieldDef.StringToTypedValue()
метода. Этот метод поддерживает только несколько типов данных, но при необходимости его можно легко расширить.
Вот пример использования DynamicDataReader<T>
вместе с SqlBulkCopy
:
private static async Task RunDynamicDatasetDemoAsync(SqlConnection connection, int count,
CancellationToken cancellationToken)
{
var fields = await new TableSchemaProvider(connection, "DynamicData").GetFieldsAsync();
using (var bulkCopy = new SqlBulkCopy(connection))
{
bulkCopy.DestinationTableName = "DynamicData";
bulkCopy.BatchSize = 1000;
bulkCopy.BulkCopyTimeout = (int) TimeSpan.FromMinutes(10).TotalSeconds;
foreach (var field in fields)
bulkCopy.ColumnMappings.Add(field.FieldName, field.FieldName);
var data = new RandomDataGenerator().GetDynamicData(count);
using (var reader = new DynamicDataReader<IDictionary<string, object>>
(fields, data, (x, k) => x.GetValueOrDefault(k)))
await bulkCopy.WriteToServerAsync(reader, cancellationToken);
}
}
Это очень похоже на ObjectDataReader
использование, за исключением того, что поля не связаны статически.
Импорт файла CSV
Наконец, третье демонстрационное действие включает CsvParser
, DynamicDataReader
и SqlBulkCopy
классы, работающие вместе для достижения высокопроизводительного и масштабируемого импорта данных в CSV
формате:
private static async Task RunCsvDatasetDemoAsync(SqlConnection connection, int count,
CancellationToken cancellationToken)
{
using (var csvReader = new StreamReader(@"Data\CsvData.csv"))
{
var csvData = CsvParser.ParseHeadAndTail(csvReader, ',', '"');
var csvHeader = csvData.Item1
.Select((x, i) => new {Index = i, Field = x})
.ToDictionary(x => x.Field, x => x.Index);
var csvLines = csvData.Item2;
var fields = await new TableSchemaProvider(connection, "CsvData").GetFieldsAsync();
using (var bulkCopy = new SqlBulkCopy(connection))
{
bulkCopy.DestinationTableName = "CsvData";
bulkCopy.BatchSize = 1000;
bulkCopy.BulkCopyTimeout = (int) TimeSpan.FromMinutes(10).TotalSeconds;
foreach (var field in fields)
bulkCopy.ColumnMappings.Add(field.FieldName, field.FieldName);
using (var reader = new DynamicDataReader<IList<string>>(fields, csvLines.Take(count),
(x, k) => x.GetValueOrDefault(csvHeader.GetValueOrDefault(k, -1))))
{
await bulkCopy.WriteToServerAsync(reader, cancellationToken);
}
}
}
}
В демонстрационных целях только1,000
строк есть в файле CsvData.csv.Однако это конкретное решение сможет обрабатывать любое количество строк с относительно стабильной производительностью. Оно будет сопоставлять имена столбцов в CSV
файле с именами столбцов целевой таблицы. Отсутствующие данные будут заполнены буквой Null
. Любые дополнительные столбцы, отсутствующие в целевой таблице, будут проигнорированы.
Резюме
В этой статье я продемонстрировал один из возможных способов обработки высокопроизводительных вставок в базу данных с использованием управляемого кода. Моей целью было создать гибкий и простой в использовании API, чтобы его можно было применять во многих различных сценариях. В частности, используйте ObjectDataReader<T>
для загрузки данных, представленных в виде статически определенных POCO
классов, и DynamicDataReader<T>
для загрузки данных любой структуры.