В рамках одного из обсуждении с чатах я предложил использовать функцию Merge для IAsyncEnumerable<T>, чтобы объединить результаты чтения однотипных данных из разных источников. Но когда попытался сделать пример оказалось что такой функции в System.Linq.Async нет. Есть аналог в Reactive Extensions, но тащить библиотеку для одного примера не захотел и решил написать сам (почти).
Простая реализация
Когда написал сигнатуру функции Copilot сам предложил подходящую реализацию. После небольшого косметического рефакторинга получилось так:
public static IAsyncEnumerable<T> MergeNaive<T>(this IEnumerable<IAsyncEnumerable<T>> sources) { ArgumentNullException.ThrowIfNull(sources); return MergeNaiveInternal(sources); } private static async IAsyncEnumerable<T> MergeNaiveInternal<T>(IEnumerable<IAsyncEnumerable<T>> sources, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken = default) { List<IAsyncEnumerator<T>> enumerators = []; List<Task<bool>> tasks = []; try { // Start all enumerators foreach (var source in sources) { var enumerator = source.GetAsyncEnumerator(cancellationToken); enumerators.Add(enumerator); tasks.Add(enumerator.MoveNextAsync().AsTask()); } // Process results as they become available while (tasks.Count > 0) { var completedTask = await Task.WhenAny(tasks).ConfigureAwait(false); var index = tasks.IndexOf(completedTask); var enumerator = enumerators[index]; if (await completedTask) { yield return enumerator.Current; tasks[index] = enumerator.MoveNextAsync().AsTask(); } else { // This enumerator is done, remove it tasks.RemoveAt(index); enumerators.RemoveAt(index); } } } finally { // Clean up all enumerators foreach (var enumerator in enumerators) { await enumerator.DisposeAsync().ConfigureAwait(false); } } }
Этот код запускает все энумераторы и потом выдает результаты каждого из них по готовности.
Код для проверки
List<IAsyncEnumerable<int>> sources =[ GetNumbersAsync(1, 5), GetNumbersAsync(6, 5), GetNumbersAsync(11, 5) ]; await foreach (var number in sources.MergeNaive()) { Console.WriteLine(number); } static async IAsyncEnumerable<int> GetNumbersAsync(int start, int count) { for (int i = 0; i < count; i++) { yield return start + i; } }
Бенчмарк простого варианта
Чтобы понять, насколько эффективен такой код нужно сравнить его с простым последовательным обходом всех источников. Возьму BenchmarkDotNet для этой задачи.
[SimpleJob] [MemoryDiagnoser] public class Benchmarks { private IEnumerable<IAsyncEnumerable<int>> _sources = null!; private const int SourceCount = 10; private const int ItemsPerSource = 10000; [GlobalSetup] public void Setup() { _sources = Enumerable .Range(0, SourceCount) .Select(i => CreateAsyncEnumerable(i * ItemsPerSource, ItemsPerSource)); } [Benchmark(Baseline = true)] public async Task<int> NaiveSequential() { var sum = 0; foreach (var source in _sources) { await foreach (var item in source) { sum += item; } } return sum; } [Benchmark] public async Task<int> MergeNaive() { var sum = 0; await foreach (var item in _sources.MergeNaive()) { sum += item; } return sum; } private static async IAsyncEnumerable<int> CreateAsyncEnumerable(int start, int count) { for (int i = 0; i < count; i++) { yield return start + i; } } }
Результаты бенмарка удивили. Наивная реализация потребляет в 1300 раз больше памяти.
Method | Mean | Ratio | Gen0 | Allocated | Alloc Ratio |
|---|---|---|---|---|---|
NaiveSequential | 130.3 us | 1.00 | - | 1.13 KB | 1.00 |
MergeNaive | 1,120.0 us | 8.60 | 179.6875 | 1472.24 KB | 1,299.63 |
Почему ест столько памяти?
Потому что IAsyncEnumerator<T>.MoveNextAsync() возвращает ValueTask. Этот тип специально создан для случаев когда асинхронный метод выполняется синхронно. В этом случае не надо создавать Task . async\await реализует этот паттерн, а наш код нет.
Попробуем исправить
public static IAsyncEnumerable<T> MergeOptimized<T>(this IEnumerable<IAsyncEnumerable<T>> sources) { ArgumentNullException.ThrowIfNull(sources); return MergeOptimizedInternal(sources); } private static async IAsyncEnumerable<T> MergeOptimizedInternal<T>( IEnumerable<IAsyncEnumerable<T>> sources, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken = default) { List<IAsyncEnumerator<T>>? enumerators = null; List<Task<bool>>? tasks = null; try { // Start all enumerators foreach (var source in sources) { var enumerator = source.GetAsyncEnumerator(cancellationToken); var moveNext = enumerator.MoveNextAsync(); while (moveNext.IsCompleted) { if (!moveNext.Result) break; yield return enumerator.Current; moveNext = enumerator.MoveNextAsync(); } if (!moveNext.IsCompleted) { enumerators ??= []; enumerators.Add(enumerator); tasks ??= []; tasks.Add(moveNext.AsTask()); } } // Process results as they become available while (tasks?.Count > 0) { Debug.Assert(enumerators is not null); Debug.Assert(tasks is not null); var completedTask = await Task.WhenAny(tasks).ConfigureAwait(false); var index = tasks.IndexOf(completedTask); var enumerator = enumerators[index]; if (completedTask.Result) { while (true) { yield return enumerator.Current; var moveNext = enumerator.MoveNextAsync(); if (!moveNext.IsCompleted) { // Need to wait for next element tasks[index] = moveNext.AsTask(); break; } if (!await moveNext) { // This enumerator is done, remove it tasks.RemoveAt(index); enumerators.RemoveAt(index); break; } } } else { // This enumerator is done, remove it tasks.RemoveAt(index); enumerators.RemoveAt(index); } } } finally { if (enumerators is not null) { // Clean up all enumerators foreach (var enumerator in enumerators) { await enumerator.DisposeAsync().ConfigureAwait(false); } } } }
Теперь мы не выполняем AsTask для каждой ValueTask, а только для тех, что не завершились синхронно.
Результаты бенчмарка
Method | Mean | Ratio | Gen0 | Allocated | Alloc Ratio |
|---|---|---|---|---|---|
NaiveSequential | 132.2 us | 1.00 | - | 1.13 KB | 1.00 |
MergeNaive | 1,116.8 us | 8.45 | 179.6875 | 1472.21 KB | 1,299.61 |
MergeOptimized | 293.3 us | 2.22 | - | 1.37 KB | 1.21 |
Но у нас все последовательности синхронные. А применять этот метод будем чаще всего для асинхронных. Для проверки вставлю Task.Yield в генератор в бенчмарке.
Результаты бенчмарка на асинхронных последовательностях
Method | Mean | Ratio | Gen0 | Allocated | Alloc Ratio |
|---|---|---|---|---|---|
NaiveSequential | 4.424 ms | 1.00 | - | 2.14 KB | 1.00 |
MergeNaive | 5.348 ms | 1.21 | 312.5000 | 2540.26 KB | 1,186.69 |
MergeOptimized | 5.377 ms | 1.22 | 265.6250 | 2267.24 KB | 1,059.15 |
Два мегабайта на обход 10 000 элементов последовательностей, примерно по 200 байт на каждый элемент.
We need to go deeper
В Visual Studio запускаю профайлер в режиме отслеживания аллокаций.

Результаты профайлинга

Главный виновник нашелся сразу, развернем узел, посмотрим что происходит внутри

Внутри WhenAny суммарно было аллоцировано 10 000 массивов Task, ровно по одному на каждый вызов при обходе последовательности из 10 000 элементов.
Посмотрим в исходниках почему так получается https://source.dot.net/#System.Private.CoreLib/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/Task.cs,b1b17c5b0055cb0b
Интересующая нас ветка кода WhenAny
// Skip a List allocation/copy if tasks is a collection if (tasks is ICollection<TTask> tasksAsCollection) { // Take a more efficient path if tasks is actually a list or an array. Arrays are a bit less common, // since if argument was strongly-typed as an array, it would have bound to the array-based overload. if (tasks.GetType() == typeof(List<TTask>)) { return WhenAnyCore((ReadOnlySpan<TTask>)CollectionsMarshal.AsSpan(Unsafe.As<List<TTask>>(tasks))); } if (tasks is TTask[] tasksAsArray) { return WhenAnyCore((ReadOnlySpan<TTask>)tasksAsArray); } int count = tasksAsCollection.Count; if (count <= 0) { ThrowHelper.ThrowArgumentException(ExceptionResource.Task_MultiTaskContinuation_EmptyTaskList, ExceptionArgument.tasks); } var taskArray = new TTask[count]; tasksAsCollection.CopyTo(taskArray, 0); foreach (TTask task in taskArray) { if (task is null) { ThrowHelper.ThrowArgumentException(ExceptionResource.Task_MultiTaskContinuation_NullTask, ExceptionArgument.tasks); } } return TaskFactory.CommonCWAnyLogic(taskArray); }
Мы видим что функция всегда создает массив, в который копирует исходные таски. Посмотрим где еще вызывается TaskFactory.CommonCWAnyLogic.

Эта функция вызывается внутри ContinueWhenAnyImpl, которая в свою очередь вызывается в TaskFactory.ContinueWhenAny. Последняя функция подойдет для наших задач.
Проблема только в том, что нам надо выделить один раз, а количество задач, которые надо менять, уменьшается по мере окончания входных последовательностей.
Получился такой код
private static async IAsyncEnumerable<T> MergeOptimizedInternal<T>(IEnumerable<IAsyncEnumerable<T>> sources, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken = default) { List<IAsyncEnumerator<T>>? enumerators = null; List<Task<bool>>? tasks = null; TaskCompletionSource<bool> tcs = new(); // Never completes Task<bool>[]? tasksToWait = null; try { // Start all enumerators foreach (var source in sources) { var enumerator = source.GetAsyncEnumerator(cancellationToken); var moveNext = enumerator.MoveNextAsync(); while (moveNext.IsCompleted) { if (!moveNext.Result) break; yield return enumerator.Current; moveNext = enumerator.MoveNextAsync(); } if (!moveNext.IsCompleted) { enumerators ??= []; enumerators.Add(enumerator); tasks ??= []; tasks.Add(moveNext.AsTask()); } } // Process results as they become available while (tasks?.Count > 0) { Debug.Assert(enumerators is not null); Debug.Assert(tasks is not null); // Fill task array if (tasksToWait is null) { tasksToWait = tasks.ToArray(); } else { for (int i = 0; i < tasks.Count; i++) { tasksToWait[i] = tasks[i]; } for (int i = tasks.Count; i < tasksToWait.Length; i++) { tasksToWait[i] = tcs.Task; } } // Wait for any numer of tasks to complete await Task.Factory .ContinueWhenAny(tasksToWait, t => { }, cancellationToken) .ConfigureAwait(false); // Loop all completed tasks foreach (var completedTask in tasksToWait.Where(t => t.IsCompleted)) { var index = tasks.IndexOf(completedTask); var enumerator = enumerators[index]; if (completedTask.Result) { while (true) { yield return enumerator.Current; var moveNext = enumerator.MoveNextAsync(); if (!moveNext.IsCompleted) { // Need to wait for next element tasks[index] = moveNext.AsTask(); break; } if (!moveNext.Result) { // This enumerator is done, remove it tasks.RemoveAt(index); enumerators.RemoveAt(index); break; } } } else { // This enumerator is done, remove it tasks.RemoveAt(index); enumerators.RemoveAt(index); } } } } finally { if (enumerators is not null) { // Clean up all enumerators foreach (var enumerator in enumerators) { await enumerator.DisposeAsync().ConfigureAwait(false); } } } }
Два нововведения: используем Task.Factory.ContinueWhenAny и обходим все завершенные задачи, так как внутри цикла есть yield и к следующей итерации часть задач может перейти в завершенное состояние.
По мере окончания последовательностей заполняем пустые места в массиве ожидания задачей, которая никогда не завершится.
Результаты профайлинга

Аллокация в куче ValueTaskSourceAsTask происходит внутри метода AsTask, но это только в том случае если ValueTask не содержал внутри настоящего Task. Этот ненастоящий Task появился из-за использования Task.Yeld. Когда заменил Task.Yeld на Task.Delay(0), то профиль стал такой:

Код Merge перестал делать значимое количество аллокаций.
Бенчмарки это тоже подтверждают:
Method | Mean | Ratio | Gen0 | Allocated | Alloc Ratio |
|---|---|---|---|---|---|
NaiveSequential | 141.8 us | 1.00 | - | 1.21 KB | 1.00 |
MergeNaive | 1,119.8 us | 7.90 | 179.6875 | 1472.29 KB | 1,215.83 |
MergeOptimized | 296.1 us | 2.09 | - | 1.55 KB | 1.28 |
Я же не один это сделал
Скорее всего другие тоже сталкивались с подобной проблемой. Даже внутри Microsoft, и скорее всего у них есть реализация мержа асинхронных последовательностей.
Покопавшись в source.dot.net я нашел метод MergeAsync в анализаторах. Он internal, поэтому просто скопирую к себе.
public static IAsyncEnumerable<T> MergeAsync<T>( this ImmutableArray<IAsyncEnumerable<T>> streams, CancellationToken cancellationToken) { // Code provided by Stephen Toub, but heavily modified after that. // 1024 chosen as a way to ensure we don't necessarily create a huge unbounded channel, while also making it // so that we're unlikely to throttle on any stream unless there is truly a huge amount of results in it. var channel = Channel.CreateBounded<T>(1024); var tasks = new Task[streams.Length]; for (var i = 0; i < streams.Length; i++) tasks[i] = ProcessAsync(streams[i], channel.Writer, cancellationToken); // Complete the channel writer with the result of all the tasks. If nothing failed, t.Exception will be // null and this will complete successfully. If anything failed, the exception will propagate out. // // Note: passing CancellationToken.None here is intentional/correct. We must complete all the channels to // allow reading to complete as well. Task.WhenAll(tasks).CompletesChannel(channel); return channel.Reader.ReadAllAsync(cancellationToken); static async Task ProcessAsync(IAsyncEnumerable<T> stream, ChannelWriter<T> writer, CancellationToken cancellationToken) { await foreach (var value in stream.ConfigureAwait(false)) await writer.WriteAsync(value, cancellationToken).ConfigureAwait(false); } }
Комментарий Code provided by Stephen Toub внушает доверие :) А результаты бенчмарков не очень.
Method | Mean | Ratio | Gen0 | Gen1 | Allocated | Alloc Ratio |
|---|---|---|---|---|---|---|
NaiveSequential | 144.6 us | 1.00 | 1.2207 | - | 11.05 KB | 1.00 |
MergeNaive | 4,689.5 us | 32.42 | 617.1875 | 23.4375 | 5099.48 KB | 461.30 |
MergeOptimized | 299.8 us | 2.07 | 0.9766 | - | 11.39 KB | 1.03 |
MergeMicrosoft | 1,840.4 us | 12.73 | 15.6250 | - | 148.23 KB | 13.41 |
Но мы проверяем мерж последовательностей, которые фактически отрабатывают синхронно. Надо провести забег, когда элементы появляются асинхронно и с задержкой, как будет в подавляющем большинстве практических применений.
Бенчмарк асинхронного варианта
Для этого бенчмарка я просто заменил Task.Delay(0) на Task.Delay(1) и сделал разные параметры.
Method | Count, Items | Mean | Ratio | Gen0 | Allocated | Alloc Ratio |
|---|---|---|---|---|---|---|
MergeOptimized | (10, 1000) | 150.4 ms | 0.97 | 250.0000 | 3.58 MB | 1.82 |
MergeMicrosoft | (10, 1000) | 154.7 ms | 1.00 | 250.0000 | 1.97 MB | 1.00 |
MergeOptimized | (100, 100) | 1,542.7 ms | 0.98 | - | 3.17 MB | 1.93 |
MergeMicrosoft | (100, 100) | 1,576.2 ms | 1.00 | - | 1.64 MB | 1.00 |
Внезапно (нет) вариант от Microsoft оказался в два раза менее ресурсоемким, чем оптимизированный мерж, но по времени работы он немного проигрывает.
Выясняем в профайлере почему так

Видим что наш вариант проигрывает, потому что вызывает AsTask, который создает ValueTaskSourceAsTask. ValueTaskSource появляется из-за использования async-генераторов, что в реальном коде встречается чуть реже, чем всегда. При использовании await (как в варианте Microsoft) боксинга не происходит, код написан немного по-другому.
Разница в скорости работы двух методов измеряется в единицах микросекунд в пересчете на один элемент последовательности. В реальном коде большая часть времени будет уходить на ожидания следующего элемента. Поэтому можно забить на эту разницу и остановиться на варианте Microsoft.
Можно ли сделать ожидание без async\await и аллокаций
Для одного Task или ValueTask — да, очень легко. Для нескольких — нет, такая возможность не предусмотрена, а любые велосипеды приведут к созданию объектов в куче.
Выводы
Не вызывать
Task.WhenAnyв циклеНе вызывать
ValueTask.AsTaskв циклеПодсматривать как делает Microsoft, а не изобретать свои велосипеды
Код доступен по ссылке.
