Comments 29
Вы рассматривали msdn.microsoft.com/en-us/devlabs/gg585582.aspx (TPL Dataflow)?
И чем стандартные варианты типа Parallel.For с собственным партицированием и енумератором не подошли?
И чем стандартные варианты типа Parallel.For с собственным партицированием и енумератором не подошли?
TPL не рассматривал — побежал читать по этого зверя.
Parallel.For, насколько я себе его понимаю, брёт коллекцию, нарезает и выполняет одно и то же одно действие (метод) для всех элементов, просто в нескольких потоках. И ускорение он даёт именно за счёт того, что данные уже есть и можно их резать. В моём же случае данные капают из тоненького крана и нужны не одному методу, а сотне. Собственно, решение относится не только и не столько к вопросам ускорения — допускаю, что использование нативной реализации могло дать ещё больший выигрыш в скорости — мне было интересно сделать удобное с точки зрения архитектуры решение по параллельной синхронной обработке данных.
Parallel.For, насколько я себе его понимаю, брёт коллекцию, нарезает и выполняет одно и то же одно действие (метод) для всех элементов, просто в нескольких потоках. И ускорение он даёт именно за счёт того, что данные уже есть и можно их резать. В моём же случае данные капают из тоненького крана и нужны не одному методу, а сотне. Собственно, решение относится не только и не столько к вопросам ускорения — допускаю, что использование нативной реализации могло дать ещё больший выигрыш в скорости — мне было интересно сделать удобное с точки зрения архитектуры решение по параллельной синхронной обработке данных.
Я думаю, это может быть интересно (лично я прочитал прям на одном дыхании):
http://www.microsoft.com/en-us/download/details.aspx?id=19222
http://www.microsoft.com/en-us/download/details.aspx?id=19222
Насколько я знаю, Parallel.For не требуется знать общее количество элементов. Он выделяет по Task'у на каждый элемент. Когда кончаются свободные таски, он ждет завершения начатых, а когда элементы — то возвращает результат. Уверены, что к вашей задаче это не подходит?
Изначально задача решалась вообще в цикле. Если постараться, её можно запихнуть практически в любую параллельную конструкцию. Однако в таком случае для меня Task-ом станет добавление информации для 100+ срезов. То есть этот Task будет выглядеть как бегемот. А разделение коллекции на клонов позволяет мне работать с клоном как с оригинальной коллекцией. Получилось просто красивее и удобнее при поддержке кода.
Хотя в статье упоминается PLinq, я всеже все равно нt совсем поняk, чем не подошел для данной задачи например .AsParallel() или какие-нибудь коллекции из числа Concurrent?
AsParallel() и Concurrent вообще — это, если я правильно понимаю, подход к обезапашиванию коллекций от конкуренции при чтении из разных потоков. То есть, условно, гарантировать, что из [1, 2, 3, 4, 5] не будет прочитано [1, 2, 2, 3, 4, 5, 5]. При этом, если читателей 2, то один получит [1, 2, 4], а другой — [3, 5]. Передо мною же, в данном случае стоит задачка другого рода: дать возможность большому числу читателей КАЖДОМУ получить по своей порции [1, 2, 3, 4, 5].
Значит я не правильно понял задачу, извиняюсь.
Возможно это как раз тот самый случай, когда был смысл использовать вместо Enumerable, Observable + RX, как вы думаете?
Возможно это как раз тот самый случай, когда был смысл использовать вместо Enumerable, Observable + RX, как вы думаете?
Возможно, я некорректно выразился про «Concurrent вообще». Речь о том, что задач у меня много. Я могу упаковать их пучком и при появлении объекта дёргать на выполнение, а могу задачу семантически выделить как метод
Foo(IEnumerable<TSomeType> param)
. Это не то, чтобы лучше или хуже. Но так я, например, имею возможность ипользовать конструкции типа param.Where().OfType<TA>()
. Это по-другому, чем при использовании Task<TSomeType>
. Если я упустил возможность сделать так с использованием PLinq, то было бы здорово, если бы Вы подсказали, как именно.Если вам важно сохранить ту же последовательность, то для этого добавлятся еще один параметр .AsOrdered()
Писал про TPL Dataflow небольшой пост Использование TPL Dataflow для многопоточной компрессии файлов.
Для клонирования есть
Выглядеть это будет как-то так:
Блоков вида
Последний
Для клонирования есть
BroadcastBlock<T>
.Выглядеть это будет как-то так:
Читаем -> BufferBlock<TInput> -> BroadcastBlock<TInput> -> TransformBlock<TInput,TOutput> -> ActionBlock<TOutput>
.Блоков вида
-> TransformBlock<TInput,TOutput> -> ActionBlock<TOutput>
у Вас будет 100+.Последний
ActionBlock<TOutput>
нужен для сохранения результатов (не понял куда Вы их пишете).Если я правильно понял идею, Вы предлагаете использовать ETL-функциональность (что-то похожее я когда-то делал с помощью Rhino.ETL). Идея правильная, согласен. Только она всё равно позволяет мне работать с конкретным очередным экземпляром MyType из коллекции во всех срезах одновременно, а не с представлением IEnumerable<MyType>.
Можно клонировать
Библиотека позволяет убрать все эти синхронизации, которые ужасают своим видом не подготовленного читателя =)
IEnumerable<MyType>
, если Вам нужно именно это. Как при этом память себя чувствует?Библиотека позволяет убрать все эти синхронизации, которые ужасают своим видом не подготовленного читателя =)
Есть такая очень хорошая вещь — Reactive Extensions.
Там смотрите по ключевым словам «shared observable» (это чтобы создать поток элементов для нескольких подписчиков): вызов Publish/Connect, в т.ч. тут можно почитать http://leecampbell.blogspot.ru/2010/08/rx-part-7-hot-and-cold-observables.html.
После создания расшареного потока просто навешиваете на него нужное число подписчиков/обработчиков.
Касательно параллелизации посмотрите SubscribeOn/ObserveOn, например на http://leecampbell.blogspot.ru/2010/06/rx-part-6-scheduling-and-threading.html.
Собственно дальше изучайте остальные возможности и делайте еще более клевые штуки в пару call'ов.
Там смотрите по ключевым словам «shared observable» (это чтобы создать поток элементов для нескольких подписчиков): вызов Publish/Connect, в т.ч. тут можно почитать http://leecampbell.blogspot.ru/2010/08/rx-part-7-hot-and-cold-observables.html.
После создания расшареного потока просто навешиваете на него нужное число подписчиков/обработчиков.
Касательно параллелизации посмотрите SubscribeOn/ObserveOn, например на http://leecampbell.blogspot.ru/2010/06/rx-part-6-scheduling-and-threading.html.
Собственно дальше изучайте остальные возможности и делайте еще более клевые штуки в пару call'ов.
Но так я, например, имею возможность ипользовать конструкции типа param.Where().OfType<TA>().
По поводу этого: перед созданием каждого подписчика (вызов Subscribe) можете навешивать индивидуально разной лапши, те же Where, подписчику элемент попадает уже в виде TSomeType, но там его тоже можно обернуть в Observable и продолжить заниматься извращениями.
Я понимаю, что я не в кассу
Вы не пытались попробовать BlockingCollection с его методами TryAddAny() / TryTakeAny(). Накидали бы себе сколько угодно коллекций для исходных данных, «размазывая» по ним прочитанные данные, и сколько угодно потоков-обработчиков. Хоть 1000, которые могут запускаться вообще на других физических компьютерах (а данные хоть через WCF получать или Remoting). Я конечно не знаю всех особенностей задачи, но сложность кода ядра вашей задачи сократилась бы сильно ИМХО.
Это если вы не рассматривали такой вариант, если рассматривали, то почему отказались?
Нужно было сделать так, чтобы на каждом enumerable.Next() потокам раздавался на чтение единственный экземпляр объекта. Такое решение имеет особенность (это скорее минус, но не страшный): все потоки будут ждать самого медленного брата, то есть чтение будет синхронным. Если честно, я очень надеялся, что кто-то уже за меня это написал.
Вы не пытались попробовать BlockingCollection с его методами TryAddAny() / TryTakeAny(). Накидали бы себе сколько угодно коллекций для исходных данных, «размазывая» по ним прочитанные данные, и сколько угодно потоков-обработчиков. Хоть 1000, которые могут запускаться вообще на других физических компьютерах (а данные хоть через WCF получать или Remoting). Я конечно не знаю всех особенностей задачи, но сложность кода ядра вашей задачи сократилась бы сильно ИМХО.
Это если вы не рассматривали такой вариант, если рассматривали, то почему отказались?
Точнее AddToAny() / TryTakeFromAny(). Во-первых запамятовал правильные названия методов, во-вторых пропусков при добавлении у вас не должно оказаться, без Try.
Спасибо за интересную идею, попробую на досуге придумать, как из неё сделать то же, что у меня получилось в итоге. Почти уверен, что выйдет короче.
TryAddAny/TryTakeAny не подойдет, поскольку будет пытаться читать и писать в одну из коллекций, но идея правильная. Все что надо сделать — это создать по одной BlockingCollection на срез, используя ConcurrentQueue с ограниченным количеством элементов как внутреннее хранилище, и при чтении исходного IEnumerable просто в цикле добавлять новый кортеж в каждую BlockingCollection. В итоге получим: все читатели будут работать со своей очередью независимо друг от друга, тем самым сглаживая флуктуации по времени обработки в срезах. Поставщик данных будет задерживаться на самом медленном читателе только если у того переполнится очередь.
Неверно. Будет пытаться писать в Any из любых незаблоченных чтением коллекций, и наоборот. Проверено. ConcurrentQueue работает уже на уровне ниже, где не сумма коллекций работает, и это уже велосипед из запроса.
Что именно неверно? TryAddToAny добавляет элемент в одну из коллекций, и возвращает индекс коллекции, в которую был добавлен элемент. TryTakeFromAny извлекает элемент из одной из коллекций, и так же возвращает индекс коллеции.
Таким образом элемент будет присутствовать только в одной из коллекций, только один из читателей сможет его прочитать.
Насчет ConcurrentQueue — так и есть, она работает уровнем ниже (BlockingCollection использует ConcurrentQueue или другую IProducerConsumerCollection<T> коллекцию как внутреннее хранилище элементов). Я имел в виду что для ограничения количества элементов, которые находятся одновременно в обработке у всех читателей можно установить лимит на количество элементов в очереди с помощью конструктора BlockingCollection<T>(int boundedCapacity).
Таким образом элемент будет присутствовать только в одной из коллекций, только один из читателей сможет его прочитать.
Насчет ConcurrentQueue — так и есть, она работает уровнем ниже (BlockingCollection использует ConcurrentQueue или другую IProducerConsumerCollection<T> коллекцию как внутреннее хранилище элементов). Я имел в виду что для ограничения количества элементов, которые находятся одновременно в обработке у всех читателей можно установить лимит на количество элементов в очереди с помощью конструктора BlockingCollection<T>(int boundedCapacity).
Sign up to leave a comment.
Параллельная обработка IEnumerable в .NET