Pull to refresh

Как на самом деле работает Async/Await в C# (Часть 1)

Level of difficultyHard
Reading time12 min
Views58K
Original author: Stephen Toub

Так как оригинальная статья довольно объемная, я взял на себя смелость разбить ее на несколько независимых частей, более легких для перевода и восприятия. 

Disclaimer: Я не являюсь профессиональным переводчиком, перевод подготовлен скорее для себя и коллег. Я буду благодарен за любые исправления и помощь в переводе, статья очень интересная давайте сделаем её доступной на русском языке.

  1. Часть 1: В самом начале…

  2. Часть 2: Асинхронная модель на основе событий (EAP)

  3. Часть 3: Появление Tasks (Асинхронная модель на основе задач (TAP)

  4. Часть 4: ...и ValueTasks

  5. Часть 5: Итераторы C# в помощь

  6. Часть 6: Async/await: Внутреннее устройство

  7. Часть 7: SynchronizationContext и ConfigureAwait и поля в State Machine

Несколько недель назад в блоге «.NET Blog» появилась статья «Что такое .NET, и почему вы должны выбрать его?» В нем был представлен высокоуровневый обзор платформы, кратко описаны различные компоненты и архитектурные решения, а также обещаны более подробные посты по затронутым темам. Этот пост является первым таким продолжением, в котором подробно рассматривается история создания, архитектурные решения и детали реализации async/await в C# и .NET.

Поддержка async/await существует уже более десяти лет. За это время она изменила способ написания масштабируемого кода для .NET, и использование этой функциональности без понимания того, что именно происходит под капотом, является работоспособным и чрезвычайно распространенным способом работы с ней. Вы начинаете с синхронного метода, подобного следующему (этот метод является «синхронным», потому что вызывающая сторона не сможет делать ничего другого, пока вся операция не завершится и управление не будет возвращено вызывающей стороне):

// Synchronously copy all data from source to destination.
public void CopyStreamToStream(Stream source, Stream destination)
{
    var buffer = new byte[0x1000];
    int numRead;
    while ((numRead = source.Read(buffer, 0, buffer.Length)) != 0)
    {
        destination.Write(buffer, 0, numRead);
    }
}

Затем вы добавляете несколько ключевых слов, изменяете несколько имен методов, и в итоге получаете следующий асинхронный метод (этот метод является «асинхронным», потому что ожидается, что управление будет возвращено вызывающей стороне очень быстро и, возможно, до завершения работы, связанной со всей операцией):

// Asynchronously copy all data from source to destination.
public async Task CopyStreamToStreamAsync(Stream source, Stream destination)
{
    var buffer = new byte[0x1000];
    int numRead;
    while ((numRead = await source.ReadAsync(buffer, 0, buffer.Length)) != 0)
    {
        await destination.WriteAsync(buffer, 0, numRead);
    }
}

Почти идентичный синтаксис, возможность использовать все те же конструкции потока управления, но теперь неблокирующий по своей сути, со значительно отличающейся основной моделью выполнения, и со всей тяжелой работой, которую за вас выполняют компилятор C# и основные библиотеки.

Хотя обычно можно использовать эту поддержку, не зная точно, что происходит под капотом, я твердо убежден, что понимание того, как что-то работает, поможет вам использовать это еще лучше. В частности, для async/await понимание механизмов работы особенно полезно, когда вы хотите заглянуть под поверхность, например, когда вы пытаетесь отладить то, что пошло не так, или улучшить производительность того, что в остальном пошло правильно. В этом посте мы подробно рассмотрим, как именно работает await на уровне языка, компилятора и библиотеки, чтобы вы могли максимально использовать эти ценные возможности.

В самом начале...

Еще в .NET Framework 1.0 появился шаблон модели асинхронного программирования, известный как шаблон APM, известный как шаблон Begin/End, известный как шаблон IAsyncResult. На высоком уровне этот паттерн прост. Для синхронной операции DoStuff:

class Handler
{
    public int DoStuff(string arg);
}

в шаблоне будет два соответствующих метода: метод BeginDoStuff и метод EndDoStuff:

class Handler
{
    public int DoStuff(string arg);

    public IAsyncResult BeginDoStuff(string arg, AsyncCallback? callback, object? state);
    public int EndDoStuff(IAsyncResult asyncResult);
}

BeginDoStuff принимает все те же параметры, что и DoStuff, но дополнительно принимает делегат AsyncCallback и неявный object состояния, один или оба из которых могут быть null. Метод Begin отвечал за инициирование асинхронной операции, и если ему предоставлялся обратный вызов (часто называемый «продолжением» для начальной операции), он также отвечал за то, чтобы обратный вызов был вызван по завершении асинхронной операции. Метод Begin также создает экземпляр типа, реализованного в IAsyncResult, используя необязательное state для заполнения свойства AsyncState этого IAsyncResult:

namespace System
{
    public interface IAsyncResult
    {
        object? AsyncState { get; }
        WaitHandle AsyncWaitHandle { get; }
        bool IsCompleted { get; }
        bool CompletedSynchronously { get; }
    }

    public delegate void AsyncCallback(IAsyncResult ar);
}

Этот экземпляр IAsyncResult будет возвращен из метода Begin, а также передан в AsyncCallback, когда он будет вызван. Когда пользователь готов получить результаты операции, он передает экземпляр IAsyncResult в метод End, который отвечает за обеспечение завершения операции (синхронное ожидание ее завершения и блокирование в противном случае), а затем возвращает любой результат операции, включая распространение любых ошибок/исключений, которые могли возникнуть. Таким образом, вместо того чтобы написать код, подобный следующему, для синхронного выполнения операции:

try
{
    int i = handler.DoStuff(arg); 
    Use(i);
}
catch (Exception e)
{
    ... // handle exceptions from DoStuff and Use
}

методы Begin/End могут быть использованы следующим образом для выполнения той же операции асинхронно:

try
{
    handler.BeginDoStuff(arg, iar =>
    {
        try
        {
            Handler handler = (Handler)iar.AsyncState!;
            int i = handler.EndDoStuff(iar);
            Use(i);
        }
        catch (Exception e2)
        {
            ... // handle exceptions from EndDoStuff and Use
        }
    }, handler);
}
catch (Exception e)
{
    ... // handle exceptions thrown from the synchronous call to BeginDoStuff
}

Для всех, кто имел дело с API на основе обратного вызова в любом языке, это должно показаться знакомым.

Однако дальше все становится только сложнее. Например, существует проблема «погружения стека». Погружение стека - это когда код неоднократно выполняет вызовы, которые все глубже и глубже погружаются в стек, до такой степени, что это может привести к переполнению стека. Метод Begin может вызывать обратный вызов синхронно, если операция завершается синхронно, то есть вызов Begin может сам непосредственно вызывать обратный вызов.

А «асинхронные» операции, которые завершаются синхронно, на самом деле очень распространены; они не являются «асинхронными», потому что им гарантировано асинхронное завершение, а скорее просто разрешено. Например, рассмотрим асинхронное чтение из какой-либо сетевой операции, например, получение из сокета. Если вам нужно только небольшое количество данных для каждой отдельной операции, например, чтение некоторых заголовочных данных из ответа, вы можете установить буфер, чтобы избежать накладных расходов на множество системных вызовов. Вместо того, чтобы выполнять небольшое чтение только того объема данных, который вам нужен немедленно, вы выполняете более крупное чтение в буфер и затем потребляете данные из этого буфера, пока он не исчерпается; это позволяет вам сократить количество дорогостоящих системных вызовов, необходимых для реального взаимодействия с сокетом. Такой буфер может существовать за асинхронной абстракцией, которую вы используете, так что первая «асинхронная» операция, которую вы выполняете (заполнение буфера), завершается асинхронно, но все последующие операции до исчерпания базового буфера на самом деле не требуют ввода-вывода, а просто извлекаются из буфера, и поэтому могут завершаться синхронно.

Когда метод Begin выполняет одну из этих операций и обнаруживает, что она завершается синхронно, он может синхронно вызвать обратный вызов. Это означает, что у вас есть один стековый кадр, который вызвал метод Begin, другой стековый кадр для самого метода Begin, а теперь еще один стековый кадр для обратного вызова. Что произойдет, если этот обратный вызов развернется и снова вызовет Begin? Если эта операция завершится синхронно и обратный вызов будет вызван синхронно, то вы снова окажетесь на несколько кадров в глубине стека. И так далее, и так далее, пока в конце концов стек не закончится.

Это реальная возможность, которую легко воспроизвести. Попробуйте эту программу на .NET Core:

using System.Net;
using System.Net.Sockets;

using Socket listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
listener.Bind(new IPEndPoint(IPAddress.Loopback, 0));
listener.Listen();

using Socket client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
client.Connect(listener.LocalEndPoint!);

using Socket server = listener.Accept();
_ = server.SendAsync(new byte[100_000]);

var mres = new ManualResetEventSlim();
byte[] buffer = new byte[1];

var stream = new NetworkStream(client);

void ReadAgain()
{
    stream.BeginRead(buffer, 0, 1, iar =>
    {
        if (stream.EndRead(iar) != 0)
        {
            ReadAgain(); // uh oh!
        }
        else
        {
            mres.Set();
        }
    }, null);
};
ReadAgain();

mres.Wait();

Здесь я установил простой клиентский сокет и серверный сокет, соединенные друг с другом. Сервер посылает 100 000 байт клиенту, который затем использует BeginRead/EndRead для их «асинхронного» потребления по одному за раз (это ужасно неэффективно и делается только во имя педагогики). Обратный вызов, переданный BeginRead, завершает чтение вызовом EndRead, а затем, если он успешно прочитал нужный байт (в этом случае он еще не был в конце потока), он выдает еще один BeginRead через рекурсивный вызов локальной функции ReadAgain. Однако в .NET Core операции с сокетами выполняются намного быстрее, чем в .NET Framework, и завершаются синхронно, если ОС способна синхронно удовлетворить операцию (отметим, что в самом ядре есть буфер, используемый для удовлетворения операций приема сокетов). Таким образом, этот стек переполняется:

Поэтому компенсация за это была встроена в модель APM. Существует два возможных способа компенсации:

  1. Не позволяйте AsyncCallback вызываться синхронно. Если он всегда вызывается асинхронно, даже если операция завершается синхронно, то риск погружения в стек исчезает. Но и производительность тоже, потому что операции, которые завершаются синхронно (или настолько быстро, что их невозможно отличить), встречаются очень часто, и принуждение каждого из них ставить в очередь свой обратный вызов добавляет ощутимые накладные расходы.

  2. Используйте механизм, который позволяет вызывающему методу, а не обратному вызову выполнять работу по продолжению, если операция завершается синхронно. Таким образом, вы избегаете лишнего фрейма метода и продолжаете выполнять последующую работу не глубже стека.

Шаблон APM придерживается варианта (2). Для этого интерфейс IAsyncResult раскрывает два связанных, но разных свойства: IsCompleted и CompletedSynchronously. IsCompleted сообщает вам, завершилась ли операция: вы можете проверять его несколько раз, и в конце концов он перейдет из false в true и останется там. В отличие от этого, CompletedSynchronously никогда не изменяется (или если изменяется, то это неприятная ошибка, ожидающая своего часа); он используется для связи между вызывающим метод Begin и AsyncCallback, который отвечает за выполнение любой работы по продолжению.

Если CompletedSynchronously равно false, то операция завершается асинхронно, и любая работа по продолжению в ответ на завершение операции должна быть возложена на обратный вызов; в конце концов, если работа не завершилась синхронно, то вызывающий Begin не может ее обработать, поскольку еще не известно, что операция завершена (а если бы он просто вызвал End, то блокировал бы выполнение операции до ее завершения).

Однако, если CompletedSynchronously равен true, то если обратный вызов будет обрабатывать работу по продолжению, он рискует оказаться в стеке, поскольку будет выполнять эту работу по продолжению глубже в стеке, чем в начале.

Таким образом, любые реализации, обеспокоенные подобными стековыми погружениями, должны проверять CompletedSynchronously и заставлять вызывающий метод Begin выполнять работу по продолжению, если оно истинно, что означает, что обратный вызов не должен выполнять работу по продолжению. Именно поэтому CompletedSynchronously никогда не должен меняться: вызывающая сторона и обратный вызов должны видеть одно и то же значение, чтобы гарантировать, что работа по продолжению будет выполнена один и только один раз, независимо от условий гонки.

В нашем предыдущем примере DoStuff это приводит к такому коду:

try
{
    IAsyncResult ar = handler.BeginDoStuff(arg, iar =>
    {
        if (!iar.CompletedSynchronously)
        {
            try
            {
                Handler handler = (Handler)iar.AsyncState!;
                int i = handler.EndDoStuff(iar);
                Use(i);
            }
            catch (Exception e2)
            {
                ... // handle exceptions from EndDoStuff and Use
            }
        }
    }, handler);
    if (ar.CompletedSynchronously)
    {
        int i = handler.EndDoStuff(ar);
        Use(i);
    }
}
catch (Exception e)
{
    ... // handle exceptions that emerge synchronously from BeginDoStuff and possibly EndDoStuff/Use
}

Это очень много. И до сих пор мы рассматривали только потребление паттерна... мы не рассматривали реализацию паттерна. Хотя большинству разработчиков не нужно заботиться об операциях листа (например, о реализации фактических методов Socket.BeginReceive/EndReceive, которые взаимодействуют с операционной системой), многим, очень многим разработчикам нужно заботиться о композиции этих операций (выполнение нескольких асинхронных операций, которые вместе образуют более крупную операцию), что означает не только потребление других методов Begin/End, но и реализацию их самостоятельно, чтобы ваша композиция могла быть использована в другом месте. И, заметьте, в моем предыдущем примере DoStuff не было потока управления. Если ввести сюда несколько операций, особенно с простым потоком управления, таким как цикл, то внезапно это станет уделом экспертов, которые наслаждаются муками, или авторов постов в блогах, пытающихся донести свою точку зрения.

Поэтому, чтобы донести эту мысль до читателя, давайте реализуем полный пример. В начале этого поста я показал метод CopyStreamToStream, который копирует все данные из одного потока в другой (а-ля Stream.CopyTo, но для пояснения предположим, что такого метода не существует):

public void CopyStreamToStream(Stream source, Stream destination)
{
    var buffer = new byte[0x1000];
    int numRead;
    while ((numRead = source.Read(buffer, 0, buffer.Length)) != 0)
    {
        destination.Write(buffer, 0, numRead);
    }
}

Все просто: мы многократно читаем из одного потока, затем записываем полученные данные в другой, читаем из одного потока и записываем в другой, и так далее, пока у нас больше нет данных для чтения. Теперь, как бы мы реализовали это асинхронно, используя паттерн APM?

Примерно так:

public IAsyncResult BeginCopyStreamToStream(
    Stream source, Stream destination,
    AsyncCallback callback, object state)
{
    var ar = new MyAsyncResult(state);
    var buffer = new byte[0x1000];

    Action<IAsyncResult?> readWriteLoop = null!;
    readWriteLoop = iar =>
    {
        try
        {
            for (bool isRead = iar == null; ; isRead = !isRead)
            {
                if (isRead)
                {
                    iar = source.BeginRead(buffer, 0, buffer.Length, static readResult =>
                    {
                        if (!readResult.CompletedSynchronously)
                        {
                            ((Action<IAsyncResult?>)readResult.AsyncState!)(readResult);
                        }
                    }, readWriteLoop);

                    if (!iar.CompletedSynchronously)
                    {
                        return;
                    }
                }
                else
                {
                    int numRead = source.EndRead(iar!);
                    if (numRead == 0)
                    {
                        ar.Complete(null);
                        callback?.Invoke(ar);
                        return;
                    }

                    iar = destination.BeginWrite(buffer, 0, numRead, writeResult =>
                    {
                        if (!writeResult.CompletedSynchronously)
                        {
                            try
                            {
                                destination.EndWrite(writeResult);
                                readWriteLoop(null);
                            }
                            catch (Exception e2)
                            {
                                ar.Complete(e);
                                callback?.Invoke(ar);
                            }
                        }
                    }, null);

                    if (!iar.CompletedSynchronously)
                    {
                        return;
                    }

                    destination.EndWrite(iar);
                }
            }
        }
        catch (Exception e)
        {
            ar.Complete(e);
            callback?.Invoke(ar);
        }
    };

    readWriteLoop(null);

    return ar;
}

public void EndCopyStreamToStream(IAsyncResult asyncResult)
{
    if (asyncResult is not MyAsyncResult ar)
    {
        throw new ArgumentException(null, nameof(asyncResult));
    }

    ar.Wait();
}

private sealed class MyAsyncResult : IAsyncResult
{
    private bool _completed;
    private int _completedSynchronously;
    private ManualResetEvent? _event;
    private Exception? _error;

    public MyAsyncResult(object? state) => AsyncState = state;

    public object? AsyncState { get; }

    public void Complete(Exception? error)
    {
        lock (this)
        {
            _completed = true;
            _error = error;
            _event?.Set();
        }
    }

    public void Wait()
    {
        WaitHandle? h = null;
        lock (this)
        {
            if (_completed)
            {
                if (_error is not null)
                {
                    throw _error;
                }
                return;
            }

            h = _event ??= new ManualResetEvent(false);
        }

        h.WaitOne();
        if (_error is not null)
        {
            throw _error;
        }
    }

    public WaitHandle AsyncWaitHandle
    {
        get
        {
            lock (this)
            {
                return _event ??= new ManualResetEvent(_completed);
            }
        }
    }

    public bool CompletedSynchronously
    {
        get
        {
            lock (this)
            {
                if (_completedSynchronously == 0)
                {
                    _completedSynchronously = _completed ? 1 : -1;
                }

                return _completedSynchronously == 1;
            }
        }
    }

    public bool IsCompleted
    {
        get
        {
            lock (this)
            {
                return _completed;
            }
        }
    }
}

Черт побери. И, даже несмотря на всю эту тарабарщину, это все равно не лучшая реализация. Например, реализация IAsyncResult блокирует каждую операцию вместо того, чтобы делать все более безблокировочным способом, где это возможно, Exception хранится в сыром виде, а не в виде ExceptionDispatchInfo, что позволило бы расширить стек вызовов при распространении, много выделения для каждой отдельной операции (например, делегат выделяется для каждого вызова BeginWrite), и так далее.

Теперь представьте, что все это нужно делать для каждого метода, который вы хотите написать. Каждый раз, когда вы захотите написать переиспользуемый метод, который будет потреблять другую асинхронную операцию, вам придется проделывать всю эту работу. А если вы хотите написать переиспользуемые комбинаторы, которые могли бы эффективно работать с несколькими дискретными IAsyncResults (вспомните Task.WhenAll), то это еще один уровень сложности; каждая операция реализует и раскрывает свои собственные API, специфичные для этой операции, что означает отсутствие общего языка для того, чтобы говорить о них одинаково (хотя некоторые разработчики писали библиотеки, которые пытались немного облегчить это бремя, обычно через еще один уровень обратных вызовов, который позволял API предоставлять соответствующий AsyncCallback методу Begin).

И все эти сложности привели к тому, что очень немногие даже пытались это сделать, а у тех, кто это делал, были сплошные ошибки. Честно говоря, это не критика паттерна APM. Скорее, это критика асинхронности на основе обратных вызовов в целом. Мы все так привыкли к мощи и простоте, которые обеспечивают нам конструкции потока управления в современных языках, и подходы, основанные на обратных вызовах, обычно нарушают эти конструкции, как только в них появляется хоть какое-то разумное количество сложности. Ни один другой основной язык не имеет лучшей альтернативы.

Нам нужен был лучший способ, в котором мы научились на примере паттерна APM, включив в него то, что он сделал правильно, и избежав его подводных камней. Интересно отметить, что паттерн APM - это всего лишь паттерн; среда выполнения, основные библиотеки и компилятор не предоставляют никакой помощи в использовании или реализации паттерна.

Tags:
Hubs:
Total votes 17: ↑16 and ↓1+16
Comments8

Articles