Комментарии 9
Изменим список, возвращаемый функцией сканирования каталога хранения файлов с List на ConcurrentQueue.
А почему бы не использовать итератор, который по умолчанию ещё и является потокобезопасным для чтения.
getDataFile – условная функция возвращающая содержимое файла в виде некоторой структуры данных;
Загрузим мы 8 гигов в оперативу и сломаем компьютер. Тут нужно искать tradeoff между скоростью и оперативкой. Для этого можно было бы возвращать буферезированный поток, а не полный массив.
Очевидно, что самые затратные по времени операции – это считывание данных с жесткого диска в память и запись данных из памяти в БД.
Поэтому мы параллельно считываем? От этого скорость только упадёт из-за накладных расходов по сравнению с последовательным чтением.
Добрый день, спасибо за комментарий.
1. А почему бы не использовать итератор, который по умолчанию ещё и является потокобезопасным для чтения.
Выдержка с сайта Microsoft:
“Все открытые и защищенные члены ConcurrentQueue являются потокобезопасными и могут использоваться одновременно из нескольких потоков.”
2. Загрузим мы 8 гигов в оперативу и сломаем компьютер. Тут нужно искать tradeoff между скоростью и оперативкой. Для этого можно было бы возвращать буферезированный поток, а не полный массив.
Да, такая возможность существует. Но в статье не приводится универсальный код, а взят пример из реальной задачи, в которой есть знание максимально возможного размера обрабатываемого файла (~1 мегабайт) и знание того, что среда выполнения .Net никогда не выделит столько потоков под конструкцию Parallel.Foreach (или Parallel.For), что бы забить полностью доступную нам оперативную память.
Для примера: если предположить, что нам доступны 6 гигабайт, то 6*1024 потоков единовременно или в течении краткого периода времени никогда выделено не будет. Таким образом, вероятность того что мы загрузим ОЗУ полностью стремится к нулю. И даже если предположить, что всё-таки машина сможет выделить столько потоков то компьютер мы не сломаем. Просто приложение будет закрыто с ошибкой переполнения памяти (хотя надо признать, что в этот момент действительно возможны будут замедления работы других процессов).
3. Поэтому мы параллельно считываем? От этого скорость только упадёт из-за накладных расходов по сравнению с последовательным чтением.
Да именно поэтому. Упадет скорость или нет – это достаточно спорный вопрос, дискуссии по которому шли и периодически до сих пор вспыхивают в сети. Падение скорости более вероятно на классических HDD, если же применяются SSD диски, то соответственно эта вероятность будет существенно ниже. Конкретно в нашем случае – было ускорение обработки файлов в несколько раз. Возможно заменить его на параллельную запись в БД, конструкция останется той же самой.
1. А почему бы не использовать итератор, который по умолчанию ещё и является потокобезопасным для чтения.
Выдержка с сайта Microsoft:
“Все открытые и защищенные члены ConcurrentQueue являются потокобезопасными и могут использоваться одновременно из нескольких потоков.”
2. Загрузим мы 8 гигов в оперативу и сломаем компьютер. Тут нужно искать tradeoff между скоростью и оперативкой. Для этого можно было бы возвращать буферезированный поток, а не полный массив.
Да, такая возможность существует. Но в статье не приводится универсальный код, а взят пример из реальной задачи, в которой есть знание максимально возможного размера обрабатываемого файла (~1 мегабайт) и знание того, что среда выполнения .Net никогда не выделит столько потоков под конструкцию Parallel.Foreach (или Parallel.For), что бы забить полностью доступную нам оперативную память.
Для примера: если предположить, что нам доступны 6 гигабайт, то 6*1024 потоков единовременно или в течении краткого периода времени никогда выделено не будет. Таким образом, вероятность того что мы загрузим ОЗУ полностью стремится к нулю. И даже если предположить, что всё-таки машина сможет выделить столько потоков то компьютер мы не сломаем. Просто приложение будет закрыто с ошибкой переполнения памяти (хотя надо признать, что в этот момент действительно возможны будут замедления работы других процессов).
3. Поэтому мы параллельно считываем? От этого скорость только упадёт из-за накладных расходов по сравнению с последовательным чтением.
Да именно поэтому. Упадет скорость или нет – это достаточно спорный вопрос, дискуссии по которому шли и периодически до сих пор вспыхивают в сети. Падение скорости более вероятно на классических HDD, если же применяются SSD диски, то соответственно эта вероятность будет существенно ниже. Конкретно в нашем случае – было ускорение обработки файлов в несколько раз. Возможно заменить его на параллельную запись в БД, конструкция останется той же самой.
Э-э-э…
Существует понятие BatchMachine. Правда, оно лежит несколько за пределами «фреймворков» и прочих модных слов. Это базовые вещи.
Суть такова — есть задача — master, которая (в нашем случае) читает с диска файлы и готовит задания на их обработку для задач — worker'ов (отработчиков). Каналы связи между Мастером и обработчиками лучше выбирать такие, которые не требуют отслеживания блокировок на уровне прикладной задачи. Тут уже надо смотреть что есть на платформе. По Win есть неплохая вещь MailSlot. В никсах — UnixSocket (очень близкий аналог). В AS/400 (под которой сейчас работаю) совсем чудесно — есть системные объекты *USRQ и *DTAQ (User Queue и Data Queue соответственно). Ну или смотреть наличие на платформе какой-нибудь MQ.
На худой конец — обычные UDP сокеты в рамках локальной машины.
В этой схеме мастер взял данные, подготовил пакет для обработки, выложил в очередь. Обработчик подхватил пакет, обработал, дальше уже или сам сохранил результат, или отдал мастеру.
Мастер один, обработчиков много. За степенью заполнения очереди пакетов следит мастер. Если он формирует пакеты быстрее чем обработчики их обрабатывают — думать. Может можно увеличить количество обработчиков.
Как-то таким образом делал обработку порядка 30млн записей в таблице. Обработка была достаточно сложной — одна запись тащила за собой еще десяток других из других таблиц.
Был один мастер, который выбирал записи из основной таблице и складывал данные в выходную очередь (*USRQ). Дальше 10 обработчиков, каждый из которых читал очередную порцию данных, обрабатывал ее и складывал отчет во входную очередь мастера (т.е. было две очереди). Там уже мастер брал отчет и заносил в протокол.
Поскольку *USRQ есть системный объект, то ни за какие блокировки голова не болела — конкурентный доступ к нему обеспечивается системой.
Аналог можно делать на UnixSockets — у каждого процесса (мастера, обработчика) есть свой DGRAM сокет. Мастер читает данные и смотрит кто из обработчиков свободен. Тому и кидает данные в сокет. Результаты обработчики складывают в сокет мастера. Тут мастеру еще надо статусы обработчиков хранить — отдал данные — статус «занят». Получил результат — статус «свободен».
И тоже никаких проблем с блокировками.
Все эти технологии обкатаны очень давно, еще когда занимался разработкой распределенной системы мониторинга инженерного обрудования зданий на микроядреной архитектуре. Там в ядре было несколько потоков, в частности, поток, отвечающий за работу с удаленными микроконтроллерами и поток отвечающий за работу с удаленными интерфейсными клиентами. И надо было заниматься фильтрацией-маршрутизацией информации контроллеры-клиенты.
Начитан со всяких конкурентных очередей, но быстро с этим наелся. Ушел на датаграммный обмен через сокеты. У каждого потока свой сокет в который любой другой поток может передать информацию в виде датаграммы. И никаких проблем с блокировками.
Существует понятие BatchMachine. Правда, оно лежит несколько за пределами «фреймворков» и прочих модных слов. Это базовые вещи.
Суть такова — есть задача — master, которая (в нашем случае) читает с диска файлы и готовит задания на их обработку для задач — worker'ов (отработчиков). Каналы связи между Мастером и обработчиками лучше выбирать такие, которые не требуют отслеживания блокировок на уровне прикладной задачи. Тут уже надо смотреть что есть на платформе. По Win есть неплохая вещь MailSlot. В никсах — UnixSocket (очень близкий аналог). В AS/400 (под которой сейчас работаю) совсем чудесно — есть системные объекты *USRQ и *DTAQ (User Queue и Data Queue соответственно). Ну или смотреть наличие на платформе какой-нибудь MQ.
На худой конец — обычные UDP сокеты в рамках локальной машины.
В этой схеме мастер взял данные, подготовил пакет для обработки, выложил в очередь. Обработчик подхватил пакет, обработал, дальше уже или сам сохранил результат, или отдал мастеру.
Мастер один, обработчиков много. За степенью заполнения очереди пакетов следит мастер. Если он формирует пакеты быстрее чем обработчики их обрабатывают — думать. Может можно увеличить количество обработчиков.
Как-то таким образом делал обработку порядка 30млн записей в таблице. Обработка была достаточно сложной — одна запись тащила за собой еще десяток других из других таблиц.
Был один мастер, который выбирал записи из основной таблице и складывал данные в выходную очередь (*USRQ). Дальше 10 обработчиков, каждый из которых читал очередную порцию данных, обрабатывал ее и складывал отчет во входную очередь мастера (т.е. было две очереди). Там уже мастер брал отчет и заносил в протокол.
Поскольку *USRQ есть системный объект, то ни за какие блокировки голова не болела — конкурентный доступ к нему обеспечивается системой.
Аналог можно делать на UnixSockets — у каждого процесса (мастера, обработчика) есть свой DGRAM сокет. Мастер читает данные и смотрит кто из обработчиков свободен. Тому и кидает данные в сокет. Результаты обработчики складывают в сокет мастера. Тут мастеру еще надо статусы обработчиков хранить — отдал данные — статус «занят». Получил результат — статус «свободен».
И тоже никаких проблем с блокировками.
Все эти технологии обкатаны очень давно, еще когда занимался разработкой распределенной системы мониторинга инженерного обрудования зданий на микроядреной архитектуре. Там в ядре было несколько потоков, в частности, поток, отвечающий за работу с удаленными микроконтроллерами и поток отвечающий за работу с удаленными интерфейсными клиентами. И надо было заниматься фильтрацией-маршрутизацией информации контроллеры-клиенты.
Начитан со всяких конкурентных очередей, но быстро с этим наелся. Ушел на датаграммный обмен через сокеты. У каждого потока свой сокет в который любой другой поток может передать информацию в виде датаграммы. И никаких проблем с блокировками.
Вы, я так понимаю, про TPL Dataflow не слышали? А про async IO?
Тема параллелизма данных норм описана в четвертой главе
habr.com/ru/company/piter/blog/497218
Как новичку мне было тяжело ориентироваться в огромной куче статей (в том числе и на хабре) на тему асинхронности и параллелизма которых много наплодили за последнее время. Каждый автор интерпретирует это по своему от того образуется каша в голове. Лучше один раз прочитать авторитетного дядьку и его книгу «Конкурентность в C#. Асинхронное, параллельное и многопоточное программирование. 2-е межд. изд.» и закрыть этот вопрос.
habr.com/ru/company/piter/blog/497218
Как новичку мне было тяжело ориентироваться в огромной куче статей (в том числе и на хабре) на тему асинхронности и параллелизма которых много наплодили за последнее время. Каждый автор интерпретирует это по своему от того образуется каша в голове. Лучше один раз прочитать авторитетного дядьку и его книгу «Конкурентность в C#. Асинхронное, параллельное и многопоточное программирование. 2-е межд. изд.» и закрыть этот вопрос.
Чтобы не возникало каши в голове, нужно понимать как это работает внутри. Иначе можно легко превратиться в обезьяну, у который выработали условные рефлексы — «при загорании зеленой лампочки жать зеленую кнопку, при загорании красной — красную».
Т.е. вы будете наизусть знать набор готовых методов конкретного фреймворка, но за его пределами абсолютно не понимать сути того, как это работает.
Т.е. вы будете наизусть знать набор готовых методов конкретного фреймворка, но за его пределами абсолютно не понимать сути того, как это работает.
Parallel.ForEach — это слегка устаревшая тема. Сейчас TPL Dataflow или там Channels для обмена данными, и еще async io.
И, насколько я помню, Parallel.ForEach предназначен для CPU bound операций, а не для IO bound. Это еще даже у Рихтера написано в книге, изданной более 7 лет назад.
И, насколько я помню, Parallel.ForEach предназначен для CPU bound операций, а не для IO bound. Это еще даже у Рихтера написано в книге, изданной более 7 лет назад.
Зарегистрируйтесь на Хабре, чтобы оставить комментарий
Использование распараллеливания при обработке данных в C#