Как стать автором
Обновить

Async/await в C#: концепция, внутреннее устройство, полезные приемы

.NET *C# *Параллельное программирование *
Доброго времени суток. В этот раз поговорим на тему, в которой начинал разбираться каждый уважающий себя адепт языка C# — асинхронное программирование с использованием Task или, в простонародье, async/await. Microsoft проделали хорошую работу — ведь для того, чтобы использовать асинхронность в большинстве случаев нужно лишь знание синтаксиса и никаких других подробностей. Но если лезть вглубь, тема довольно объемная и сложная. Ее излагали многие, каждый в своем стиле. Есть очень много классных статей по этой теме, но все равно существует масса заблуждений вокруг нее. Постараемся исправить положение и разжевать материал настолько, насколько это возможно, не жертвуя ни глубиной, ни пониманием.



Рассматриваемые темы/главы:

  1. Концепция асинхронности — преимущества асинхронности и мифы о «заблокированном» потоке
  2. TAP. Синтаксис и условия компиляции — необходимые условия для написания компилирующегося метода
  3. Работа с применением TAP — механика и поведение программы в асинхронном коде (освобождения потоков, запуск задач и ожидание их завершения)
  4. За кулисами: машина состояний — обзор преобразований компилятора и сгенерированных им классов
  5. Истоки асинхронности. Устройство стандартных асинхронных методов — асинхронные методы для работы с файлами и сетью изнутри
  6. Классы и приемы при работе с TAP — полезные приемы, которые могут помочь с управлением и ускорением программы с применением TAP

Концепция асинхронности


Асинхронность сама по себе далеко не нова. Как правило, асинхронность подразумевает выполнение операции в стиле, не подразумевающем блокирование вызвавшего потока, то есть запуск операции без ожидания ее завершения. Блокирование — это не такое зло, как его описывают. Можно встретить утверждения, что заблокированные потоки зря расходуют процессорное время, работают медленнее и вызывают дождь. Последнее кажется маловероятным? На самом деле предыдущие 2 пункта такие же.

На уровне планировщика ОС, когда поток находится в состоянии «блокирован», ему не будет выделяться драгоценное процессорное время. Вызов планировщика, как правило, приходится на операции вызывающие блокировку, прерывания по таймеру и другие прерывания. То есть когда, например, контроллер диска завершит операцию чтения и инициирует соответствующее прерывание, запустится планировщик. Он будет решать, запускать поток, который был блокирован этой операцией, или какой-то другой с более высоким приоритетом.

Медленная работа кажется еще более абсурдной. Ведь по факту работа выполняется одна и та же. Только на выполнение асинхронной операции добавляются еще небольшие накладные расходы.

Вызов дождя — это вообще что-то не из этой области.

Основная проблема блокирования — неразумное потребление ресурсов компьютера. Даже если мы забудем о времени на создание потока и будем работать с пулом потоков, то на каждый заблокированный поток расходуется лишнее место. Ну и есть сценарии, где определенную работу может осуществлять только один поток (например, UI поток). Соответственно, не хотелось бы, чтобы он был занят задачей, которую может выполнить другой поток, жертвуя выполнением эксклюзивных для него операций.

Асинхронность — понятие весьма обширное и может достигаться многими путями.
В истории .NET можно выделить следующие:

  1. EAP (Event-based Asynchronous Pattern) — как следует из названия, поход основан на событиях, которые срабатывают по завершении операции и обычного метода, вызывающего эту операцию
  2. APM (Asynchronous Programming Model) — основан на 2 методах. Метод BeginSmth возвращает интерфейс IAsyncResult. Метод EndSmth принимает IAsyncResult (если к моменту вызова EndSmth операция не завершена, поток блокируется)
  3. TAP (Task-based Asynchronous Pattern) — тот самый async/await (если говорить строго, то эти слова появились уже после появления подхода и типов Task и Task<TResult>, но async/await значительно улучшил эту концепцию)

Последний подход был настолько удачен, что про предыдущие все успешно забыли. Так, речь пойдет именно про него.

Task-based asynchronous pattern. Синтаксис и условия компиляции


Стандартный асинхронный метод в стиле TAP написать очень просто.

Для этого нужно:

  1. Чтобы возвращаемое значение было Task, Task<T> или void (не рекомендуется, рассмотрено далее). В C# 7 пришли Task-like типы (рассмотрены в последней главе). В C# 8 к этому списку добавляется еще IAsyncEnumerable<T> и IAsyncEnumerator<T>
  2. Чтобы метод был помечен ключевым словом async, а внутри содержал await. Это ключевые слова идут в паре. При этом если метод содержит await, обязательно его помечать async, обратное неверно, но бессмысленно
  3. Для приличия соблюдать конвенцию о суффиксе Async. Разумеется, компилятор за ошибку считать это не будет. Если вы ну очень приличный разработчик, то можете добавлять перегрузки с CancellationToken (рассмотрен в последней главе)

Для таких методов компилятор проделывает серьезную работу. И они становятся совершенно неузнаваемы за кулисами, но об этом позже.

Было упомянуто, что метод должен содержать ключевое слово await. Оно (слово) указывает на необходимость асинхронного ожидания выполнения задачи, которую представляет тот объект задачи, к которому оно применяется.

Объект задачи, также имеет определенные условия, чтобы к нему можно было применить await:

  1. Ожидаемый тип должен иметь публичный (или internal) метод GetAwaiter(), это может быть и метод расширения. Этот метод возвращает объект ожидания
  2. Объект ожидания должен реализовать интерфейс INotifyCompletion, который обязывает реализовать метод void OnCompleted(Action continuation). Также он должен иметь экземплярные свойство bool IsCompleted, метод void GetResult(). Может быть как структурой, так и классом.

В примере ниже показано, как сделать int ожидаемым, да еще и никогда не выполняемым.

Расширение int
public class Program
{
    public static async Task Main()
    {
        await 1;
    }
}

public static class WeirdExtensions
{
    public static AnyTypeAwaiter GetAwaiter(this int number) => new AnyTypeAwaiter();

    public class AnyTypeAwaiter : INotifyCompletion
    {
        public bool IsCompleted => false;

        public void OnCompleted(Action continuation) { }

        public void GetResult() { }
    }
}



Работа с применением TAP


Сложно идти в дебри не понимая, как что-то должно работать. Рассмотрим TAP с точки зрения поведения программы.

По терминологии: рассматриваемый асинхронный метод, чей код будет рассматриваться, я буду называть асинхронный метод, а вызываемые асинхронные методы внутри него я буду называть асинхронная операция.

Возьмем наипростейший пример, в качестве асинхронной операции возьмем Task.Delay, который осуществляет задержку на указанное время, не блокируя поток.

public static async Task DelayOperationAsync() // асинхронный метод
{
    BeforeCall();
    Task task = Task.Delay(1000); //асинхронная операция
    AfterCall();
    await task;
    AfterAwait();
} 

Выполнение метода с точки зрения поведения происходит так.

  1. Выполняется весь код, предшествующий вызову асинхронной операции. В данном случае это метод BeforeCall
  2. Выполняется вызов асинхронной операции. На данном этапе поток не освобождается и не блокируется. Данная операция возвращает результат — упомянутый объект задачи (как правило Task), который сохраняется в локальную переменную
  3. Выполняется код после вызова асинхронной операции, но до ожидания (await). В примере — AfterCall
  4. Ожидание завершения на объекте задачи (который сохранили в локальную переменную) — await task.

    Если асинхронная операция к этому моменту завершена, то выполнение продолжается синхронно, в том же потоке.

    Если асинхронная операция не завершена, то сохраняется код, который надо будет вызвать по завершении асинхронной операции (т.н. продолжение), а поток возвращается в пул потоков и становится доступен для использования.
  5. Выполнение операций после ожидания — AfterAwait — выполняется или сразу же, в том же потоке, когда операция на момент ожидания была завершена, или, по завершении операции, берется новый поток, который выполнит продолжение (сохраненное на предыдущем шаге)


За кулисами. Машина состояний


На самом деле наш метод преобразовывается компилятором в метод заглушку, в которой происходит инициализация сгенерированного класса — машины состояний. Далее она (машина) запускается, а из метода возвращается объект Тask, используемый на шаге 2.

Особый интерес представляет метод MoveNext машины состояний. Данный метод выполняет то, что было до преобразования в асинхронном методе. Он разбивает код на части между каждым вызовом await. Каждая часть выполняется при определенном состоянии машины. Сам метод MoveNext присоединяется к объекту ожидания в качестве продолжения. Сохранение состояния гарантирует выполнение именно той его части, которая логически следовала за ожиданием.

Как говорится, лучше 1 раз увидеть, чем 100 раз услышать, поэтому настоятельно рекомендую ознакомиться с примером ниже. Я немного переписал код, улучшил именование переменных и щедро закомментировал.

Исходный код
public static async Task Delays()
{
     Console.WriteLine(1);
     await Task.Delay(1000);
     Console.WriteLine(2);
     await Task.Delay(1000);
     Console.WriteLine(3);
     await Task.Delay(1000);
     Console.WriteLine(4);
     await Task.Delay(1000);
     Console.WriteLine(5);
     await Task.Delay(1000);
}


Метод заглушка
[AsyncStateMachine(typeof(DelaysStateMachine))]
[DebuggerStepThrough]
public Task Delays()
{
     DelaysStateMachine stateMachine = new DelaysStateMachine();
     stateMachine.taskMethodBuilder = AsyncTaskMethodBuilder.Create();
     stateMachine.currentState = -1;
     AsyncTaskMethodBuilder builder = stateMachine.taskMethodBuilder;
     taskMethodBuilder.Start(ref stateMachine);
     return stateMachine.taskMethodBuilder.Task;
}


Машина состояний
[CompilerGenerated]
private sealed class DelaysStateMachine : IAsyncStateMachine
{
    //откражает текущее состояние, то есть на каком await выполняется ожидание
    //так возможно восстановление выполнения метода с любого await'a
    public int currentState; 
    public AsyncTaskMethodBuilder taskMethodBuilder;
    //текущий объект ожидания
    private TaskAwaiter taskAwaiter;

    //все параметры метода, как и локальные переменные сохраняются в поля для сохранения между ожиданиями при "отпускании" потока
    public int paramInt;
    private int localInt;

    private void MoveNext()
    {
        int num = currentState;
        try
        {
            TaskAwaiter awaiter5;
            TaskAwaiter awaiter4;
            TaskAwaiter awaiter3;
            TaskAwaiter awaiter2;
            TaskAwaiter awaiter;
            switch (num)
            {
                default:
                    localInt = paramInt;  //до первого await
                    Console.WriteLine(1);  //до первого await
                    awaiter5 = Task.Delay(1000).GetAwaiter();  //до первого await
                    if (!awaiter5.IsCompleted) // сам await. Проверяется, завершился ли метод
                    {
                        num = (currentState = 0); //обновление состояние, чтобы возобновить с нужного места
                        taskAwaiter = awaiter5; //сохранение из локальной в поле, ведь после возобновления локальные переменные не сохранятся
                        DelaysStateMachine stateMachine = this; //чтобы передать по ссылке
                        taskMethodBuilder.AwaitUnsafeOnCompleted(ref awaiter5, ref stateMachine); // за страшным названием скрывается лишь действия для присоединения к объекту ожидания продолжения в виде этого метода
                        return;
                    }
                    goto Il_AfterFirstAwait; //если метод завершен, переходим к коду, который следовал далее
                case 0: //данное состояние будет лишь когда первая асинхронная операция не завершилась к моменту проверки, то есть метод пошел по асинхронному пути выполнения. Если мы здесь, то первая операция завершена и мы возобновляем выполнение метода
                    awaiter5 = taskAwaiter; //восстанавливаем объект ожидания
                    taskAwaiter = default(TaskAwaiter); //обнуляем поле класса
                    num = (currentState = -1); //обновляем состояние
                    goto Il_AfterFirstAwait; //переходим непосредственно к логике из исходного метода
                case 1: // мы здесь, если вторая операция не завершилась сразу, а было присоеденино продолжение, которое как раз и запустилось.
                    awaiter4 = taskAwaiter;
                    taskAwaiter = default(TaskAwaiter);
                    num = (currentState = -1);
                    goto Il_AfterSecondAwait;
                case 2: // аналогично, третья операция не завершилась сразу.
                    awaiter3 = taskAwaiter;
                    taskAwaiter = default(TaskAwaiter);
                    num = (currentState = -1);
                    goto Il_AfterThirdAwait;
                case 3: // а здесь четвертая
                    awaiter2 = taskAwaiter;
                    taskAwaiter = default(TaskAwaiter);
                    num = (currentState = -1);
                    goto Il_AfterFourthAwait;
                case 4: // ну и пятая
                    {
                        awaiter = taskAwaiter;
                        taskAwaiter = default(TaskAwaiter);
                        num = (currentState = -1);
                        break;
                    }

                    Il_AfterFourthAwait:
                    awaiter2.GetResult();
                    Console.WriteLine(5); //код после четвертой асинхронной операции
                    awaiter = Task.Delay(1000).GetAwaiter(); //пятая асинхронная операция
                    if (!awaiter.IsCompleted)
                    {
                        num = (currentState = 4);
                        taskAwaiter = awaiter;
                        DelaysStateMachine stateMachine = this;
                        taskMethodBuilder.AwaitUnsafeOnCompleted(ref awaiter, ref stateMachine);
                        return;
                    }
                    break;

                    Il_AfterFirstAwait: //если мы здесь, то первая операция завершилась так или иначе
                    awaiter5.GetResult(); //соответсвенно результат доступен и мы его получаем
                    Console.WriteLine(2); //выполнение того кода, который шел после первого await
                    awaiter4 = Task.Delay(1000).GetAwaiter(); //Выполнение второй асинхронной операции
                    if (!awaiter4.IsCompleted) 
                    {
                        num = (currentState = 1);
                        taskAwaiter = awaiter4;
                        DelaysStateMachine stateMachine = this;
                        taskMethodBuilder.AwaitUnsafeOnCompleted(ref awaiter4, ref stateMachine);
                        return;
                    }
                    goto Il_AfterSecondAwait;

                    Il_AfterThirdAwait:
                    awaiter3.GetResult();
                    Console.WriteLine(4); //код после третей асинхронной операции
                    awaiter2 = Task.Delay(1000).GetAwaiter(); //четвертая асинхронная операция
                    if (!awaiter2.IsCompleted)
                    {
                        num = (currentState = 3);
                        taskAwaiter = awaiter2;
                        DelaysStateMachine stateMachine = this;
                        taskMethodBuilder.AwaitUnsafeOnCompleted(ref awaiter2, ref stateMachine);
                        return;
                    }
                    goto Il_AfterFourthAwait;

                    Il_AfterSecondAwait:
                    awaiter4.GetResult();
                    Console.WriteLine(3); //код после второй асинхронной операции
                    awaiter3 = Task.Delay(1000).GetAwaiter(); //третья асинхронная операция
                    if (!awaiter3.IsCompleted)
                    {
                        num = (currentState = 2);
                        taskAwaiter = awaiter3;
                        DelaysStateMachine stateMachine = this;
                        taskMethodBuilder.AwaitUnsafeOnCompleted(ref awaiter3, ref stateMachine);
                        return;
                    }
                    goto Il_AfterThirdAwait;
            }
            awaiter.GetResult();
        }
        catch (Exception exception)
        {
            currentState = -2;
            taskMethodBuilder.SetException(exception);
            return;
        }
        currentState = -2;
        taskMethodBuilder.SetResult(); //если бы использовали асинхронные операции, которые возвращают результат, он был бы параметром этого метода
    }

    void IAsyncStateMachine.MoveNext() {...}

    [DebuggerHidden]
    private void SetStateMachine(IAsyncStateMachine stateMachine) {...}

    void IAsyncStateMachine.SetStateMachine(IAsyncStateMachine stateMachine) {...}
}


Заостряю внимание на фразе «к этому моменту не выполнился синхронно». Асинхронная операция может пойти и по синхронному пути выполнения. Главное условие для того, чтобы текущий асинхронный метод выполнялся синхронно, то есть не меняя поток — это завершенность асинхронной операции на момент проверки IsCompleted.

Этот пример наглядно демонстрирует данное поведение
static async Task Main()
{
    Console.WriteLine(Thread.CurrentThread.ManagedThreadId); //1
    Task task = Task.Delay(1000);
    Thread.Sleep(1700);
    await task;
    Console.WriteLine(Thread.CurrentThread.ManagedThreadId); //1
}


Про контекст синхронизации. Метод AwaitUnsafeOnCompleted, используемый в машине, в конечном счете приводит к вызову метода Task.SetContinuationForAwait. В данном методе происходит получение текущего контекста синхронизации SynchronizationContext.Current. Контекст синхронизации можно трактовать как тип потока. В случае, если он есть и какой-то специфический (например, контекст UI потока), продолжение создается с помощью класса SynchronizationContextAwaitTaskContinuation. Данный класс для запуска продолжения вызывает метод Post на сохраненном контексте, что гарантирует выполнение продолжения именно в том контексте, где был запущен метод. Конкретная логика по выполнению продолжения зависит от метода Post в контексте, который, мягко говоря, не славится быстродействием. В случае, если контекста синхронизации не было(или было указано, что нам неважно, в каком контексте продолжится выполнение с помощью ConfigureAwait(false), который будет рассмотрен в последней главе) то продолжение будет выполнено потоком из пула. Стоит упомянуть, что в ASP NET Core потоки, обрабатывающие запросы теперь не имеют контекста (что не может не радовать, одной весьма частой причиной дедлока меньше).

Истоки асинхронности. Устройство стандартных асинхронных методов


Мы рассмотрели то, как выглядит метод, использующий async и await и что происходит за кулисами. Данная информация нередка. Но важно понимать и природу асинхронных операций. Потому что как мы видели в машине состояний в коде вызываются асинхронные операции, разве что их результат обрабатывается хитрее. Однако что происходит внутри самих асинхронных операций? Вероятно, то же самое, но это не может происходить до бесконечности.

Важной задачей является понимание природы асинхронности. При попытках понять асинхронность наблюдается чередование состояний «теперь понятно» и «теперь снова непонятно». И данное чередование будет до тех пор, пока не станет понятен исток асинхронности.

При работе с асинхронностью мы оперируем задачами. Это совсем не то же самое, что поток. Одна задача может выполняться многими потоками, а один поток может выполнять много задач.

Как правило, асинхронность начинается с метода, который возвращает Task (например), но не помечен async, соответственно не использует await внутри. Такой метод не терпит никаких компиляторных изменений, выполняется как есть.

Итак, рассмотрим несколько корней асинхронности.

  1. Task.Run, new Task(..).Start(), Factory.StartNew и ему подобные. Самый простой способ начать асинхронное выполнение. Данные способы просто создают новый объект задачи, передавая в качестве одного из параметров делегат. Задача передается планировщику, который дает ее на выполнение одному из потоков пула. Возвращается готовая задача, которую можно ожидать. Как правило, такой подход используется для начала вычислений (CPU-bound) в отдельном потоке
  2. TaskCompletionSource. Вспомогательный класс, который помогает контролировать объект задачи. Создан для тех, кто не может выделить делегат под выполнение и использует более сложные механизмы контроля завершенности. Имеет очень простое API — SetResult, SetError и тд, которые соответствующим образом обновляют задачу. Данная задача доступна через свойство Task. Возможно, внутри вы будете создавать потоки, иметь сложную логику по их взаимодействию или завершение по событию. Чуть больше деталей об этом классе будет в последнем разделе

В дополнительный пункт можно вынести методы стандартных библиотек. Сюда можно отнести чтение/запись файлов, работу с сетью и тому подобное. Как правило, такие популярные и распространенные методы используют системные вызовы, которые различаются на разных платформах, а их устройство крайне занятно. Рассмотрим работы с файлами и сетью.

Файлы


Важное замечание — при желании работать с файлами асинхронно требуется указать при создании FileStream useAsync = true.

В файлах все устроено нетривиально и запутанно. Класс FileStream объявлен как partial. И помимо него существует еще 6 partial дополнений, зависящих от платформы. Так, в Unix для асинхронного доступа в произвольный файл, как правило, используется синхронная операция в отдельном потоке. В Windows существуют системные вызовы для асинхронной работы, которые, разумеется, используются. Это приводит к различиям в работе на разных платформах. Исходники.

Unix

Стандартное поведение при записи или чтении — производить операцию синхронно, если буфер позволяет и стрим не занят другой операцией:

1. Стрим не занят другой операцией

В классе Filestream есть объект, унаследованный от SemaphoreSlim параметрами (1, 1) — то есть а-ля критическая секция — фрагмент кода, защищенный этим семафором, может выполнятся в одно время только одним потоком. Данный семафор используется как при чтении, так и при записи. То есть невозможно одновременно производить и чтение, и запись. При этом блокировки на семафоре не происходит. На нем вызывается метод this._asyncState.WaitAsync(), который возвращает объект задачи (блокировки или ожидания при этом нет, оно было бы, если бы к результату метода применили ключевое слово await). Если данный объект задачи не завершен — то есть семафор захвачен, то к возвращенному объекту ожидания присоединяется продолжение (Task.ContinueWith), в котором выполняется операция. Если же объект свободен, то нужно проверить следующее

2. Буфер позволяет

Тут уже поведение зависит от характера операции.

Для записи — проверяется, чтобы размер данных для записи + позиция в файле были меньше, чем размер буфера, который по умолчанию — 4096 байт. То есть мы должны писать 4096 байт с начала, 2048 байт со смещением в 2048 и тд. Если это так, то операция проводится синхронно, в противном случае присоединяется продолжение (Task.ContinueWith). В продолжении используется обычный синхронный системный вызов. При заполнении буфера он синхронно пишется на диск.
Для чтения — проверяется, достаточно ли данных в буфере для того, чтобы вернуть все необходимые данные. Если нет, то, опять же, продолжение (Task.ContinueWith) с синхронным системным вызовом.

Кстати, тут есть интересная деталь. В случае, если одна порция данных займет весь буфер, они будут записаны напрямую в файл, без участия буфера. При этом, есть ситуация, когда данных будет больше, чем размер буфера, но они все пройдут через него. Такое случается, если в буфере уже есть что-то. Тогда наши данные разделятся на 2 порции, одна заполнит буфер до конца и данные запишутся в файл, вторая будет записана в буфер, если влазит в него или напрямую в файл, если не влазит. Так, если мы создадим стрим и запишем в него 4097 байт то они сразу появятся в файле, без вызова Dispose. Если же мы запишем 4095, то в файле ничего не будет.

Windows

Под Windows алгоритм использования буфера и записи напрямую очень похож. Но существенное различие наблюдается в непосредственно в асинхронных системных вызовах записи и чтения. Если говорить без углубления в системные вызовы, то существует такая структура Overlapped. В ней есть важное нам поле — HANDLE hEvent. Это событие c ручным сбросом, которое переходит в сигнальное состояние по завершении операции. Возвращаемся к реализации. Запись напрямую, как и запись буфера используют асинхронные системные вызовы, которые используют вышеупомянутую структуру как параметр. При записи создается объект FileStreamCompletionSource — наследник TaskCompletionSource, в котором как раз указан IOCallback. Он вызывается свободным потоком из пула, когда операция завершается. В колбэке структура Overlapped разбирается и соответствующим образом обновляется объект Task. Вот и вся магия.

Сеть


Сложно описать все, что я увидел разбираясь в исходниках. Мой путь лежал от HttpClient до Socket и до SocketAsyncContext для Unix. Общая схема такая же, как и с файлами. Для Windows используется упомянутая структура Overlapped и операция выполняется асинхронно. В Unix операции с сетью также используют функции обратного вызова.

И небольшое пояснение. Внимательный читатель заметит, что при использовании асинхронных вызовов между вызовом и колбэком некая пустота, которая каким-то образом работает с данными. Здесь стоит дать уточнение для полноты картины. На примере файлов — непосредственно вычислительными операции с диском производит контроллер диска, именно он дает сигналы о перемещении головок на нужный сектор и тд. Процессор же в это время свободен. Общение с диском происходит посредством портов ввода/вывода. В них указывается тип операции, расположение данных на диске и тд. Далее контроллер и диск занимаются выполнением этой операции и по завершению работы они генерируют прерывание. Соответственно, асинхронный системный вызов только вносит информацию в порты ввода/вывода, а синхронный еще и дожидается результатов, переводя поток в состояние блокировки. Данная схема не претендует на абсолютную точность (не об этом статья), но дает концептуальное понимание работы.

Теперь ясна природа процесса. Но у кого-то может возникнуть вопрос, а что делать с асинхронностью? Ведь невозможно вечно писать async над методом.

Во-первых. Приложение может быть сделано как служба. При этом точка входа — Main — пишется с нуля вами. До недавних пор Main не мог быть асинхронным, в 7 версии языка добавили такую возможность. Но ничего коренным образом оно не меняет, просто компилятор генерирует обычный Main, а из асинхронного делается просто статический метод, который вызывается в Main и синхронно ожидается его завершение. Итак, вероятнее всего у вас есть какие-то долгоживущие действия. Почему-то в этот момент многие начинают раздумывать как создавать потоков под это дело: через Task, ThreadPool или вообще Thread вручную, ведь в чем-то разница должна быть. Ответ прост — разумеется Task. Если вы используете подход TAP, то не надо мешать его с созданием потоков вручную. Это сродни использования HttpClient для почти всех запросов, а POST осуществлять самостоятельно через Socket.

Во-вторых. Веб приложения. Каждый поступающий запрос порождает вытягивание нового потока из ThreadPool для обработки. Пул, конечно, большой, но не бесконечный. В случае, когда запросов много, потоков на всех может не хватать, и все новые запросы будут ставиться в очередь на обработку. Такая ситуация называется голоданием. Но в случае использования асинхронных контроллеров, как обсуждалось ранее, поток возвращается в пул и может быть использован для обработки новых запросов. Таким образом существенно увеличивается пропускная способность сервера.

Мы рассмотрели асинхронный процесс с самого начала и до самого конца. И вооружившись пониманием всей этой асинхронности, противоречащей человеческой натуре, рассмотрим некоторые полезные приемы при работе с асинхронным кодом.

Полезные классы и приемы при работе с TAP


Статическое многообразие класса Task.


У класса Task есть несколько полезных статических методов. Ниже будут приведены основные из них.

  1. Task.WhenAny(..) — комбинатор, принимает IEnumerable/params объектов задач и возвращает объект задачи, который завершиться по завершении первой завершившейся из переданных задач. То есть позволяет дожидаться одной из нескольких запущенных задач
  2. Task.WhenAll(..) — комбинатор, принимает IEnumerable/params объектов задач и возвращает объект задачи, который завершиться по завершении всех переданных задач
  3. Task.FromResult<T>(T value) — возвращает это же значение, обернутое в завершенную задачу. Зачастую требуется при реализации существующих интерфейсов с асинхронными методами
  4. Task.Delay(..) — асинхронно ждет указанное время
  5. Task.Yield() — планирует продолжение. Как упоминалось выше, асинхронный метод может закончится и синхронно. В случае, если вызван этот метод, его продолжение будет выполнено асинхронно

ConfigureAwait


Естественно, самая популярная «продвинутая» особенность. Данный метод принадлежит классу Task и позволяет указать, необходимо ли нам выполнять продолжение в том же контексте, где была вызвана асинхронная операция. По умолчанию, без использования этого метода, контекст запоминается и продолжение ведется в нем с помощью упомянутого метода Post. Однако, как мы говорили, Post — весьма дорогое удовольствие. Поэтому, если производительность на 1-м месте, а мы видим, что продолжение не будет, скажем, обновлять UI, можно указать на объекте ожидания .ConfigureAwait(false). Это означает, что нам БЕЗРАЗЛИЧНО, где будет выполнено продолжение.

Теперь о проблеме. Как говорится страшно не незнание, а ложное знание.

Как-то довелось наблюдать код веб-приложения, где каждый асинхронный вызов был украшен сие ускорителем. Это не имеет никакого эффекта, кроме визуального отвращения. Стандартное веб-приложение ASP.NET Core не имеет каких-то уникальных контекстов (если вы их сами не напишете, конечно). Таким образом, метод Post там и так не вызывается.

TaskCompletionSource<T>


Класс, позволяющий легко управлять объектом Task. Класс имеет широкие возможности, но наиболее полезен, когда мы хотим обернуть в задачу некоторое действие, конец которого происходит по событию. Вообще, класс был создан для адаптации старых асинхронных методов под TAP, но как мы видели, используется он не только для этого. Небольшой пример работы с данным классом:

Пример
public static Task<string> GetSomeDataAsync()
{
    TaskCompletionSource<string> tcs = new TaskCompletionSource<string>();
    FileSystemWatcher watcher = new FileSystemWatcher
    {
        Path = Directory.GetCurrentDirectory(),
        NotifyFilter = NotifyFilters.LastAccess,
        EnableRaisingEvents = true
    };
    watcher.Changed += (o, e) => tcs.SetResult(e.FullPath);
    return tcs.Task;
}


Данный класс создает асинхронную обертку для получения имени файла, к которому в текущей папке производился доступ.

CancellationTokenSource


Позволяет отменить асинхронную операцию. Общая схема напоминает использование TaskCompletionSource. Сначала создается var cts = new CancellationTokenSource(), который, кстати, IDisposable, затем в асинхронные операции передается cts.Token. Далее, следуя какой-то вашей логике, при определенных условиях вызывается метод cts.Cancel(). Это также может подписка на событие или что угодно другое.

Использование CancellationToken является хорошей практикой. При написании своего асинхронного метода, который делает некоторую работу в фоне, допустим в бесконечном while, можно просто вставить одну строку в тело цикла: cancellationToken.ThrowIfCancellationRequested(), которая выброит исключение OperationCanceledException. Это исключение трактуется как отмена операции и не сохраняется как исключение внутри объекта задачи. Также Свойство IsCanceled на объекте Task станет true.

LongRunning


Зачастую случаются ситуации, особенно при написании служб, когда вы создаете несколько задач, которые будут работать на протяжении всей жизни службы или просто весьма долго. Как мы помним, использование пула потоков обоснованно накладными расходами на создание потока. Однако если поток создается редко (да даже раз в час), то данные расходы нивелируются и можно смело создать отдельные потоки. Для этого при создании задачи можно указать специальную опцию:

Task.Factory.StartNew(action, TaskCreationOptions.LongRunning)

Да и вообще советую посмотреть на все перегрузки Task.Factory.StartNew, там есть много способов гибко настроить выполнение задачи под конкретные нужды.

Исключения


В связи с недетерминированной природой выполнения асинхронного кода вопрос об исключениях является очень актуальным. Было бы обидно, если бы вы не могли перехватить исключение и оно выбрасывалось в левом потоке, убивая процесс. Для перехвата исключения в одном потоке и возбуждения его в создан класс ExceptionDispatchInfo. Чтобы захватить исключение, используется статический метод ExceptionDispatchInfo.Capture(ex), возвращающий ExceptionDispatchInfo. Ссылку на этот объект можно передать в любой поток, который затем вызовет метод Throw() для его выброса. Сам выброс происходит НЕ в месте вызова асинхронной операции, а в месте использования оператора await (если метод помечен как async, в противном случае компилятор не будет совершать преобразований, а значит метод работает как простой метод — исключения никто перехватывать и перевыбрасывать не будет). А как известно, await применить к void нельзя. Таким образом, исключение возбудится в не подконтрольном нам потоке и словлено не будет. А это почти 100% приведет к краху приложения (всегда существуют грязные хаки). И тут мы приходим к практике того, что мы должны использовать Task или Task<T>, но не void.

И еще. У планировщика есть событие TaskScheduler.UnobservedTaskException, которое срабатывает, когда выбрасывается UnobservedTaskException. Это исключение выбрасывается при сборке мусора, когда GC пытается собрать объект задачи, в котором имеется необработанное исключение.

IAsyncEnumerable


До C# 8 и .NET Core 3.0 не было возможности использовать блок-итератор (yield) в асинхронном методе, что усложняло жизнь и заставляло из такого метода возвращать Task<IEnumerable<T>>, т.е. не было способа проитерироваться по коллекции до полного ее получения. Теперь такая возможность есть. Подробнее о ней можно узнать здесь. Для этого тип возвращаемого значения должен быть IAsyncEnumerable<T> (или IAsyncEnumerator<T>). Для прохода по такой коллекции следует использовать цикл foreach с ключевым словом await. Также на результате операции могут быть вызваны методы WithCancellation и ConfigureAwait, указывающие используемый CancelationToken и необходимость продолжения в том же контексте.

Как и положено, все выполняется настолько лениво, насколько это возможно.
Ниже представлен пример и вывод, который он дает.

Пример
public class Program
{
    public static async Task Main()
    {
        Stopwatch sw = new Stopwatch();
        sw.Start();
        IAsyncEnumerable<int> enumerable = AsyncYielding();
        Console.WriteLine($"Time after calling: {sw.ElapsedMilliseconds}");

        await foreach (var element in enumerable.WithCancellation(..).ConfigureAwait(false))
        {
            Console.WriteLine($"element: {element}");
            Console.WriteLine($"Time: {sw.ElapsedMilliseconds}");
        }
    }

    static async IAsyncEnumerable<int> AsyncYielding()
    {
        foreach (var uselessElement in Enumerable.Range(1, 3))
        {
            Task task = Task.Delay(TimeSpan.FromSeconds(uselessElement));
            Console.WriteLine($"Task run: {uselessElement}");
            await task;
            yield return uselessElement;
        }
    }
}


Вывод:

Time after calling: 0
Task run: 1
element: 1
Time: 1033
Task run: 2
element: 2
Time: 3034
Task run: 3
element: 3
Time: 6035


ThreadPool


Данный класс активно используется при программировании с TAP. Поэтому дам минимальные подробности его реализации. Внутри себя ThreadPool имеет массив очередей: по одной на каждый поток + одна глобальная. При добавлении новой работы в пул учитывается поток, который инициировал добавление. В случае, если это поток из пула, работа ставится в собственную очередь этого потока, если это был другой поток — в глобальную. Когда потоку выбирается работа, сначала смотрится его локальная очередь. Если она пуста, поток берет задания из глобальной. Если же и та пуста — начинает красть у остальных. Также никогда не стоит полагаться на порядок выполнения работ, потому что, собственно, порядка то и нет. Количество потоков в пуле по умолчанию зависит от множества факторов, включая размер адресного пространства. Если запросов на выполнение больше, чем количество доступных потоков, запросы ставятся в очередь.

Потоки в пуле потоков — background-потоки (свойство isBackground = true). Данный вид потоков не поддерживает жизнь процесса, если все foreground-потоки завершились.

Системный поток наблюдает за состоянием wait handle. Когда операция ожидания заканчивается, переданный колбэк выполняется потоком из пула (вспоминаем файлы в Windows).

Task-like тип


Упомянутый ранее, данный тип (структура или класс) может быть использован в качесве возвращаемого значения из асинхронного метода. С этим типом должен быть связан тип билдера с помощью атрибута [AsyncMethodBuilder(..)]. Данный тип должен обладать упомянутыми ранее характеристиками для возможности применять к нему ключевое слово await. Может быть непараметризированным для методов не возвращающих значение и параметризированным — для тех, которые возвращают.

Сам билдер — класс или структура, каркас которой показан в примере ниже. Метод SetResult имеет параметр типа T для task-like типа, параметризированного T. Для непараметризированных типов метод не имеет параметров.

Необходимый интерфейс билдера
class MyTaskMethodBuilder<T>
{
    public static MyTaskMethodBuilder<T> Create();

    public void Start<TStateMachine>(ref TStateMachine stateMachine)
        where TStateMachine : IAsyncStateMachine;

    public void SetStateMachine(IAsyncStateMachine stateMachine);
    public void SetException(Exception exception);
    public void SetResult(T result);

    public void AwaitOnCompleted<TAwaiter, TStateMachine>(
        ref TAwaiter awaiter, ref TStateMachine stateMachine)
        where TAwaiter : INotifyCompletion
        where TStateMachine : IAsyncStateMachine;
    public void AwaitUnsafeOnCompleted<TAwaiter, TStateMachine>(
        ref TAwaiter awaiter, ref TStateMachine stateMachine)
        where TAwaiter : ICriticalNotifyCompletion
        where TStateMachine : IAsyncStateMachine;

    public MyTask<T> Task { get; }
}


Далее будет описан принцип работы с точки зрения пишущего свой Task-like тип. Большинство это уже было описано при разборе кода, сгенерированного компилятором.

Все эти типы компилятор использует для генерации машины состояний. Компилятор знает, какие билдеры использовать для известных ему типов, здесь же мы сами указываем, что будет использовано при кодогенерации. Если машина состояний — структура, то произойдет ее упаковка при вызове SetStateMachine, билдер может закэшировать упакованную копию при необходимости. Билдер должен вызвать stateMachine.MoveNext в методе Start или после его вызова, чтобы начать выполнение и продвинуть машину состояний. После вызова Start, из метода будет возвращено значение свойство Task. Рекомендую вернуться к методу заглушке и просмотреть эти действия.

Если машина состояний успешно отрабатывает, вызывается метод SetResult, иначе SetException. Если машина состояний достигает await, выполняется метод GetAwaiter() task-like типа. Если объект ожидания реализует интерфейс ICriticalNotifyCompletion и IsCompleted = false, машина состояний использует builder.AwaitUnsafeOnCompleted(ref awaiter, ref stateMachine). Метод AwaitUnsafeOnCompleted должен вызвать awaiter.OnCompleted(action), в action должен быть вызов stateMachine.MoveNext когда объект ожидания завершится. Аналогично для интерфейса INotifyCompletion и метода builder.AwaitOnCompleted.

Как использовать это — решать вам. Но советую подумать раз этак 514 прежде чем применить это в продакшене, а не для баловства. Ниже приведен пример использования. Я набросал всего-лишь прокси для стандартного билдера, который выводит на консоль, какой метод был вызван и в какое время. Кстати, асинхронный Main() не хочет поддерживать кастомный тип ожидания (полагаю не один продакшен проект был безнадежно испорчен из-за этого промаха Microsoft). При желании, вы можете модифицировать прокси-логер, использовав нормальный логер и логируя больше информации.

Логирующий прокси-таск
public class Program
{
    public static void Main()
    {
        Console.WriteLine("Start");
        JustMethod().Task.Wait(); //не надо так
        Console.WriteLine("Stop");
    }

    public static async LogTask JustMethod()
    {
        await DelayWrapper(1000);
    }

    public static LogTask DelayWrapper(int milliseconds) => new LogTask { Task = Task.Delay(milliseconds)};
}

[AsyncMethodBuilder(typeof(LogMethodBuilder))]
public class LogTask
{
    public Task Task { get; set; }

    public TaskAwaiter GetAwaiter() => Task.GetAwaiter();
}

public class LogMethodBuilder
{
    private AsyncTaskMethodBuilder _methodBuilder = AsyncTaskMethodBuilder.Create();
    private LogTask _task;
    
    public static LogMethodBuilder Create()
    {
        Console.WriteLine($"Method: Create; {DateTime.Now :O}");
        return new LogMethodBuilder();
    }
    
    public void Start<TStateMachine>(ref TStateMachine stateMachine) where TStateMachine : IAsyncStateMachine
    {
        Console.WriteLine($"Method: Start; {DateTime.Now :O}");
        _methodBuilder.Start(ref stateMachine);
    }
    
    public void SetStateMachine(IAsyncStateMachine stateMachine)
    {
        Console.WriteLine($"Method: SetStateMachine; {DateTime.Now :O}");
        _methodBuilder.SetStateMachine(stateMachine);
    }
    
    public void SetException(Exception exception)
    {
        Console.WriteLine($"Method: SetException; {DateTime.Now :O}");
        _methodBuilder.SetException(exception);
    }
    
    public void SetResult()
    {
        Console.WriteLine($"Method: SetResult; {DateTime.Now :O}");
        _methodBuilder.SetResult();
    }    

    public void AwaitOnCompleted<TAwaiter, TStateMachine>(ref TAwaiter awaiter, ref TStateMachine stateMachine)
        where TAwaiter : INotifyCompletion
        where TStateMachine : IAsyncStateMachine
    {
        Console.WriteLine($"Method: AwaitOnCompleted; {DateTime.Now :O}");
        _methodBuilder.AwaitOnCompleted(ref awaiter, ref stateMachine);
    }

    public void AwaitUnsafeOnCompleted<TAwaiter, TStateMachine>(ref TAwaiter awaiter, ref TStateMachine stateMachine)
        where TAwaiter : ICriticalNotifyCompletion
        where TStateMachine : IAsyncStateMachine
    {
        Console.WriteLine($"Method: AwaitUnsafeOnCompleted; {DateTime.Now :O}");
        _methodBuilder.AwaitUnsafeOnCompleted(ref awaiter, ref stateMachine);
    }

    public LogTask Task
    {
        get
        {
            Console.WriteLine($"Property: Task; {DateTime.Now :O}");
            return _task ??= new LogTask {Task = _methodBuilder.Task};
        }
        set => _task = value;
    }
}


Вывод:

Start
Method: Create; 2019-10-09T17:55:13.7152733+03:00
Method: Start; 2019-10-09T17:55:13.7262226+03:00
Method: AwaitUnsafeOnCompleted; 2019-10-09T17:55:13.7275206+03:00
Property: Task; 2019-10-09T17:55:13.7292005+03:00
Method: SetResult; 2019-10-09T17:55:14.7297967+03:00
Stop


На этом все, всем спасибо.
Теги:
Хабы:
Всего голосов 34: ↑32 и ↓2 +30
Просмотры 98K
Комментарии Комментарии 20