Импорт больших dbf файлов в Ms SQL server 2008 с помощью SqlBulkCopy

  • Tutorial
В этой статье я расскажу как загрузить много огромных dbf файлов, состоящих из миллионов записей в вашу базу данных на ms sql сервере за приемлемое время.

Задача на первый взгляд тривиальна. Можно использовать мастер в sql management studio или функцию OPENROWSET через запрос.

Но первый вариант после нескольких попыток отпал из за разных глюков и необходимости загрузки множества файлов в одну таблицу (около 100 файлов). К тому же при продолжительной загрузке вылетала ошибка.

Второй вариант тоже не подошел из за различной разрядности драйверов и разрядности сервера.

Так как файл просто огромный, то было решено читать его через поток и записывать в базу. Далее после прочтения строки в файле надо эту строку записать в табличку. Первое что пришло на ум это использовать insert, но запись в этом случае заняла бы слишком много время.

И тут я вспомнил про другой механизм записи через SqlBulkCopy, который позволяет заливать огромное число записей без запросов insert.
На деле это использование класса SqlBulkCopy, для осуществления записи через который надо реализовать один лишь интерфейс IDataReader.

Итак начнем с реализации интерфейса
public class BDFBulkReader : IDataReader


Начнем с функции, которая возвращает значение текущей записи:
 public object GetValue(int i) { return R[FieldIndex[i]]; }

Обращу ваше внимание на то что поля в файле и поля в таблице могут быть в разном порядке. А по индексу хотелось бы получать значение для соответствующего поля таблицы. Поэтому я использовал дополнительно словарь FieldIndex, где сопоставление имен полей номеру в таблице sql. По номеру берется имя поля, по имени из словаря R берется значение из прочитанной строки dbf файла. В итоге для n го индекса в бд GetValue вернет соответствующее значение.
        Dictionary<string, object> R = new Dictionary<string, object>();
        Dictionary<int, string> FieldIndex = new Dictionary<int, string>();


FieldIndex будем передавать уже заполненный для таблицы, а R будет заполнять при вызове самими reader'ом функции Read, которую в дальнейшем тоже реализуем.

Итак, конструктор:

        System.IO.FileStream FS;
        byte[] buffer;
        int _FieldCount;
        int FieldsLength;
        System.Globalization.DateTimeFormatInfo dfi = new System.Globalization.CultureInfo("en-US", false).DateTimeFormat;
        System.Globalization.NumberFormatInfo nfi = new System.Globalization.CultureInfo("en-US", false).NumberFormat;
        string[] FieldName;
        string[] FieldType;
        byte[] FieldSize;
        byte[] FieldDigs;
        int RowsCount;
        int ReadedRow = 0;
        Dictionary<string, object> R = new Dictionary<string, object>();
        Dictionary<int, string> FieldIndex = new Dictionary<int, string>();

        public BDFBulkReader(string FileName, Dictionary<int, string> FieldIndex)
        {
            FS = new System.IO.FileStream(FileName, System.IO.FileMode.Open);
            buffer = new byte[4]; 
            FS.Position = 4; FS.Read(buffer, 0, buffer.Length);
            RowsCount = buffer[0] + (buffer[1] * 0x100) + (buffer[2] * 0x10000) + (buffer[3] * 0x1000000);
            buffer = new byte[2]; 
            FS.Position = 8; FS.Read(buffer, 0, buffer.Length);
            _FieldCount = (((buffer[0] + (buffer[1] * 0x100)) - 1) / 32) - 1;
            FieldName = new string[_FieldCount]; 
            FieldType = new string[_FieldCount]; 
            FieldSize = new byte[_FieldCount]; 
            FieldDigs = new byte[_FieldCount];
            buffer = new byte[32 * _FieldCount];
            FS.Position = 32; FS.Read(buffer, 0, buffer.Length);
            FieldsLength = 0;
            for (int i = 0; i < _FieldCount; i++)
            {              
                FieldName[i] = System.Text.Encoding.Default.GetString(buffer, i * 32, 10).TrimEnd(new char[] { (char)0x00 });
                FieldType[i] = "" + (char)buffer[i * 32 + 11];
                FieldSize[i] = buffer[i * 32 + 16];
                FieldDigs[i] = buffer[i * 32 + 17];
                FieldsLength = FieldsLength + FieldSize[i];
            }
            FS.ReadByte(); 

            this.FieldIndex = FieldIndex;
        }


Его задачи это открыть файл, определить имена полей, их число и их типы. Второй параметр конструктора, как я писал выше, это словарь соответствий, чтобы, например, по первому номеру поля мы гарантированно получили нужное поле из файла.

Теперь перейдем к реализации bool Read(). Она вернет true в случае если строка успешно прочитана. И false в случае если строка не была прочитана и в то же время был достигнут конец данных.

        public bool Read()
        {
            if (ReadedRow >= RowsCount) return false;

            R.Clear();
            buffer = new byte[FieldsLength];
            FS.ReadByte(); 
            FS.Read(buffer, 0, buffer.Length);
            int Index = 0;
            for (int i = 0; i < FieldCount; i++)
            {
                string l = System.Text.Encoding.GetEncoding(866).GetString(buffer, Index, FieldSize[i]).TrimEnd(new char[] { (char)0x00 }).TrimEnd(new char[] { (char)0x20 });
                Index = Index + FieldSize[i];
                object Tr;
                if (l.Trim() != "")
                {
                    switch (FieldType[i])
                    {
                        case "L": Tr = l == "T" ? true : false; break;
                        case "D": Tr = DateTime.ParseExact(l, "yyyyMMdd", dfi); break;
                        case "N":
                            {
                                if (FieldDigs[i] == 0)
                                    Tr = int.Parse(l, nfi);
                                else
                                    Tr = decimal.Parse(l, nfi);
                                break;
                            }
                        case "F": Tr = double.Parse(l, nfi); break;
                        default: Tr = l; break;
                    }

                }
                else
                {
                    Tr = DBNull.Value;
                }
                R.Add(FieldName[i], Tr);
            }
            ReadedRow++;
            return true;
        }


Еще раз напомню, что после ее вызова прочитанная строка запишется в словарь R, для последующего чтения reader'ом.
Итак, осталось реализовать, метод возвращающий число полей:

public int FieldCount { get { return _FieldCount; } }


И заглушки для интерфейса:

public void Dispose() { FS.Close(); }       
        public int Depth { get { return -1; } }
        public bool IsClosed { get { return false; } }
        public Object this[int i] { get { return new object(); } }
        public Object this[string name] { get { return new object(); } }
        public int RecordsAffected { get { return -1; } }

        public void Close() { }
        public bool NextResult() { return true; }
        public bool IsDBNull(int i) { return false; }
        public string GetString(int i) { return ""; }
        public DataTable GetSchemaTable() { return null; }
        public int GetOrdinal(string name) { return -1; }
        public string GetName(int i) { return ""; }
        public long GetInt64(int i) { return -1; }
        public int GetInt32(int i) { return -1; }
        public short GetInt16(int i) { return -1; }
        public Guid GetGuid(int i) { return new Guid(); }
        public float GetFloat(int i) { return -1; }
        public Type GetFieldType(int i) { return typeof(string); }
        public double GetDouble(int i) { return -1; }
        public decimal GetDecimal(int i) { return -1; }
        public DateTime GetDateTime(int i) { return new DateTime(); }
        public string GetDataTypeName(int i) { return ""; }
        public IDataReader GetData(int i) { return this; }
        public long GetChars(int i, long fieldoffset, char[] buffer, int bufferoffset, int length) { return -1; }
        public char GetChar(int i) { return ' '; }
        public long GetBytes(int i, long fieldOffset, byte[] buffer, int bufferoffset, int length) { return -1; }
        public byte GetByte(int i) { return 0x00; }
        public bool GetBoolean(int i) { return false; }
        public int GetValues(Object[] values) { return -1; }


Где в Dispose() я просто закрываю файл.

После того как интерфейс реализован, можно написать метод для загрузки файла:

        void SaveToTable(FileInfo dir, string TableName, string connestionString, Dictionary<int, string> FieldIndex)
        {
            using (var loader = new SqlBulkCopy(connestionString, SqlBulkCopyOptions.Default))
            {               
                loader.DestinationTableName = TableName;
                loader.BulkCopyTimeout = 9999;
                loader.WriteToServer(new BDFBulkReader(dir.FullName, FieldIndex));               
            }
        }


Вот и все. В эту функцию осталось передать расположение файла, имя таблицы, строку подключения и соответствующий словарь соответствий, например:

Dictionary<int, string> FieldIndex= new Dictionary<int, string>();
            FieldIndex.Add(0, "POSTALCODE");
            FieldIndex.Add(1, "IFNSFL");
            FieldIndex.Add(2, "TERRIFNSFL");
            FieldIndex.Add(3, "IFNSUL");
            FieldIndex.Add(4, "TERRIFNSUL");
            FieldIndex.Add(5, "OKATO");
            FieldIndex.Add(6, "OKTMO");
            FieldIndex.Add(7, "UPDATEDATE");
            FieldIndex.Add(8, "HOUSENUM");
            FieldIndex.Add(9, "ESTSTATUS");
            FieldIndex.Add(10, "BUILDNUM");
            FieldIndex.Add(11, "STRUCNUM");
            FieldIndex.Add(12, "STRSTATUS");
            FieldIndex.Add(13, "HOUSEID");
            FieldIndex.Add(14, "HOUSEGUID");
            FieldIndex.Add(15, "AOGUID");
            FieldIndex.Add(16, "STARTDATE");
            FieldIndex.Add(17, "ENDDATE");
            FieldIndex.Add(18, "STATSTATUS");
            FieldIndex.Add(19, "NORMDOC");
            FieldIndex.Add(20, "COUNTER");


Все, всем спасибо за внимание, приятной загрузки.

Полезные ссылки:
Описание SqlBulkCopy
Внутренности dbf
Поделиться публикацией

Комментарии 7

    0
    почему
    .TrimEnd(new char[] { (char)0x00 }).TrimEnd(new char[] { (char)0x20 });
    а не
    .TrimEnd(new char[] { (char)0x00, (char)0x20 });

    почему FieldIndex — Dictionary, а не обычный массив? (номера же подряд идут)
      +2
      Тогда уж зачем вообще явно создавать массив?
      .TrimEnd((char)0x00, (char)0x20);
        +1
        Согласен, можно и массив.
        +1
        Не нравится мне что-то вот этот код:
        FieldName[i] = System.Text.Encoding.Default.GetString(buffer, i * 32, 10).TrimEnd(new char[] { (char)0x00 });
        


        Наверняка же в файле кодировка используется конкретная, не зависящая от того, на ОС какой локализации код запускается.
          0
          Я вас понял. В тех файлах кодировка была известна, но она возможно может быть и другой. Можно и дописать определение кодировки. Можно много еще плюшек так то дописать…
            +1
            Просто надо привыкнуть в таких случаях вместо System.Text.Encoding.Default всегда писать System.Text.Encoding.GetEncoding(1251). Если понадобится кодировку сменить — за «магическое» значение глазу проще зацепиться. Да и на разных системах будет работать одинаково, что тоже плюс.
          0
          дополнение для импорта текстовых данных из .dbt-файлов (спутников .dbf):

                              switch (fieldType[i])
                              {
                                  ...
                                  case "M":
                                      result = GetMemoField(value, fileName.ToLower().Replace(".dbf", ".dbt"));
                                      break;
                                  ...
                              }
          

                  string GetMemoField(string recordContent, string dbtFile)
                  {
                      if (recordContent.Trim().Length > 0)
                      {
                          int block = Int32.Parse(recordContent);
                          return GetMemoField(block, dbtFile);
                      }
          
                      return ""; // memo has no content: maybe should be null for more correctness
                  }
          
                  string GetMemoField(int blockIndex, string dbtFile)
                  {
                      using (var fileStream = new FileStream(dbtFile, FileMode.Open, FileAccess.Read))
                      {
                          long start = blockIndex * 512;
                          fileStream.Seek(start, SeekOrigin.Begin);
                          var bytes = new List<byte>();
                          while (fileStream.Position < fileStream.Length)
                          {
                              byte c = (byte) fileStream.ReadByte();
                              long pos = fileStream.Position;
                              if (c == 0x1a
                                  && fileStream.ReadByte() == 0x1a
                                  )
                                  return Encoding.UTF8.GetString(Encoding.Convert(Encoding.GetEncoding(866), Encoding.UTF8, bytes.ToArray()));
                              fileStream.Seek(pos, SeekOrigin.Begin);
                              bytes.Add(c);
                          }
                      }
          
                      throw new InvalidOperationException();
                  }
          

          Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

          Самое читаемое