Использование распараллеливания при обработке данных в C#



    Всем добрый день! Я технический специалист, работающий в системе внутреннего аудита, в мои обязанности входит создание инструментов ETL на языке программирования C#.

    Периодически источниками данных становятся жестко структурированные файлы формата xml, csv, json или любого другого формата. Иногда их количество становится достаточно большим и постоянно увеличивающимся. Например, в одной из моих задач количество файлов увеличивалось со средней скоростью обновления примерно 150 000 файлов в сутки. Если при этом обработка одного файла (считывание массива байт с жесткого диска в память, трансформация загруженных данных и запись их в базу данных) занимает секунду, то становится понятно, что обработка всех файлов займет более 40 часов. В этом случае мы не сможем обработать эти файлы до конца, так как скорость увеличения количества файлов будет явно выше скорости их обработки.

    Одно из решений данной проблемы – разработать приложение, в котором создать пул независимых друг от друга потоков. В потоках будут обрабатываться файлы, выбираемые из общей очереди. Однако в этом случае возникают сложности с синхронизацией работающих потоков и совместного использования ресурсов, так как очень вероятно появление взаимных блокировок.

    Что бы избежать этих сложностей компания Microsoft добавила в фреймоворк .Net библиотеку TPL (начиная с версии 4.0). Я расскажу, как используя возможности этой библиотеки решить данную проблему.

    Итак, изначально алгоритм работы выглядит следующим образом:

    Сканируется каталог хранения файлов и возвращается список (например, List), содержащий данные о всех файлах;
    Запускается цикл (for или foreach) в котором данные из очередного файла считываются в память, при необходимости трансформируются и записываются в БД.

    Очевидно, что самые затратные по времени операции – это считывание данных с жесткого диска в память и запись данных из памяти в БД.

    Попробуем оптимизировать наш алгоритм при помощи библиотеки TPL:

    Пункт 1.

    Изменим список, возвращаемый функцией сканирования каталога хранения файлов с List на ConcurrentQueue.
    Для чего мы это делаем? Дело в том, что класс ConcurrentQueue является потокобезопасным, то есть если одновременно два потока попытаются извлечь данные из этого списка или записать в него данные, то у нас не возникнет исключений (Exception).
    Пункт 1 нашего алгоритма будет выглядеть так: сканируется каталог хранения файлов и возвращается список ConcurrentQueue, содержащий данные о всех файлах.

    Пункт 2:
    Изменим конструкцию формирующую цикл обработки данных из файла. Заменим for на Parallel.For или Parallel.ForEach.

    В чем отличие новой конструкции от for? Тут всё просто и в принципе понятно из названия языковой конструкции. Все итерации цикла выполняются в параллельных потоках. В качестве примера я покажу организацию цикла конструкцией Parallel.ForEach:

    Parallel.ForEach(listFiles, (currentFile) =>
           	  {
                  	var dataFile = getDataFile(currentFile.FullName);
    		TransformData(dataFile);
    		WriteToDB(dataFile);
                   });

    где:

    listFiles – это коллекция типа ConcurrentQueue содержащая спи-сок файлов в каталоге;
    currentFile – элемент коллекции listFiles, который возвращается конструк-цией ForEach;
    dataFile – условная некоторая структура данных в памяти, получаемая счи-тыванием содержимого файла в память;
    getDataFile – условная функция возвращающая содержимое файла в виде некоторой структуры данных;
    TransformData – условная процедура трансформации полученных данных;
    WriteToDB – условная процедура записи данных в БД.

    В данном примере, с помощью конструкции Parallel.ForEach, мы организуем цикл. В этом цикле, в параллельных потоках, производится считывание данных с жесткого диска, их трансформация и запись в БД. При этом, проблемы организации работы параллельных потоков отсутствуют. Количество параллельных потоков зависит от числа ядер процессора и их загруженности.

    Используя предложенный алгоритм, мы ускорим обработку файлов, как минимум в 2 раза. Хотя, конечно, эта цифра будет меняться в зависимости от количества ядер и памяти машины, на которой будет запускаться программа.

    Так же для ускорения работы программы нужно вынести запись в БД в отдельный поток, работающий независимо от основного. Сделать это можно при помощи коллекции ConcurrentQueue, чтобы избежать конфликтов при добавлении данных в оче-редь.

    Перепишем вышеприведенный пример с учетом оптимизации записи в БД.
    Предположим, что процедура чтения файлов возвращает нам данные в DataTable):

    Parallel.ForEach(listFiles, (currentFile) =>
           	  {
                  	DataTable dataFile = getDataFile(currentFile.FullName);
    		TransformData(dataFile);
    		threadWriteToDB.ListData.Enqueue(dataFile);
                   });
    

    Как видно, вместо строки с вызовом процедуры записи в БД, мы просто добавляем в коллекцию ConcurrentQueue ListData описанную и инициализированную в отдельном потоке, экземпляр которого threadWriteToDB используется в нашем цикле.

    Запись в БД происходит уже в отдельном потоке. Запись в БД можно организовать аналогично работе с файлами, с помощью конструкций Parallel.For и/или Paral-lel.Foreach.

    В моей задаче, где потребовалась обработка сопоставимого количества файлов, сейчас может обрабатываться в среднем от 200 000 до 400 000 файлов в сутки, при чем скорость ограничивается загрузкой БД и шириной канала данных.

    Похожие публикации

    AdBlock похитил этот баннер, но баннеры не зубы — отрастут

    Подробнее
    Реклама

    Комментарии 9

      0
      Изменим список, возвращаемый функцией сканирования каталога хранения файлов с List на ConcurrentQueue.

      А почему бы не использовать итератор, который по умолчанию ещё и является потокобезопасным для чтения.

      getDataFile – условная функция возвращающая содержимое файла в виде некоторой структуры данных;

      Загрузим мы 8 гигов в оперативу и сломаем компьютер. Тут нужно искать tradeoff между скоростью и оперативкой. Для этого можно было бы возвращать буферезированный поток, а не полный массив.

      Очевидно, что самые затратные по времени операции – это считывание данных с жесткого диска в память и запись данных из памяти в БД.

      Поэтому мы параллельно считываем? От этого скорость только упадёт из-за накладных расходов по сравнению с последовательным чтением.
        –2
        Добрый день, спасибо за комментарий.

        1. А почему бы не использовать итератор, который по умолчанию ещё и является потокобезопасным для чтения.

        Выдержка с сайта Microsoft:
        “Все открытые и защищенные члены ConcurrentQueue являются потокобезопасными и могут использоваться одновременно из нескольких потоков.”

        2. Загрузим мы 8 гигов в оперативу и сломаем компьютер. Тут нужно искать tradeoff между скоростью и оперативкой. Для этого можно было бы возвращать буферезированный поток, а не полный массив.

        Да, такая возможность существует. Но в статье не приводится универсальный код, а взят пример из реальной задачи, в которой есть знание максимально возможного размера обрабатываемого файла (~1 мегабайт) и знание того, что среда выполнения .Net никогда не выделит столько потоков под конструкцию Parallel.Foreach (или Parallel.For), что бы забить полностью доступную нам оперативную память.
        Для примера: если предположить, что нам доступны 6 гигабайт, то 6*1024 потоков единовременно или в течении краткого периода времени никогда выделено не будет. Таким образом, вероятность того что мы загрузим ОЗУ полностью стремится к нулю. И даже если предположить, что всё-таки машина сможет выделить столько потоков то компьютер мы не сломаем. Просто приложение будет закрыто с ошибкой переполнения памяти (хотя надо признать, что в этот момент действительно возможны будут замедления работы других процессов).

        3. Поэтому мы параллельно считываем? От этого скорость только упадёт из-за накладных расходов по сравнению с последовательным чтением.

        Да именно поэтому. Упадет скорость или нет – это достаточно спорный вопрос, дискуссии по которому шли и периодически до сих пор вспыхивают в сети. Падение скорости более вероятно на классических HDD, если же применяются SSD диски, то соответственно эта вероятность будет существенно ниже. Конкретно в нашем случае – было ускорение обработки файлов в несколько раз. Возможно заменить его на параллельную запись в БД, конструкция останется той же самой.
          0
          А как вы контролируете потоки? Ну вот какая-то нештатная ситуация и поток встал. Или зациклился. Что будет в этом случае? Как вы реагируете на такое?

        +1
        Э-э-э…

        Существует понятие BatchMachine. Правда, оно лежит несколько за пределами «фреймворков» и прочих модных слов. Это базовые вещи.

        Суть такова — есть задача — master, которая (в нашем случае) читает с диска файлы и готовит задания на их обработку для задач — worker'ов (отработчиков). Каналы связи между Мастером и обработчиками лучше выбирать такие, которые не требуют отслеживания блокировок на уровне прикладной задачи. Тут уже надо смотреть что есть на платформе. По Win есть неплохая вещь MailSlot. В никсах — UnixSocket (очень близкий аналог). В AS/400 (под которой сейчас работаю) совсем чудесно — есть системные объекты *USRQ и *DTAQ (User Queue и Data Queue соответственно). Ну или смотреть наличие на платформе какой-нибудь MQ.
        На худой конец — обычные UDP сокеты в рамках локальной машины.

        В этой схеме мастер взял данные, подготовил пакет для обработки, выложил в очередь. Обработчик подхватил пакет, обработал, дальше уже или сам сохранил результат, или отдал мастеру.

        Мастер один, обработчиков много. За степенью заполнения очереди пакетов следит мастер. Если он формирует пакеты быстрее чем обработчики их обрабатывают — думать. Может можно увеличить количество обработчиков.

        Как-то таким образом делал обработку порядка 30млн записей в таблице. Обработка была достаточно сложной — одна запись тащила за собой еще десяток других из других таблиц.

        Был один мастер, который выбирал записи из основной таблице и складывал данные в выходную очередь (*USRQ). Дальше 10 обработчиков, каждый из которых читал очередную порцию данных, обрабатывал ее и складывал отчет во входную очередь мастера (т.е. было две очереди). Там уже мастер брал отчет и заносил в протокол.

        Поскольку *USRQ есть системный объект, то ни за какие блокировки голова не болела — конкурентный доступ к нему обеспечивается системой.

        Аналог можно делать на UnixSockets — у каждого процесса (мастера, обработчика) есть свой DGRAM сокет. Мастер читает данные и смотрит кто из обработчиков свободен. Тому и кидает данные в сокет. Результаты обработчики складывают в сокет мастера. Тут мастеру еще надо статусы обработчиков хранить — отдал данные — статус «занят». Получил результат — статус «свободен».

        И тоже никаких проблем с блокировками.

        Все эти технологии обкатаны очень давно, еще когда занимался разработкой распределенной системы мониторинга инженерного обрудования зданий на микроядреной архитектуре. Там в ядре было несколько потоков, в частности, поток, отвечающий за работу с удаленными микроконтроллерами и поток отвечающий за работу с удаленными интерфейсными клиентами. И надо было заниматься фильтрацией-маршрутизацией информации контроллеры-клиенты.

        Начитан со всяких конкурентных очередей, но быстро с этим наелся. Ушел на датаграммный обмен через сокеты. У каждого потока свой сокет в который любой другой поток может передать информацию в виде датаграммы. И никаких проблем с блокировками.
          +2

          Вы, я так понимаю, про TPL Dataflow не слышали? А про async IO?

            0
            TPL Dataflow мощная вещь
            +1
            Тема параллелизма данных норм описана в четвертой главе
            habr.com/ru/company/piter/blog/497218

            Как новичку мне было тяжело ориентироваться в огромной куче статей (в том числе и на хабре) на тему асинхронности и параллелизма которых много наплодили за последнее время. Каждый автор интерпретирует это по своему от того образуется каша в голове. Лучше один раз прочитать авторитетного дядьку и его книгу «Конкурентность в C#. Асинхронное, параллельное и многопоточное программирование. 2-е межд. изд.» и закрыть этот вопрос.
              0
              Чтобы не возникало каши в голове, нужно понимать как это работает внутри. Иначе можно легко превратиться в обезьяну, у который выработали условные рефлексы — «при загорании зеленой лампочки жать зеленую кнопку, при загорании красной — красную».

              Т.е. вы будете наизусть знать набор готовых методов конкретного фреймворка, но за его пределами абсолютно не понимать сути того, как это работает.
              +3
              Parallel.ForEach — это слегка устаревшая тема. Сейчас TPL Dataflow или там Channels для обмена данными, и еще async io.

              И, насколько я помню, Parallel.ForEach предназначен для CPU bound операций, а не для IO bound. Это еще даже у Рихтера написано в книге, изданной более 7 лет назад.

              Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

              Самое читаемое