HBase, загрузка больших массивов данных через bulk load

    Привет коллеги.
    Хочу поделиться своим опытом использования HBase, а именно рассказать про bulk loading. Это еще один метод загрузки данных. Он принципиально отличается от обычного подхода (записи в таблицу через клиента). Есть мнение, что с помощью bulk load можно очень быстро загружать огромные массивы данных. Именно в этом я решил разобраться.

    И так, обо всём по порядку. Загрузка через bulk load происходит в три этапа:

    • Помещаем файлы с данными в HDFS
    • Запускаем MapReduce задачу, которая преобразует исходные данные непосредственно в файлы формата HFile, посути HBase хранит свои данные именно в таких файлах.
    • Запускаем bulk load функцию, которая зальёт (привяжет) полученные файлы в таблицу HBase.




    В данном случае мне было необходимо прочувствовать эту технологию и понять её в цифрах: чему равна скорость, как она зависит от количества и размера файлов. Эти числа слишком зависимы от внешних условий, но помогают понять порядки между обычной загрузкой и bulk load.

    Исходные данные:


    Кластер под управлением Cloudera CDH4, HBase 0.94.6-cdh4.3.0.
    Три виртуальных хоста (на гипервизоре), в конфигурации CentOS / 4CPU / RAM 8GB / HDD 50GB
    Тестовые данные хранились в CSV файлах различных размеров, суммарным объёмом 2GB, 3.5GB, 7.1GB и 14.2GB
    Сначала о результатах:

    Bulk loading


    Cкорость:
    • Max 29.2 Mb/sec или 58K rec/sec (3.5GB в 28 файлах)
    • Average 27 Mb/sec или 54K rec/sec(рабочая скорость, более приближенная к реальности )
    • Min 14.5 Mb/sec или 29K rec/sec (2GB в 100 файлах)
    • 1 файл загружается на 20% быстрее чем 100


    Размер одной записи (row): 0.5Кb
    Время инициализации MapReduce Job: 70 sec
    Время загрузки файлов в HDFS с локальной файловой системы:
    • 3.5GB / 1 файл — 65 sec
    • 7.5GB / 100 — 150 sec
    • 14.2G / 1 файл — 285 sec


    Загрузка через клиенты:


    Загрузка осуществлялась с 2-х хостов по 8 потоков на каждом.
    Клиенты запускались по крону в одно и тоже время, загрузка CPU не превышала 40%
    Размер одной записи (row), как и в предыдущем случае был равен 0.5Кb.



    Что в итоге?




    Реализовать этот тест, я решил на волне разговоров о bulk load как о способе сверхбыстрой загрузки данных. Стоит сказать, что в официальной документации речь идет только о снижении нагрузки на сеть и CPU. Как бы там ни было, я не вижу выигрыша в скорости. Тесты показывают что bulk load быстре всего лишь в полтора раза, но не будем забывать что это без учета инициализации m/r джобы. Кроме того, данные надо доставить в HDFS, на это тоже потребуется какое то время.
    Думаю, стоит относиться к bulk load просто, как к еще одному способу загрузки данных, архитектурно иному (в некоторых случаях очень даже удобному).

    А теперь о реализации


    Теоретически всё довольно просто, но на практике возникает несколько технических нюансов.

    //Создаём джоб
    Job job = new Job(configuration, JOB_NAME);
    job.setJarByClass(BulkLoadJob.class);
    
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(Put.class);
    
    job.setMapperClass(DataMapper.class);
    job.setNumReduceTasks(0);
    
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(HFileOutputFormat.class);
    
    FileInputFormat.setInputPaths(job, inputPath);
    HFileOutputFormat.setOutputPath(job, new Path(outputPath));
    
    HTable dataTable = new HTable(jobConfiguration, TABLE_NAME);
    HFileOutputFormat.configureIncrementalLoad(job, dataTable);
    
    //Запускаем
    ControlledJob controlledJob = new ControlledJob(
        job,
        null
    );
    
    JobControl jobController = new JobControl(JOB_NAME);
    jobController.addJob(controlledJob);
    
    Thread thread = new Thread(jobController);
    thread.start();
    .
    .
    .
    //Даём права на output
    setFullPermissions(JOB_OUTPUT_PATH);
    
    //Запускаем функцию bulk-load
    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(jobConfiguration);
    loader.doBulkLoad(
            new Path(JOB_OUTPUT_PATH),
            dataTable
    );
    


    • MapReduce Job создаёт выходные файлы c правами пользователя, от имени которого он был запущен.
    • bulk load всегда запускается от имени пользователя hbase, поэтому не может прочитать подготовленные для него файлы, и валится вот с таким исключением: org.apache.hadoop.security.AccessControlException: Permission denied: user=hbase


    Поэтому надо запускать Job от имени пользователя hbase или раздать права на выходные файлы (именно так я сделал).

    • Необходимо правильно создать таблицу HBase. По умолчанию она создается с одним Region-ом. Это приводит к тому, что создается только один редьюсер и запись идет только на одну ноду, загружая её на 100% остальные при этом курят.
      Поэтому при создании новой таблицы надо сделать pre-split. В моём случае таблица разбивалась на 10 Region-ов равномерно разбросанных по всему кластеру.


    //Создаём таблицу и делаем пре-сплит
    HTableDescriptor descriptor = new HTableDescriptor(
            Bytes.toBytes(tableName)
    );
    
    descriptor.addFamily(
            new HColumnDescriptor(Constants.COLUMN_FAMILY_NAME)
    );
    
    HBaseAdmin admin = new HBaseAdmin(config);
    
    byte[] startKey = new byte[16];
    Arrays.fill(startKey, (byte) 0);
    
    byte[] endKey = new byte[16];
    Arrays.fill(endKey, (byte)255);
    
    admin.createTable(descriptor, startKey, endKey, REGIONS_COUNT);
    admin.close();
    


    • MapReduce Job пишет в выходную директорию, которую мы ему указываем, но при этом создает субдиректории, одноименные с column family. Файлы создаются именно там


    В целом, это всё. Хочется сказать, что это довольно грубый тест, без хитрых оптимизаций, поэтому если у вас есть что добавить, буду рад услышать.

    Весь код проекта доступен на GitHub: github.com/2anikulin/hbase-bulk-load
    Поделиться публикацией

    Похожие публикации

    Комментарии 5

      0
      bulkload, по сути своей — перемещение HFile-ов из входной директории в хранилище HBase (+ инициализация новых файлов, + проверка на то, что не изменились границы регионов, пока эти HFile-ы создавались). Это во много раз быстрее, многочисленных Put-ов…

      Единственное, надо иметь ввиду, что при загрузке данных сразу в несколько семейств колонок, происходит попытка заблокировать очередной регион на запись. В случае, если идут какие-то другие опреации (например какая-то MR-задача читает данные из HBase или идёт процесс компакшена), попытка получить write lock может затянуться.

      Время, которое тратится на формирование HFile-ов из сходных CSV файлов (по сути, время MR-задачи) можно попытаться оптимизировать настройками.
        0
        bulkload, по сути своей — перемещение HFile-ов из входной директории в хранилище HBase

        Это верно.
        Но файлы всёравно подготавливать надо. Мне было необходимо понять, на сколько это продуктивно работает с сырыми данными…
          0
          У bluck load есть одна проблема, если произошел split одного из регионов, то часть данных может не загрузится. Это происходит потому, что то HFiles формируются по заранее известным регионам.
            0
            Там есть проверка на то, что границы регионов могли измениться, в т.ч. за счёт сплита.
              0
              Возможно в новой версии появились, это было два года назад.

            Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

            Самое читаемое