Pull to refresh

Comments 29

TPL не рассматривал — побежал читать по этого зверя.
Parallel.For, насколько я себе его понимаю, брёт коллекцию, нарезает и выполняет одно и то же одно действие (метод) для всех элементов, просто в нескольких потоках. И ускорение он даёт именно за счёт того, что данные уже есть и можно их резать. В моём же случае данные капают из тоненького крана и нужны не одному методу, а сотне. Собственно, решение относится не только и не столько к вопросам ускорения — допускаю, что использование нативной реализации могло дать ещё больший выигрыш в скорости — мне было интересно сделать удобное с точки зрения архитектуры решение по параллельной синхронной обработке данных.
Эх, только хотел предложить почитать =)
Насколько я знаю, Parallel.For не требуется знать общее количество элементов. Он выделяет по Task'у на каждый элемент. Когда кончаются свободные таски, он ждет завершения начатых, а когда элементы — то возвращает результат. Уверены, что к вашей задаче это не подходит?
Изначально задача решалась вообще в цикле. Если постараться, её можно запихнуть практически в любую параллельную конструкцию. Однако в таком случае для меня Task-ом станет добавление информации для 100+ срезов. То есть этот Task будет выглядеть как бегемот. А разделение коллекции на клонов позволяет мне работать с клоном как с оригинальной коллекцией. Получилось просто красивее и удобнее при поддержке кода.
Хотя в статье упоминается PLinq, я всеже все равно нt совсем поняk, чем не подошел для данной задачи например .AsParallel() или какие-нибудь коллекции из числа Concurrent?
AsParallel() и Concurrent вообще — это, если я правильно понимаю, подход к обезапашиванию коллекций от конкуренции при чтении из разных потоков. То есть, условно, гарантировать, что из [1, 2, 3, 4, 5] не будет прочитано [1, 2, 2, 3, 4, 5, 5]. При этом, если читателей 2, то один получит [1, 2, 4], а другой — [3, 5]. Передо мною же, в данном случае стоит задачка другого рода: дать возможность большому числу читателей КАЖДОМУ получить по своей порции [1, 2, 3, 4, 5].
Значит я не правильно понял задачу, извиняюсь.
Возможно это как раз тот самый случай, когда был смысл использовать вместо Enumerable, Observable + RX, как вы думаете?
О, с такой стороны я на задачу не смотрел. То есть вы предлагаете, чтобы каждый объект при «рождении» говорил: «чуваки, я пришёл, посчитайте меня»? Звучит вкусно.
Именно, правда тогда пропала бы фича(баг?) синхронности чтения данных, но думаю и эта проблема решаема.
UFO landed and left these words here
Возможно, я некорректно выразился про «Concurrent вообще». Речь о том, что задач у меня много. Я могу упаковать их пучком и при появлении объекта дёргать на выполнение, а могу задачу семантически выделить как метод Foo(IEnumerable<TSomeType> param). Это не то, чтобы лучше или хуже. Но так я, например, имею возможность ипользовать конструкции типа param.Where().OfType<TA>(). Это по-другому, чем при использовании Task<TSomeType>. Если я упустил возможность сделать так с использованием PLinq, то было бы здорово, если бы Вы подсказали, как именно.
Если вам важно сохранить ту же последовательность, то для этого добавлятся еще один параметр .AsOrdered()
// Source is ordered; let's preserve it.
var parallelQuery = from num in source.AsParallel().AsOrdered()
where num % 3 == 0
select num;
Писал про TPL Dataflow небольшой пост Использование TPL Dataflow для многопоточной компрессии файлов.
Для клонирования есть BroadcastBlock<T>.
Выглядеть это будет как-то так:
Читаем -> BufferBlock<TInput> -> BroadcastBlock<TInput> -> TransformBlock<TInput,TOutput> -> ActionBlock<TOutput>.
Блоков вида -> TransformBlock<TInput,TOutput> -> ActionBlock<TOutput> у Вас будет 100+.
Последний ActionBlock<TOutput> нужен для сохранения результатов (не понял куда Вы их пишете).
Если я правильно понял идею, Вы предлагаете использовать ETL-функциональность (что-то похожее я когда-то делал с помощью Rhino.ETL). Идея правильная, согласен. Только она всё равно позволяет мне работать с конкретным очередным экземпляром MyType из коллекции во всех срезах одновременно, а не с представлением IEnumerable<MyType>.
Можно клонировать IEnumerable<MyType>, если Вам нужно именно это. Как при этом память себя чувствует?
Библиотека позволяет убрать все эти синхронизации, которые ужасают своим видом не подготовленного читателя =)
Если я сделаю Broadcast для IEnumerable, мне кажется, склонируется только ссылка на объект и, фактически, читать все будут из одной кормушки. А это как раз недопустимо.
BroadcastBlock<TInput> в конструктор принимает клонирующую функцию Func<T, T>.
Можно впихнуть всё, что Вам будет угодно.
Есть такая очень хорошая вещь — Reactive Extensions.
Там смотрите по ключевым словам «shared observable» (это чтобы создать поток элементов для нескольких подписчиков): вызов Publish/Connect, в т.ч. тут можно почитать http://leecampbell.blogspot.ru/2010/08/rx-part-7-hot-and-cold-observables.html.
После создания расшареного потока просто навешиваете на него нужное число подписчиков/обработчиков.
Касательно параллелизации посмотрите SubscribeOn/ObserveOn, например на http://leecampbell.blogspot.ru/2010/06/rx-part-6-scheduling-and-threading.html.

Собственно дальше изучайте остальные возможности и делайте еще более клевые штуки в пару call'ов.
Но так я, например, имею возможность ипользовать конструкции типа param.Where().OfType<TA>().

По поводу этого: перед созданием каждого подписчика (вызов Subscribe) можете навешивать индивидуально разной лапши, те же Where, подписчику элемент попадает уже в виде TSomeType, но там его тоже можно обернуть в Observable и продолжить заниматься извращениями.
Я понимаю, что я не в кассу
Нужно было сделать так, чтобы на каждом enumerable.Next() потокам раздавался на чтение единственный экземпляр объекта. Такое решение имеет особенность (это скорее минус, но не страшный): все потоки будут ждать самого медленного брата, то есть чтение будет синхронным. Если честно, я очень надеялся, что кто-то уже за меня это написал.

Вы не пытались попробовать BlockingCollection с его методами TryAddAny() / TryTakeAny(). Накидали бы себе сколько угодно коллекций для исходных данных, «размазывая» по ним прочитанные данные, и сколько угодно потоков-обработчиков. Хоть 1000, которые могут запускаться вообще на других физических компьютерах (а данные хоть через WCF получать или Remoting). Я конечно не знаю всех особенностей задачи, но сложность кода ядра вашей задачи сократилась бы сильно ИМХО.
Это если вы не рассматривали такой вариант, если рассматривали, то почему отказались?
Точнее AddToAny() / TryTakeFromAny(). Во-первых запамятовал правильные названия методов, во-вторых пропусков при добавлении у вас не должно оказаться, без Try.
Спасибо за интересную идею, попробую на досуге придумать, как из неё сделать то же, что у меня получилось в итоге. Почти уверен, что выйдет короче.
TryAddAny/TryTakeAny не подойдет, поскольку будет пытаться читать и писать в одну из коллекций, но идея правильная. Все что надо сделать — это создать по одной BlockingCollection на срез, используя ConcurrentQueue с ограниченным количеством элементов как внутреннее хранилище, и при чтении исходного IEnumerable просто в цикле добавлять новый кортеж в каждую BlockingCollection. В итоге получим: все читатели будут работать со своей очередью независимо друг от друга, тем самым сглаживая флуктуации по времени обработки в срезах. Поставщик данных будет задерживаться на самом медленном читателе только если у того переполнится очередь.
Неверно. Будет пытаться писать в Any из любых незаблоченных чтением коллекций, и наоборот. Проверено. ConcurrentQueue работает уже на уровне ниже, где не сумма коллекций работает, и это уже велосипед из запроса.
Что именно неверно? TryAddToAny добавляет элемент в одну из коллекций, и возвращает индекс коллекции, в которую был добавлен элемент. TryTakeFromAny извлекает элемент из одной из коллекций, и так же возвращает индекс коллеции.

Таким образом элемент будет присутствовать только в одной из коллекций, только один из читателей сможет его прочитать.

Насчет ConcurrentQueue — так и есть, она работает уровнем ниже (BlockingCollection использует ConcurrentQueue или другую IProducerConsumerCollection<T> коллекцию как внутреннее хранилище элементов). Я имел в виду что для ограничения количества элементов, которые находятся одновременно в обработке у всех читателей можно установить лимит на количество элементов в очереди с помощью конструктора BlockingCollection<T>(int boundedCapacity).
Sign up to leave a comment.

Articles