Параллельная обработка IEnumerable в .NET

    В предложенной статье рассматривается решение задачи параллельной синхронной обработки IEnumerable, а также откуда такая задача вообще взялась.

    Как и во многих других случаях, представленное решение началось со вполне конкретных потребностей.

    Преамбула


    В одном из внутренних проектов есть необходимость построения развесистого отчёта на 100+ срезов по массиву данных, чтение которого занимает более 12 часов. Сами объёмы данных тоже немаленькие. Совокупность продолжительного чтения и огромных объёмов данных (~1.5M кортежей, каждый из которых «весит» до 50 МБ) вносит в код два ограничения: крайняя нежелательность многократного чтения (50 дней на еженедельный отчёт никто тратить, увы, не готов) и техническая невозможность помещения всей выборки в оперативную память. Предшествовавший мне процесс разработки шёл, очевидно, итеративно: длина метода, агрегирующего данные для срезов составляла около 4000 строк. Передо мной была поставлена задача сделать так, чтобы данный код превратился в поддерживаемый.

    Предпосылки


    Нитью Ариадны в сложившейся ситуации для меня стало осознание, что «облизывать» одну и ту же сущность по очереди (равно как и чупа-чупс) как минимум некрасиво: даже если мне удастся отрефакторить код до состояния «срез-метод», то я получу обвешанную условиями простыню из сотни с лишним последовательных вызовов. От этого тоже хотелось уйти, так как текущая реализация не только упиралась в медленное чтение, но и добавляла сверху пробуксовку на обработку данных в одном потоке. Сервер, на котором выполняется процесс, обладает 8-ю ядрами и способен распараллелить обработку данных до такого состояния, что она будет занимать меньше времени, чем чтение очередного кортежа из базы.

    Идея


    Итак, взвесив сложившуюся ситуацию, я сформулировал незамысловатую идею: обрабатывать «поток объектов» в параллельных потоках (threads), где каждому срезу соответствует свой аккуратный метод (или даже лямбда), выполняющийся в отдельном потоке. Поскольку изначально количество объектов нам неизвестно, то для представления «потока объектов» как нельзя лучше подошёл интерфейс IEnumerable<T>. Остаётся лишь сделать так, чтобы каждому потоку достался свой IEnumerable<T>, и чтобы это был один и тот же IEnumerable<T> (прямо как в задачах про монеты и стаканы). Очевидно, что к таким объектам тут же прилипла кличка «клоны».

    Нужно было сделать так, чтобы на каждом enumerable.Next() потокам раздавался на чтение единственный экземпляр объекта. Такое решение имеет особенность (это скорее минус, но не страшный): все потоки будут ждать самого медленного брата, то есть чтение будет синхронным. Если честно, я очень надеялся, что кто-то уже за меня это написал. Например, Microsoft сделал что-нибудь подобное в своём Parallel.Linq. Однако, на тот момент ничего подобного нагуглить не удалось — всё, что касалось параллельной обработки коллекций, относилось в параллельной обработке частей одно коллекции (например, Parallel.For()), но никак — к решаемой задаче. Ну что ж, велосипеды я обожаю, напильник в руки!

    Ядро решения


    В моей голове всегда жило понимание, что за любым интерфейсом скрывается объект. Поэтому для клонирования коллекции, вероятно, нужно или найти, или придумать объект с искомым интерфейсом. А вот и не нужно. Язык C# давно и успешно дарит своим адептам операторы yield return и yield break. Их применение и легло в основу решения.

    Думаю, нет смысла объяснять, как была рождена каждая строчка кода. Объясню идею. Для того, чтобы ваших данных стало много, создаётся объект-фабрика с методом GetClone(), который возвращает вызов метода, использующего операторы yield. Чтобы чтение было синхронизировано, и никто ничего не потерял, фабрика запоминает своих читателей и делает так, чтобы чтение очередного объекта из исходного IEnumerable осуществлялось только после того, как все читатели получат свою ссылку на экземпляр предыдущего объекта. Достигается это тем, что каждому читателю присваивается два WaitHandle — «я готов читать» и «я прочитал»:

    private IEnumerable<T> _input;
    private IEnumerator<T> _inputEnumerator;
    private Dictionary<string, AutoResetEvent> _imReadyToRead;
    private Dictionary<string, AutoResetEvent> _iVeReadMyValue;
    private WaitHandle[] _ivAlreadyReadArray;
    private T _nextObject;
    private bool _hasCurrent;
    private bool _hasNext;
    ...
    private void GetNext()
    {
        if (!_hasCurrent) return;
        foreach (var ready in _imReadyToRead) ready.Value.Set();
        do
        {
            WaitHandle.WaitAll(_ivAlreadyReadArray);  // ждём всех, пока начитаются
            _hasNext = _inputEnumerator.MoveNext();  // и только потом берём следующее
            _nextObject = _inputEnumerator.Current;
            lock (_imReadyToRead)
            {
                if (!_hasNext) _hasCurrent = false;
                foreach (var ready in _imReadyToRead) ready.Value.Set();   // и снова взводим наши курки. Будем ждать читателей
            }
        } while (_hasNext);
    }
    


    Само чтение осуществляется из IEnumerable, который(ое, ая?) возвращается методом GetCloneInstance.

    private T GetCurrent(string subscriber)
    {
        T toReturn;
        _imReadyToRead[subscriber].WaitOne(); // готов читать - читай!
        toReturn = _nextObject;
        _iVeReadMyValue[subscriber].Set();
        return toReturn;
    }
    
    private IEnumerable<T> GetCloneInstance(string key)
    {
        T res;
        do
        {
           res = GetCurrent(key);
           yield return res;
        } while (true);
    }
    


    Так, казалось был, проблема распараллеливания решена. Но при предполётной подготовке выяснилась одна особенность метода WaitAll(): он поддерживает работу не более чем 64 экземплярами за раз. Но мне нужно 100+ читателей! Как быть? Да, в общем-то, просто. И я стал клонировать клонов. Из каждых 64 «честных» клона, я выбираю жертву, которую буду клонировать в дальнейшем. Для большого числа читателей у меня могу появляться клоны во 2, 3, 4 и т.д. поколении! Как показало покрытие тестами — вполне жизнеспособные твари. Выглядит это примерно так:

    private int _maxCloneOfOne = 64; // не более чем
    private IEnumerable<T> _input;   // прародитель всех клонов
    private Stack<ICloner<T>> cloners;  // объекты-клонеры, способные родить до 64 копий
    private Dictionary<Guid, IEnumerable<T>> clones;  // клоны
    private Stack<IEnumerable<T>> missMe; // клоны, которых нельзя отдавать читателям - те, которые тоже расклонированы
    				
    public IEnumerable<T> GetClone() 
    {
    	if (cloners.Count == 0)
    		cloners.Push(new Cloner<T>(_input)); // создаём первого клонера
    	
    	var isLast = 
    		clones.Count > 0 && 
    		clones.Count % (_maxCloneOfOne - 1) == 0; // если ожидаемый клон - последний возможный в потомстве клонера
    
    		ICloner<T> cloner;
    		var g = Guid.NewGuid();
    		IEnumerable<T> result;
    
    		if (!isLast)
    		{
    			cloner = cloners.Peek(); // если клон не последний - возвратим его
    		}
    		else
    		{
    			// вот если последний - будем его клонировать
    			var lastCloneForCloner = cloners.Peek().GetClone(); 
    			missMe.Push(lastCloneForCloner);
    			cloners.Push(cloner = new Cloner<T>(lastCloneForCloner));
    			g = Guid.NewGuid();
    		}
    		result = cloner.GetClone();
    		clones.Add(g, result);
    		return result;
    }
    


    Вот теперь точно всё. Добавил инициализирующий код, проверки на всякие краевые ситуации, обернул старательно в thy-catch. Работает.


    Если кому бы то ни было моё решение покажется нужным или хотя бы интересным, добро пожаловать на страничку на google.code. Желающие поучаствовать в улучшении и привинчивании новых фич получат ключи от всех дверей.
    Share post

    Similar posts

    AdBlock has stolen the banner, but banners are not teeth — they will be back

    More
    Ads

    Comments 29

      +7
      Вы рассматривали msdn.microsoft.com/en-us/devlabs/gg585582.aspx (TPL Dataflow)?
      И чем стандартные варианты типа Parallel.For с собственным партицированием и енумератором не подошли?
        0
        TPL не рассматривал — побежал читать по этого зверя.
        Parallel.For, насколько я себе его понимаю, брёт коллекцию, нарезает и выполняет одно и то же одно действие (метод) для всех элементов, просто в нескольких потоках. И ускорение он даёт именно за счёт того, что данные уже есть и можно их резать. В моём же случае данные капают из тоненького крана и нужны не одному методу, а сотне. Собственно, решение относится не только и не столько к вопросам ускорения — допускаю, что использование нативной реализации могло дать ещё больший выигрыш в скорости — мне было интересно сделать удобное с точки зрения архитектуры решение по параллельной синхронной обработке данных.
          +5
          Я думаю, это может быть интересно (лично я прочитал прям на одном дыхании):
          http://www.microsoft.com/en-us/download/details.aspx?id=19222
            +1
            Спасибо, уже в читалке
              0
              Эх, только хотел предложить почитать =)
              –2
              Насколько я знаю, Parallel.For не требуется знать общее количество элементов. Он выделяет по Task'у на каждый элемент. Когда кончаются свободные таски, он ждет завершения начатых, а когда элементы — то возвращает результат. Уверены, что к вашей задаче это не подходит?
                0
                Изначально задача решалась вообще в цикле. Если постараться, её можно запихнуть практически в любую параллельную конструкцию. Однако в таком случае для меня Task-ом станет добавление информации для 100+ срезов. То есть этот Task будет выглядеть как бегемот. А разделение коллекции на клонов позволяет мне работать с клоном как с оригинальной коллекцией. Получилось просто красивее и удобнее при поддержке кода.
            +2
            Хотя в статье упоминается PLinq, я всеже все равно нt совсем поняk, чем не подошел для данной задачи например .AsParallel() или какие-нибудь коллекции из числа Concurrent?
              0
              AsParallel() и Concurrent вообще — это, если я правильно понимаю, подход к обезапашиванию коллекций от конкуренции при чтении из разных потоков. То есть, условно, гарантировать, что из [1, 2, 3, 4, 5] не будет прочитано [1, 2, 2, 3, 4, 5, 5]. При этом, если читателей 2, то один получит [1, 2, 4], а другой — [3, 5]. Передо мною же, в данном случае стоит задачка другого рода: дать возможность большому числу читателей КАЖДОМУ получить по своей порции [1, 2, 3, 4, 5].
                0
                Значит я не правильно понял задачу, извиняюсь.
                Возможно это как раз тот самый случай, когда был смысл использовать вместо Enumerable, Observable + RX, как вы думаете?
                  0
                  О, с такой стороны я на задачу не смотрел. То есть вы предлагаете, чтобы каждый объект при «рождении» говорил: «чуваки, я пришёл, посчитайте меня»? Звучит вкусно.
                    0
                    Именно, правда тогда пропала бы фича(баг?) синхронности чтения данных, но думаю и эта проблема решаема.
                  +1
                  Нет, вы ошибаетесь (наверное, спутали с Synchronized). Это как раз-таки затем, чтобы выполнить задачу в несколько потоков (по умолчанию, будет = количеству ядер в системе). Почитайте msdn, там есть примеры.
                    0
                    Возможно, я некорректно выразился про «Concurrent вообще». Речь о том, что задач у меня много. Я могу упаковать их пучком и при появлении объекта дёргать на выполнение, а могу задачу семантически выделить как метод Foo(IEnumerable<TSomeType> param). Это не то, чтобы лучше или хуже. Но так я, например, имею возможность ипользовать конструкции типа param.Where().OfType<TA>(). Это по-другому, чем при использовании Task<TSomeType>. Если я упустил возможность сделать так с использованием PLinq, то было бы здорово, если бы Вы подсказали, как именно.
                    0
                    Если вам важно сохранить ту же последовательность, то для этого добавлятся еще один параметр .AsOrdered()
                      0
                      // Source is ordered; let's preserve it.
                      var parallelQuery = from num in source.AsParallel().AsOrdered()
                      where num % 3 == 0
                      select num;
                  0
                  Писал про TPL Dataflow небольшой пост Использование TPL Dataflow для многопоточной компрессии файлов.
                  Для клонирования есть BroadcastBlock<T>.
                  Выглядеть это будет как-то так:
                  Читаем -> BufferBlock<TInput> -> BroadcastBlock<TInput> -> TransformBlock<TInput,TOutput> -> ActionBlock<TOutput>.
                  Блоков вида -> TransformBlock<TInput,TOutput> -> ActionBlock<TOutput> у Вас будет 100+.
                  Последний ActionBlock<TOutput> нужен для сохранения результатов (не понял куда Вы их пишете).
                    0
                    Если я правильно понял идею, Вы предлагаете использовать ETL-функциональность (что-то похожее я когда-то делал с помощью Rhino.ETL). Идея правильная, согласен. Только она всё равно позволяет мне работать с конкретным очередным экземпляром MyType из коллекции во всех срезах одновременно, а не с представлением IEnumerable<MyType>.
                      0
                      Можно клонировать IEnumerable<MyType>, если Вам нужно именно это. Как при этом память себя чувствует?
                      Библиотека позволяет убрать все эти синхронизации, которые ужасают своим видом не подготовленного читателя =)
                        0
                        Если я сделаю Broadcast для IEnumerable, мне кажется, склонируется только ссылка на объект и, фактически, читать все будут из одной кормушки. А это как раз недопустимо.
                          0
                          BroadcastBlock<TInput> в конструктор принимает клонирующую функцию Func<T, T>.
                          Можно впихнуть всё, что Вам будет угодно.
                    0
                    Есть такая очень хорошая вещь — Reactive Extensions.
                    Там смотрите по ключевым словам «shared observable» (это чтобы создать поток элементов для нескольких подписчиков): вызов Publish/Connect, в т.ч. тут можно почитать http://leecampbell.blogspot.ru/2010/08/rx-part-7-hot-and-cold-observables.html.
                    После создания расшареного потока просто навешиваете на него нужное число подписчиков/обработчиков.
                    Касательно параллелизации посмотрите SubscribeOn/ObserveOn, например на http://leecampbell.blogspot.ru/2010/06/rx-part-6-scheduling-and-threading.html.

                    Собственно дальше изучайте остальные возможности и делайте еще более клевые штуки в пару call'ов.
                      0
                      Но так я, например, имею возможность ипользовать конструкции типа param.Where().OfType<TA>().

                      По поводу этого: перед созданием каждого подписчика (вызов Subscribe) можете навешивать индивидуально разной лапши, те же Where, подписчику элемент попадает уже в виде TSomeType, но там его тоже можно обернуть в Observable и продолжить заниматься извращениями.
                      0
                      Я понимаю, что я не в кассу
                      Нужно было сделать так, чтобы на каждом enumerable.Next() потокам раздавался на чтение единственный экземпляр объекта. Такое решение имеет особенность (это скорее минус, но не страшный): все потоки будут ждать самого медленного брата, то есть чтение будет синхронным. Если честно, я очень надеялся, что кто-то уже за меня это написал.

                      Вы не пытались попробовать BlockingCollection с его методами TryAddAny() / TryTakeAny(). Накидали бы себе сколько угодно коллекций для исходных данных, «размазывая» по ним прочитанные данные, и сколько угодно потоков-обработчиков. Хоть 1000, которые могут запускаться вообще на других физических компьютерах (а данные хоть через WCF получать или Remoting). Я конечно не знаю всех особенностей задачи, но сложность кода ядра вашей задачи сократилась бы сильно ИМХО.
                      Это если вы не рассматривали такой вариант, если рассматривали, то почему отказались?
                        0
                        Точнее AddToAny() / TryTakeFromAny(). Во-первых запамятовал правильные названия методов, во-вторых пропусков при добавлении у вас не должно оказаться, без Try.
                          0
                          Спасибо за интересную идею, попробую на досуге придумать, как из неё сделать то же, что у меня получилось в итоге. Почти уверен, что выйдет короче.
                            0
                            TryAddAny/TryTakeAny не подойдет, поскольку будет пытаться читать и писать в одну из коллекций, но идея правильная. Все что надо сделать — это создать по одной BlockingCollection на срез, используя ConcurrentQueue с ограниченным количеством элементов как внутреннее хранилище, и при чтении исходного IEnumerable просто в цикле добавлять новый кортеж в каждую BlockingCollection. В итоге получим: все читатели будут работать со своей очередью независимо друг от друга, тем самым сглаживая флуктуации по времени обработки в срезах. Поставщик данных будет задерживаться на самом медленном читателе только если у того переполнится очередь.
                              0
                              Неверно. Будет пытаться писать в Any из любых незаблоченных чтением коллекций, и наоборот. Проверено. ConcurrentQueue работает уже на уровне ниже, где не сумма коллекций работает, и это уже велосипед из запроса.
                                0
                                Что именно неверно? TryAddToAny добавляет элемент в одну из коллекций, и возвращает индекс коллекции, в которую был добавлен элемент. TryTakeFromAny извлекает элемент из одной из коллекций, и так же возвращает индекс коллеции.

                                Таким образом элемент будет присутствовать только в одной из коллекций, только один из читателей сможет его прочитать.

                                Насчет ConcurrentQueue — так и есть, она работает уровнем ниже (BlockingCollection использует ConcurrentQueue или другую IProducerConsumerCollection<T> коллекцию как внутреннее хранилище элементов). Я имел в виду что для ограничения количества элементов, которые находятся одновременно в обработке у всех читателей можно установить лимит на количество элементов в очереди с помощью конструктора BlockingCollection<T>(int boundedCapacity).

                          Only users with full accounts can post comments. Log in, please.