Асинхронные Stream в C# 8

Автор оригинала: Bassam Alugili
  • Перевод
  • Tutorial

Функционал Async/Await появился в C# 5, чтобы улучшить скорость отклика пользовательского интерфейса и веб-доступ к ресурсам. Другими словами, асинхронные методы помогают разработчикам выполнять асинхронные операции, которые не блокируют потоки и возвращают один скалярный результат. После многочисленных попыток Microsoft упростить асинхронные операции, шаблон async/await завоевал хорошую репутацию среди разработчиков благодаря простому подходу.


Существующие асинхронные методы значительно ограничены тем, что должны возвращать только одно значение. Давайте рассмотрим некий обычный для такого синтаксиса метод async Task<int> DoAnythingAsync(). Результатом его работы является некоторое одно значение. Из-за такого ограничения нельзя использовать эту функцию с ключевым словом yield и асинхронным интерфейсом IEnumerable<int> (чтобы вернуть результат асинхронного перечисления).



Если объединить функцию async/await и оператор yield, тогда можно было бы использовать мощную модель программирования, известную как asynchronous data pull, или перечисление на основе pull based enumeration или асинхронную последовательность async sequence, как она называется в F#.


Новая возможность использования асинхронных потоков в C# 8 снимает ограничение, связанное с возвратом единственного результата, и позволяет асинхронному методу возвращать несколько значений. Эти изменения придадут асинхронному шаблону больше гибкости, а пользователь сможет извлекать данные откуда-либо (например из БД) с помощью отложенных асинхронных последовательностей или получать данные из асинхронных последовательностей частями по мере доступности.


Пример:


foreach await (var streamChunck in asyncStreams)
{
  Console.WriteLine($“Received data count = {streamChunck.Count}”);
}

Ещё один подход для решения проблем, связанных с асинхронным программированием, заключается в использовании реактивных расширений (Rx). Rx приобретает всё большее значение среди разработчиков и этот метод используется во многих языках программирования, например Java (RxJava) и JavaScript (RxJS).


В основе Rx лежит модель на базе проталкивания данных (push-коллекции) (принцип Tell Don’t Ask), также известная, как реактивное программирование. Т.е. в отличии от IEnumerable, когда потребитель запрашивает следующий элемент, в модели Rx поставщик данных сигнализирует потребителю о появлении в последовательности нового элемента. Данные проталкиваются в очередь в асинхронном режиме и потребитель использует их в момент поступления.


В этой статье я сравню модель на основе проталкивания данных (такой, как Rx) с моделью на основе вытягивания данных (как IEnumerable), а также покажу, для каких сценариев лучше подходит та или иная модель. Вся концепция и преимущества рассматриваются с помощью множества примеров и демонстрационного кода. В конце я покажу применение и продемонстрирую его на примере кода.


Сравнение модели на основе проталкивания данных с моделью на основе вытягивания данных (pull-)



Рис. -1- Сравнение модели на основе вытягивания данных с моделью на основе проталкивания данных


Эти примеры основаны на взаимоотношениях поставщика и потребителя данных, как показано на рис. -1-. Модель на основе вытягивания (pull-) данных проста для понимания. В ней потребитель запрашивает и получает данные от поставщика. Альтернативным подходом является модель на основе проталкивания данных (push-). Здесь, поставщик публикует данные в очереди и потребитель должен подписаться на неё, чтобы получить их.


Модель на основе вытягивания данных подходит для тех случаев, где поставщик генерирует данные быстрее, чем потребитель использует их. Таким образом, потребитель получает только необходимые данные, что позволяет избежать проблем с переполнением. Если потребитель использует данные быстрее, чем поставщик их производит, подойдёт модель на основе проталкивания данных. В этом случае поставщик может отправить потребителю больше данных, чтобы не возникло ненужных задержек.


Rx и Akka Streams (модель программирования на основе потоков) используют метод обратного давления для управления потоком. Чтобы решить проблемы поставщика и получателя, описанные выше, метод применяет как проталкивание, так и вытягивание данных.


В примере ниже медленный потребитель вытягивает данные со стороны более быстрого поставщика. После того как потребитель обработает текущий элемент, он запросит у поставщика следующий и так до конца последовательности.


Мотивация для использования и основная информация


Чтобы понять всю необходимость асинхронных потоков, рассмотрим следующий код.


// Запускаем цикл и суммируем предложенный аргумент (count)
static int SumFromOneToCount(int count)
{
  ConsoleExt.WriteLine("SumFromOneToCount called!");

  var sum = 0;
  for (var i = 0; i <= count; i++)
  {
    sum = sum + i;
  }
  return sum;
}

// Вызов метода:

const int count = 5;
ConsoleExt.WriteLine($"Starting the application with count: {count}!");
ConsoleExt.WriteLine("Classic sum starting.");
ConsoleExt.WriteLine($"Classic sum result: {SumFromOneToCount(count)}");
ConsoleExt.WriteLine("Classic sum completed.");
ConsoleExt.WriteLine("################################################");
ConsoleExt.WriteLine(Environment.NewLine);

Вывод:


Мы можем сделать метод отложенным, используя оператор yield, как показано ниже.


static IEnumerable<int> SumFromOneToCountYield(int count)
{
  ConsoleExt.WriteLine("SumFromOneToCountYield called!");

  var sum = 0;
  for (var i = 0; i <= count; i++)
  {
    sum = sum + i;

    yield return sum;
  }
}

Вызов метода


const int count = 5;
ConsoleExt.WriteLine("Sum with yield starting.");
foreach (var i in SumFromOneToCountYield(count))
{
  ConsoleExt.WriteLine($"Yield sum: {i}");
}
ConsoleExt.WriteLine("Sum with yield completed.");

ConsoleExt.WriteLine("################################################");
ConsoleExt.WriteLine(Environment.NewLine);

Вывод:


Как показано выше в окне вывода, результат возвращается частями, а не одним значением. Показанные выше суммарные результаты известны как отложенное перечисление. Однако, проблема по-прежнему не решена: методы суммирования блокируют код. Если посмотреть на потоки, можно увидеть, что всё запущено в основном потоке.


Давайте применим волшебное слово async к первому методу SumFromOneToCount (без yield).


static async Task<int> SumFromOneToCountAsync(int count)
{
  ConsoleExt.WriteLine("SumFromOneToCountAsync called!");

  var result = await Task.Run(() =>
  {
    var sum = 0;

    for (var i = 0; i <= count; i++)
    {
      sum = sum + i;
    }
    return sum;
  });

  return result;
}

Вызов метода


const int count = 5;
ConsoleExt.WriteLine("async example starting.");
// Операция суммирования запущена в асинхронном режиме. Но, этого недостаточно. Нужно, чтобы суммирование шло в асинхронном режиме с задержкой.
var result = await SumFromOneToCountAsync(count);
ConsoleExt.WriteLine("async Result: " + result);
ConsoleExt.WriteLine("async completed.");

ConsoleExt.WriteLine("################################################");
ConsoleExt.WriteLine(Environment.NewLine);

Вывод:


Отлично. Теперь вычисления выполняются в другом потоке, однако проблема с результатом всё ещё существует. Система так и возвращает результат единым значением.
Представьте, что мы можем совместить отложенные перечисления (оператор yield) и асинхронные методы в императивном стиле программирования. Комбинация называется асинхронные потоки и это новая функция в C# 8. Она великолепно подходит для решения проблем, связанных с моделью программирования на основе вытягивания данных, например скачивания данных с сайта или чтения записей в файле или базе данных современными способами.


Давайте попробуем сделать это в текущей версии C#. Я добавлю ключевое слово async к методу SumFromOneToCountYield следующим образом:



Рис. -2- Ошибка при одновременном использовании yield и ключевого слова async.


Когда мы пытаемся добавить async к SumFromOneToCountYield, возникает ошибка, как показано выше.
Давайте попробуем по-другому. Мы можем убрать ключевое слово yield и применить IEnumerable в задаче, как показано ниже:


static async Task<IEnumerable<int>> SumFromOneToCountTaskIEnumerable(int count)
{
  ConsoleExt.WriteLine("SumFromOneToCountAsyncIEnumerable called!");
  var collection = new Collection<int>();

  var result = await Task.Run(() =>
  {
    var sum = 0;

    for (var i = 0; i <= count; i++)
    {
      sum = sum + i;
      collection.Add(sum);
    }
    return collection;
  });

  return result;
}

Вызов метода


const int count = 5;
ConsoleExt.WriteLine("SumFromOneToCountAsyncIEnumerable started!");
var scs = await SumFromOneToCountTaskIEnumerable(count);
ConsoleExt.WriteLine("SumFromOneToCountAsyncIEnumerable done!");

foreach (var sc in scs)
{
  // Это не то, что нужно. Мы получили результат единым блоком.
  ConsoleExt.WriteLine($"AsyncIEnumerable Result: {sc}");
}

ConsoleExt.WriteLine("################################################");
ConsoleExt.WriteLine(Environment.NewLine);

Вывод:


Как видно из примера, всё вычисляется в асинхронном режиме, но проблема по-прежнему осталась. Результаты (все результаты собраны в коллекцию) возвращаются в виде одного блока. А это не то, что нам нужно. Если помните, нашей целью было совместить асинхронный режим вычисления с возможностью задержки.


Для этого нужно использовать внешнюю библиотеку, например Ix (часть Rx), или асинхронные потоки, представленные в C#.


Давайте вернёмся к нашему коду. Чтобы продемонстрировать асинхронное поведение, я использовал внешнюю библиотеку.


static async Task ConsumeAsyncSumSeqeunc(IAsyncEnumerable<int> sequence)
{
  ConsoleExt.WriteLineAsync("ConsumeAsyncSumSeqeunc Called");

  await sequence.ForEachAsync(value =>
  {
    ConsoleExt.WriteLineAsync($"Consuming the value: {value}");

    // моделируем некоторую задержку
    Task.Delay(TimeSpan.FromSeconds(1)).Wait();
  });
}

static IEnumerable<int> ProduceAsyncSumSeqeunc(int count)
{
  ConsoleExt.WriteLineAsync("ProduceAsyncSumSeqeunc Called");
  var sum = 0;

  for (var i = 0; i <= count; i++)
  {
    sum = sum + i;

    // моделируем некоторую задержку
    Task.Delay(TimeSpan.FromSeconds(0,5)).Wait();

    yield return sum;
  }
}

Вызов метода


const int count = 5;
ConsoleExt.WriteLine("Starting Async Streams Demo!");

// Запускаем новую задачу. Она используется для создания асинхронной последовательности данных.
IAsyncEnumerable<int> pullBasedAsyncSequence = ProduceAsyncSumSeqeunc(count).ToAsyncEnumerable();

ConsoleExt.WriteLineAsync("X#X#X#X#X#X#X#X#X#X# Doing some other work X#X#X#X#X#X#X#X#X#X#");

// Запускаем ещё одну задачу; она потребляет данные из асинхронной последовательности.
var consumingTask = Task.Run(() => ConsumeAsyncSumSeqeunc(pullBasedAsyncSequence));

// Просто для демонстрации. Подождите, пока задача не завершится.
consumingTask.Wait();
ConsoleExt.WriteLineAsync("Async Streams Demo Done!");

Вывод:


Наконец, мы видим нужное поведение. Можно выполнить цикл перечисления в асинхронном режиме.
Исходный код см. здесь.


Вытягивание данных в асинхронном режиме на примере клиент-серверной архитектуры


Давайте рассмотрим эту концепцию на более реалистичном примере. Все преимущества этой функции лучше всего видны в контексте клиент-серверной архитектуры.


Синхронный вызов в случае клиент-серверной архитектуры


Отправляя запрос серверу, клиент вынужден ждать (т. е. он заблокирован), пока придёт ответ, как показано на рис. -3-.



Рис. -3- Синхронное вытягивание данных, во время которого клиент ожидает, пока не закончится обработка запроса


Асинхронное вытягивание данных


В этом случае клиент запрашивает данные и переходит к другим задачам. Как только данные получены, клиент продолжит выполнять работу.



Рис. -4- Асинхронное вытягивание данных, во время которого клиент может выполнять другие задачи, пока данные запрашиваются


Вытягивание данных в виде асинхронной последовательности


В этом случае клиент запрашивает часть данных и продолжает выполнять другие задачи. Затем, получив данные, клиент обрабатывает их и запрашивает следующую часть и так до тех пор, пока не получить все данные. Именно из этого сценария появилась идея асинхронных потоков. На рис. -5- показано, как клиент может обрабатывать полученные данные или выполнять другие задачи.



Рис. -5- Вытягивание данных в виде асинхронной последовательности (асинхронные потоки). Клиент не заблокирован.


Асинхронные потоки


Подобно IEnumerable<T> и IEnumerator<T> существует два новых интерфейса IAsyncEnumerable<T> и IAsyncEnumerator<T>, которые определяются как показано ниже:


public interface IAsyncEnumerable<out T>
{
    IAsyncEnumerator<T> GetAsyncEnumerator();
}

public interface IAsyncEnumerator<out T> : IAsyncDisposable
{
    Task<bool> MoveNextAsync();
    T Current { get; }
}

//асинхронные потоки также предполагают асинхронное освобождение
public interface IAsyncDisposable
{
    Task DiskposeAsync();
}

В InfoQ эту тему разобрал Джонатан Аллен. Здесь я не буду вдаваться в подробности, поэтому рекомендую прочитать его статью.


Весь фокус в возвращаемом значении Task<bool> MoveNextAsync() (изменённом с bool на Task<bool>, bool IEnumerator.MoveNext() ). Благодаря ему все вычисления, а также их итерирование, будут происходить асинхронно. Потребитель сам решает, когда получить следующее значение. Несмотря на то что это асинхронная модель, она всё ещё использует вытягивание данных. Для асинхронной очистки ресурсов можно использовать интерфейс IAsyncDisposable. Более подробную информацию об асинхронных потоках можно прочитать здесь.


Синтаксис


Окончательный вариант синтаксиса должен приблизительно выглядеть, как показано ниже:


foreach await (var dataChunk in asyncStreams)
{
  // Обработка части данных с помощью ключевого слова yield или выполнение других задач.
}

Из примера выше понятно, что вместо вычисления одного значения, мы, теоретически, можем последовательно вычислить множество значений, параллельно ожидая другие асинхронные операции.


Переработанный пример Microsoft


Я переписал демонстрационный код Microsoft. Его можно скачать целиком с моего репозитория GitHub.


В основе примера лежит идея создать большой поток в памяти (массив в 20 000 байт) и последовательно извлекать из него элементы в асинхронном режиме. Во время каждой итерации из массива вытягивается 8 Кб.




На шаге (1) создаётся большой массив данных, заполняемый фиктивными значениями. Затем, во время шага (2) определяется переменная под названием checksum. Эта переменная, содержащая контрольную сумму, предназначена для проверки корректности суммы вычислений. Массив и контрольная сумма создаются в памяти и возвращаются в виде последовательности элементов на шаге (3).


Шаг (4) связан с применением метода расширения AsEnumarble (более подходящее название AsAsyncEnumarble), который помогает моделировать асинхронный поток в 8 Кб (BufferSize = 8000 элементов (6))


Наследовать от IAsyncEnumerable обычно не нужно, но в примере, показанном выше, эта операция выполняется, чтобы упростить демонстрационный код, как показано на шаге (5).


Шаг (7) связан с использованием ключевого слова foreach, которое вытягивает порции данных по 8 Кб из асинхронного потока в памяти. Процесс вытягивания происходит последовательно: когда потребитель (часть кода, содержащая foreach) готов получить следующую часть данных, он вытягивает их у поставщика (массив, содержащийся в потоке в памяти). Наконец, когда цикл завершён, программа проверит значение 'c' на соответствие контрольной сумме и, если они совпадают, выведет сообщение "Checksums match!", согласно шагу (8).


Окно вывода из демонстрационного примера Microsoft:



Заключение


Мы рассмотрели асинхронные потоки, которые великолепно подходят для асинхронного вытягивания данных и написания кода, генерирующего несколько значений в асинхронном режиме.
Используя эту модель, можно запрашивать следующий элемент данных в последовательности и получать ответ. Она отличается от модели на основе проталкивания данных IObservable<T>, при использовании которой значения генерируются независимо от состояния потребителя. Асинхронные потоки позволяют отлично представлять асинхронные источники данных, управляемые потребителем, когда он сам определяет готовность принять следующую порцию данных. Примеры включают использование веб-приложений или чтение записей в базе данных.


Я продемонстрировал, как создать перечисление в асинхронном режиме и использовать его с помощью внешней библиотеки с асинхронной последовательностью. Также я показал, какие преимущества даёт эта функция при скачивании контента из Интернета. Наконец, мы рассмотрели новый синтаксис асинхронных потоков, а также полный пример его использования на основе Microsoft Build Demo Code (7–9 мая, 2018// Сиэтл, штат Вашингтон)


Семинары Станислава Сидристого
74,29
CLRium #6: Concurrency & Parallelism
Поддержать автора
Поделиться публикацией

Комментарии 25

    +2
    выталкивания данных
    Проталкивание лучше подходит?
      +1

      Возможно, даже push-, pull- без перевода.

      0
      По ссылке github.com/dotnet/csharplang/blob/master/proposals/async-streams.md отдает 404

      Такая большая статья, а самые интересные и полезные вещи вы оставили где то по ссылкам.
        0

        Ссылку исправил. Спасибо за наводку. А статья — перевод :) Потому и по ссылкам.

        +15
        Мне очень нравятся простые примеры какой-то новой фичи, когда наглядно показывается, как раньше было всё плохо, а с новым нововведением стало всё хорошо. Пример с суммой чисел — это не то, чтобы я хотел видеть для асинхронных потоков.
        Исправляем ситуацию… Допустим, у вас есть следующий метод
        IEnumerable<FileResult> ReadFiles(string[] fileNames)
        {
          foreach(var name in fileNames)
          {
            yield return ReadFile(name);
          }
        }
        

        Видите ключевое слово yield? В нем все дело.
        Теперь вы узнали о чудесной конструкции async await и хотели бы применить ее к данному методу. И у вас даже есть новый метод ReadFileAsync.
        Вы хотели бы, конечно, написать что-то такое
        async IAsyncEnumerable<FileResult> ReadFilesAsync(string[] fileNames)
        {
          foreach(var name in fileNames)
          {
            yield return await ReadFileAsync(name);
          }
        }
        

        Так вот! Раньше так нельзя было написать. А в C# 8 можно.
          –5
          И в JavaScript тоже можно, кстати. Даже синтаксис такой же. Тот редкий случай, когда JS ненадолго обогнал C#.
            –1
            Ну как обогнал… В js async/await пришел из ts, ts синтаксис разрабатывает одна мелкомягкая компания, которая и реализовала этот паттерн изначально в c#.
              0
              В js async/await пришел из ts

              image
              0
              А причём тут вообще внезапно javascript? Тем более async/await в js и C# это несколько разные вещи.
                0

                А что там принципиально разного?

                  0
                  В C# таски могут выполняться в другой нити, а в js все асинхронные задачи выполняются в одной единственной нити.
                  Например поэтому в C# есть такой твик как ConfigureAwait()
                    0

                    Ну так это особенности рантайма вообще, а не особенности async/await.


                    И да, про задачу лучше не говорить что она "может выполняться в другой нити" — создаётся впечатление, что задача всегда выполняется в какой-то нити, а это не так.

                      0
                      Язык разработан с оглядкой на типичный рантайм (например в C# есть такие вещи как volatile и lock, имеющие смысл только в многопоточном рантайме). Так и же и async/await разработан с учётом того что таски могут выполняться в разных нитях: ConfigureAwait(). От этого может меняться тактика работы с async/await, например в C# async await может использоваться для распараллеливания cpu intensive задач, а в js в этом смысла нет.

                      Ещё один отличием является то обстоятельство что Promise`ы и Task`и не эквиваленты по фичам. Task`и подразумевают отмену операции, соответственно в месте await`а можно получить OperationCanceledException, в js ничего похоже в стандартизированного нет.
                        0

                        Вот только соответствующая фича Task'а в рамках async/await целиком сводится к всего лишь ещё одному исключению...

            0

            Асинхронные Stream в C# 8 как я понимаю не поддерживают обратного давления и для более или менее житейский ситуаций нужно использовать Rx?

              0
              Почему не поддерживают? Обратное давление как раз получится, если читать медленно (например, если внутри foreach await что-то медленное делается). Подтормаживаться будут точки yield-ов. И в этом преимущество по сравнению с Rx-подходом, где обратного давления нет совсем.
                –1
                Обратное давление как раз получится, если читать медленно

                Нет это методу который осуществляет вызов, нужно быть осторожнее и следовательно следить за реактивностью вызванного, следующего по цепочке, метода. Это увы, ручное управление давлением.

                  –1

                  Э-э-э, в каком смысле "следить"? Где в приведенном чуть выше коде вы видите ручное управление давлением?


                    foreach(var name in fileNames)
                    {
                      yield return await ReadFileAsync(name);
                    }
              0
              Нисколько не приуменьшаю этот новый функционал в C#. Для полноты картины от нужен 100%.

              Но практика для десктопных приложений показывает, что отгрузка в параллельный поток и чтение обычным синхронным способом работает в разы быстрее. Особенно на слабом железе типо IoT или телефон. Плюс в параллельном потоке можно еще что-то поделать, посчитать там что-то полезное, данные преобразовать, не нагружая при этом основной поток.

              А основной поток засирается диспетчингом всех этих асинков очень неслабо.
                0

                Один Task.Run в нужном месте (или ConfigureAwait(false)) — и вот уже основной поток не нагружается.

                  –2
                  Можно и Task.Run, а можно и new Thread(). Вопрос, что дороже. Месить ThreadPool кусками кода, или в одном параллельном потоке сделать все, что можно и вернуть результат работы.

                  ConfigureAwait(false) отрывает Task от изначального синхронизационного контекста, бросая продолжения на ThreadPool, я так понимаю. Это значит, аллокировать делегат продолжения, зафиксировать все используемые локальные переменные из окружения, найти в ThreadPool свободный поток (а это thread safe, значит лочить надо), заинициализировать все статические переменные потока, чтобы Task-и работали нормально. Плюс там все через StateMachine сделано, т.е. каждый await это switch по int.

                  Надо замерять производительность. Меня сильно беспокоит то, что в UWP они начинают все API асинхронным делать, без альтернативных синхронных методов. Добавить сюда еще постоянный COM marshalling в сишное ядно, так недолго добиться JavaScript производительности. :)
                    –2

                    Нет, ConfigureAwait(false) исполняет продолжение в том же самом потоке, в котором завершилась задача, если только задача не была создана с флагом RunContinuationsAsync

                      0
                      Разве?.. По логике вещей идет обычное планирование продолжения в ThreadPool и сначала будут сделаны запланированные ранее задачи, чем дело дойдет до этого продолжения. Иначе не будет разницы между обычными потоками без await. Нет никакой гарантии выполнения в том же потоке при ConfigureAwait(false), да и ConfigureAwait(true) тоже, если нет контекста, который жестко прибьет гвоздями продолжение к какому-то потоку.

                      Может тяготеть исполнять в том же потоке — да, но это не гарантируется.
                        +1

                        Я ничего не говорил про тот же поток, в котором был сделан вызов. Я говорил про тот же поток, в котором завершилась задача, это могут быть разные потоки.

                          +1
                          При truе зависит от реализации SynchronizationContext.

                Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

                Самое читаемое