Событийная модель на основе async и await

В далеком 2012, когда цена на нефть еще была трехзначной, а трава зеленее, майкрософтом был выпущен .NET 4.5, а с ним и конструкция async/await. Про неё написано уже довольно много статей (Async в C#), а большинство разработчиков C# хорошо её изучили. Но все ли варианты использования были рассмотрены, можно ли выжать из await немного больше?

Самым очевидным вариантом использованием этой конструкции является ожидание завершения некой асинхронной операции. Первое, что приходит на ум — это ожидание ввода-вывода. Например, мы послали запрос клиенту и ожидаем ответа, тогда используя await мы сможем продолжить выполнение кода после получения ответа, а сам код при этом будет выглядеть синхронным. Но что если во время ожидания возникнет необходимость прервать выполнение этой операции? Тогда нам придется использовать CancellationToken, причем если таких операций несколько, то токены необходимо будет линковать или использовать один общий токен. При этом причина отмены будет скрыта от кода, использующего этот CancellationToken. Кроме отмены, код должен поддерживать обработку потери соединения, таймаута, возвращаемых ошибок и т.д.

В классическом варианте это выльется в использование CancellationToken для обработки отмены, try catch для обработки разрыва соединения и код анализа возвращенных данных, для оценки результата запроса. Но можно ли уместить всё это в единой парадигме? В этот статье я предлагаю рассмотреть альтернативный подход, основанный на событийной модели с использованием синтаксического сахара async/await.

Библиотека Eventing.


Всё необходимое для событийной модели на async/await было оформлено в виде библиотеки Eventing и выложено на GitHub под лицензией MIT.

Библиотека протестирована и успешно используется на боевой системе более двух лет.

Использование


Описанный в начале пример с использованием Eventing будет выглядеть так:

var @event = await this.EventManager.WaitFor<MessageReceived, CancelRequested>(TimeSpan.FromSeconds(50));

if (@event == null)
    Log.Info("timeout");
else if (@event is CancelRequested)
    Log.Info("Cancelled, reason: {0}", ((CancelRequested) @event).Reason);
else
    Log.Info("Message received");

Здесь мы используем EventManager — менеджер событий реализующий интерфейс IEventManager, для ожидания событий MessageReceived и CancelRequested с таймаутом в 50 секунд. С помощью вызова WaitFor мы формируем подписку на указанные события, а вызов await блокирует дальнейшее исполнение кода(но не потока). Оно останется заблокированным до тех пор, пока не произойдет одно из указанных событий или истечет время таймаута, после чего выполнение продолжится в текущем контексте синхронизации. Но что если связь с клиентом будет потеряна во время формирования подписки? В этом случае код зависнет на 50 секунд, так как событие отключения клиента будет упущено. Исправим это:

// Создание подписки
var eventAwait = this.EventManager.WaitFor<MessageReceived, ClientDisconnected, CancelRequested>(TimeSpan.FromSeconds(50), 
            e => !(e is ClientDisconnected) || ((ClientDisconnected)e).id == client.Id); // Фильтр события

if (!client.Connected || cancelRequested) {
    // Случай отключения клиента или запроса на отмену во время создания подписки
    Log.Info("Client disconnected or cancel requested");
    return;
}

 //  Прерывание кода до наступления события
 var @event = await eventAwait;
 ...

Здесь мы добавили событие ClientDisconnected и разделили создание awaitable переменной eventAwait и непосредственно ожидание события. Если бы мы не разделили их, то клиент мог бы отключиться после проверки client.Connected и ожиданием события, что привело бы к потери события. Также был добавлен фильтр событий, который исключает события ClientDisconnected не относящиеся к текущему клиенту.

Как создать событие?


Для этого надо создать класс, имплементирующий IEvent:

class CancelRequested : IEvent {
    public string Reason { get; set; }
}

А затем вызвать IEventManager.RaiseEvent, например:

this.EventManager.RaiseEvent(new CancelRequested()). 


Наследование от IEvent отделяет события от остальных классов и предотвращает использование неподходящих экземпляров в методе RaiseEvent. Также поддерживается наследование:

class UserCancelRequested : CancelRequested {
}

class SystemCancelRequested : CancelRequested {
}

var @event = await this.EventManager.WaitFor<CancelRequested>();
if (@event is UserCancelRequested)
    ...

Если у вас сложная система в которой множество одновременно ожидаемых событий, использование события CancelRequested вместо токенов отмены, позволит избежать прокидывания и линкование глобального и локального CancellationToken. Это важно, так как сложное линкование повышает вероятность пропустить утечку памяти из-за удержания токенов.

Как подписаться на событие?


Некоторые события носят периодический характер, такие события можно получать методом IEventManager.StartReceiving:

void StartReceiving<T>(Action<T> handler, object listener, Func<T, bool> filter = null, SynchronizationContext context = null) 
                       where T : IEvent;

Обработчик handler будет вызван в контексте синхронизации context при каждом событии T, которое удовлетворяет фильтру filter, если он задан. Если контекст синхронизации не задан, то будет использован SynchronizationContext.Current.

Как это работает?


Используется всё тот-же механизм тасков, на котором основан async/await. При вызове WaitFor менеджер событий создает таск используя TaskCompletionSource и формирует подписку по выбранным типам событий в шине сообщений.

// EventManager.cs, создание подписки
var taskCompletionSource = new TaskCompletionSource<IEvent>();

var subscription = new MessageSubscription(
            subscriptionId,
            message => {
                var @event = message as IEvent;
                if (filter != null && !filter(@event))
                    return;

                // Устанавливаем результат исполнения задачи
                if (taskCompletionSource.TrySetResult(@event))
                    this.trace.TraceEvent(TraceEventType.Information, 0, "Wait ended: '{0}' - '{1}'",
                        subscriptionId, message.GetType());
            },
            this, UnsubscribePolicy.Auto, this.defaultContext, eventTypes);
            
this.messageBus.Subscribe(subscription);
...
return taskCompletionSource.Task;

При генерации события вызывается метод RaiseEvent, который передает событие в шину, а она в соответствии с типом события выбирает подписки, в которых eventTypes включает в себя этот тип. Далее вызывается обработчик подписки и если он удовлетворяет фильтру, то устанавливается результат исполнения задачи и разблокирует вызов await.

// EventManager.cs, генерация события
public void RaiseEvent(IEvent @event) {
    this.trace.TraceEvent(TraceEventType.Information, 0, "Event: {0}", @event);

    this.messageBus.Send(@event);
}

// MessageBus.cs, отправка сообщения
public void Send(object message) {
var messageType = message.GetType();
IMessageSubscription[] subscriptionsForMessage;

lock (this.subscriptions) {
    subscriptionsForMessage = this.subscriptions
        .Where(s => s.MessagesTypes.Any(type => messageType == type || type.IsAssignableFrom(messageType)))
        .ToArray();
}

...

foreach (var subscription in subscriptionsForMessage)
    subscription.ProccessMessage(message);

this.UnsubscribeAutoSubscriptions(subscriptionsForMessage);
...

// MessageSubscription.cs
public void ProccessMessage(object message) {
    var messageHandler = this.handler;
    this.SynchronizationContext.Post(o => messageHandler(message), null);
}

В MessageSubscription.ProccessMessage сообщение передается в заданный пользователем контекст синхронизации, что позволяет избежать задержок при отправке сообщения.

Избавь мой класс от многопоточности!


Каждый, кто работал с async/await знает, что после завершения await код продолжает свое исполнения не в текущем потоке, а в текущем контексте синхронизации. Это может быть проблемой, если вы подпишетесь на событие с помощью StartReceiving, а затем вызовите WaitFor, что приведет к тому, что код класса будет исполняться одновременно в разных потоках(обработчик событий из StartReceiving и код после await // как страшно жить!). Это легко исправить однопоточным контектстом синхронизации, входящим в библиотеку:

this.serverSynchronizationContext = new SingleThreadSynchronizationContext("Server thread");
this.clientSynchronizationContext = new SingleThreadSynchronizationContext("Client thread");

this.serverSynchronizationContext.Post(async o => await this.RunServer(), null);
this.clientSynchronizationContext.Post(async o => await this.RunClient(), null);

Таким образом у нас клиент всегда будет выполняться в потоке «Client thread», а сервер в «Server thread». Вы сможете писать многопоточный код не задумываясь о race condition. В качестве бонуса это позволит максимально утилизировать отдельно взятый поток.

В чем преимущество?


Главным преимуществом является простота и тестируемость кода. Если насчет первого можно спорить, простоту каждый понимает по своему, то со вторым пунктом всё очевидно. Многопоточное приложение можно протестировать в одном потоке, эмулируя любую последовательность событий, причем для этого не требуется создавать mock объекты, любое взаимодействие можно свести к событиям, а их проверку к вызову RaiseEvent. Пример NUnit:

/// <summary>
///     This test demonstrates how to test application that uses Eventing
///     All code executes sequently in one thread
/// </summary>
[TestFixture]
public class TestCase : TestBase {
    [Test]
    public async Task ClientDoesWork() {
        var client = new Client(this.EventManager);
        var doWorkAwaitable = client.DoWork();

        this.EventManager.RaiseEvent(new Connected());

        // We can debug and step into 
        this.React();

        await doWorkAwaitable;

        Assert.AreEqual(true, client.Connected);
    }
}

Как это можно использовать?


Чтобы не переполнять статью листингами, приведу лишь краткое текстовое описание одной из системы, где используется Eventing. Это горизонтально масштабируемая распределенная система, состоящая из четырех типов узлов, один из которых является мастером. Мастер непрерывно общается со всеми узлами и управляет выполнением на них различных операций. Каждую операцию можно представить в виде конечного автомата, где переход это наступление события(в том числе таймаут или отмена). Хотя для каждой операции и можно было автомат реализовать в его классическом виде(что мы изначально и сделали), намного проще оказалось представить его используя Eventing, где текущее состояние определялось точкой выполнения кода, а не отдельной переменной. При это на каждом шаге были явно перечислены все ожидаемые события, что упрощало тестирование белого ящика.

Заключение


В статье рассмотрены ключевые возможности и варианты использования библиотеки Eventing. Библиотека не претендует на универсальность и поддержку высоконагруженных систем, но призывает немного по другому взглянуть на привычные вещи, позволяет писать безопасный и легко тестируемый с точки зрения многопоточности код.
Share post
AdBlock has stolen the banner, but banners are not teeth — they will be back

More
Ads

Comments 18

    0
    Rx смотрели? Очень похожий функционал.
      0
      Смотрел, но не использовал. Тоже событийный подход, но области применение немного разные, Rx больше нацелен на фильтрацию и обработку событий отдельным обработчиком, а Eventing на последовательное исполнение кода. При этом в Rx используются генераторы, которые могут приводить к блокировке потока
        0
        В Rx есть, например, FirstAsync() для последовательного кода. Про генераторы и блокировки не понял. Чего там действительно не хватает — так это ограничений скорости поступления событий из источника данных из-за отсутствия поддержки OnNextAsync и т.п.
      0
      Есть одна неточность — «кто работал с async/await знает, что после завершения await код продолжает свое исполнения не в текущем потоке, а в текущем контексте синхронизации» — это не совсем так. Контекст синхронизации есть только у GUI-приложений, таких как WinForms или WPF, и после оператора await поток создается другой — подробнее тут alz-it.blogspot.ru/2016/06/c-awaitasync.html. Поэтому дабы избавиться от сложностей работы с этим контекстом — можно просто создать новый поток как например здесь alz-it.blogspot.ru/2016/06/await-winforms.html
        +2
        Контекст синхронизации есть только у GUI-приложений, таких как WinForms или WPF,

        Это неправда. встроенный контекст синхронизации есть в тех приложениях, где он оправдан (например, помимо перечисленных, он еще есть в asp.net). Но при этом можно создать и собственный контекст, если мы хотим получить какую-то его функциональность.


        после оператора await поток создается другой

        Это далеко не обязательно, и определяется конкретным используемым диспетчером.


        Поэтому дабы избавиться от сложностей работы с этим контекстом

        А там есть какие-то сложности? В большей части случаев он работает так, как и ожидается программистом. Поправки надо делать только в тех случаях, если вы нарушаете правило async all the way down.


        Ну и да, вы всегда можете отказаться от возврата в контекст синхронизации.

          0
          1.Ну скажем так — asp.net тоже в некотором роде GUI :)
          2. Вы можете привести конкретный пример когда при применении await не создается ожидающий callback поток? И вообще — как влияет диспетчера на работу await?
          3. Сложности работы с контекстом синхронизации состоят в том что его вообще приходиться использовать ( сама идея контекста синхронизации очень напоминает банальный «костыль»), подробнее тут http://alz-it.blogspot.ru/2016/06/executioncontext-synchronizationcontext.html
            +1
            Ну скажем так — asp.net тоже в некотором роде GUI

            Нет, asp.net — это веб-фреймворк. И, скажем, когда на нем пишут WebAPI, никам UI там вообще не пахнет.


            Вы можете привести конкретный пример когда при применении await не создается ожидающий callback поток?

            В такой формулировке — никогда, вся прелесть TPL в том, что никто никаких коллбэков не ожидает, это continuation-passing. Но даже если предположить, что вы, на самом деле, спрашиваете про потоки, в которых выполняются continuations, то вот вам простейший пример:


            private static async Task MainAsync()
                    {
                        WriteCurrentThread();
                        await ImplAsync();
                        WriteCurrentThread();
                    }
            
                    private static async Task ImplAsync()
                    {
                        WriteCurrentThread();
                    }

            Окей, пример надуманный (хотя, кстати, часто встречающийся в реальной жизни, именно поэтому в стейт-машине await под него есть оптимизация). Ладно, давайте его немного усложним:


                    private static async Task MainAsync()
                    {
                        WriteCurrentThread();
                        using (var r = new StreamReader(new FileStream("F:\\fp.mpcpl ", FileMode.Open, FileAccess.Read)))
                        {
                            await r.ReadToEndAsync();
                        }
                        WriteCurrentThread();
                    }

            Казалось бы, все, теперь мы всегда попадаем в другой поток… Но нет. Просто поменяем вызов c MainAsync().Wait() на AsyncContext.Run((Func<Task>) MainAsync) — и "волшебным" образом все continuations опять оказываются в одном потоке.


            И вообще — как влияет диспетчера на работу await?

            Извиняюсь, был не прав — не диспетчер, а шедулер.


            Сложности работы с контекстом синхронизации состоят в том что его вообще приходиться использовать

            Неа, в await это происходит прозрачно:


            //some code accessing HttpContext.Current
            await SomeInternalCode();
            //some other code accessing HttpContext.Current

            Вы явное использование контекста синхронизации видите? А тем не менее, контекст есть, и работает. Более того, если его убрать, у программиста просто не будет способа достать HttpContext.Current после await.

              –1
              1.
              «Нет, asp.net — это веб-фреймворк»
              скажем так, это вопрос терминологии :)

              2.
              «пишут WebAPI, никам UI там вообще не пахнет. „
              а кто сказал что asp.net — это UI :) Я сказал “вроде как» — и вот что я имел в виду — asp.net имеет пул, и в этом он сходен с UI — у которого есть очередь обрабатывающая события окна. И именно поэтому он имеет тот же механизм унификации возврата из асинхронного вызова.

              3. private static async Task MainAsync()
              {
              // здесь будет один поток ( скорее всего вызывающий)
              WriteCurrentThread();
              await ImplAsync();
              WriteCurrentThread();// а вот здесь уже будет другой
              }


              Вы явное использование контекста синхронизации видите? А тем не менее, контекст есть
              конечно есть, только тут используется ExecutionContext, внутри которого и будет искомый HttpContext.Current. Но это не есть использование контекста синхронизации ( хотя он безусловно есть — так уж asp.net написан). Здесь четко описана разница http://alz-it.blogspot.ru/2016/06/executioncontext-synchronizationcontext.html
                +1
                скажем так, это вопрос терминологии

                Терминология — штука специально весьма конкретная.


                а кто сказал что asp.net — это UI

                Вы: "asp.net тоже в некотором роде GUI".


                вот что я имел в виду — asp.net имеет пул, и в этом он сходен с UI — у которого есть очередь обрабатывающая события окна

                Намешали. Что значит, "asp.net имеет пул"? Если вы о том, что каждый приходящий запрос попадает в поток из определенного пула, то это свойство хоста, а не asp.net. А уж сходства между очередью событий и пулом нет вовсе — очередь событий в Windows Forms/WPF однопоточна (собственно, она для того и сделана, чтобы не заниматься синхронизацией на интерфейсных элементах), а обработчик http-запросов в asp.net в хорошем случае может быть вообще неблокирующим (если потоков хватает и не используется сессия).


                И именно поэтому он имеет тот же механизм унификации возврата из асинхронного вызова.

                "Механизм унификации возврата" имеет TPL, и этот механизм называется SynchronizationContext. Вопрос только в том, какие хосты (и почему) по умолчанию запускают код в таком контексте, а какие — нет.


                // здесь будет один поток ( скорее всего вызывающий)
                WriteCurrentThread();
                await ImplAsync();
                // а вот здесь уже будет другой
                WriteCurrentThread();

                Эксперимент с вами не согласен:


                static void Main(string[] args)
                {
                  MainAsync().Wait();
                }
                
                private static async Task MainAsync()
                {
                  WriteCurrentThread("MainAsync1");
                  await ImplAsync();
                  WriteCurrentThread("MainAsync2");
                }
                
                private static async Task ImplAsync()
                {
                  WriteCurrentThread("ImplAsync");
                }

                Вывод:


                MainAsync1: 1
                ImplAsync: 1
                MainAsync2: 1

                конечно есть, только тут используется ExecutionContext, внутри которого и будет искомый HttpContext.Current. Но это не есть использование контекста синхронизации

                Правда? А почему же тогда в методе SetContinuationForAwait — а именно он в итоге отвечает за то, как будет вызван continuation после await — в первую очередь используется SynchronizationContext.CurrentNoFlow?


                Здесь четко описана разница http://alz-it.blogspot.ru/2016/06/executioncontext-synchronizationcontext.html

                Вы, когда переводите, пусть и с сокращениями, чужую статью — хоть бы на первоисточник ссылку давали. В оригинале явно написано:


                When you await a task, by default the awaiter will capture the current SynchronizationContext, and if there was one, when the task completes it’ll Post the supplied continuation delegate back to that context, rather than running the delegate on whatever thread the task completed or rather than scheduling it to run on the ThreadPool.
                  –3
                  Вы, когда переводите, пусть и с сокращениями, чужую статью — хоть бы на первоисточник ссылку давали

                  1. это не единственный источник для моей статьи http://alz-it.blogspot.ru/2016/06/executioncontext-synchronizationcontext.html
                  2. если уж цитируете статью — то цитируйте хотя бы логически полностью, ибо абзацем выше сказано

                  The delegate that gets passed to the awaiter has a reference to this ExecutionContext instance and will use it when resuming the method. This is what enables the important “ambient” information represented by ExecutionContext to flow across awaits.

                  The Framework also has support for SynchronizationContext.
                  и далее и идет ваша цитата в том числе — и смысл как бы немного меняется :)
                  Эксперимент с вами не согласен

                  3. про точки прерывания мне даже неудобно как-то говорить…
                  Намешали.

                  4. в подобном тоне «поучите свою бабушку щи варить» — за сим откланиваюсь…
                    +1
                    это не единственный источник для моей статьи http://alz-it.blogspot.ru/2016/06/executioncontext-synchronizationcontext.html

                    Но один из основных (вплоть до дословного цитирования примеров кода).


                    если уж цитируете статью — то цитируйте хотя бы логически полностью, ибо абзацем выше сказано [...] и смысл как бы немного меняется

                    Нет, не меняется. Когда awaiter размещает continuation, он смотрит на SynchronizationContext напрямую, минуя ExecutionContext. Более того, когда ExecutionContext передается через await, SynchronizationContext не захыватывается.


                    Поймите, ExecutionContext и SynchronizationContext в контексте (простите) await — ортогональны. ExecutionContext отвечает за то, какие данные будут видны коду после await, а SynchronizationContext — за то, где и когда код после await будет выполнен.


                    про точки прерывания мне даже неудобно как-то говорить…

                    А при чем тут "точки прерывания"?

          0
          Это не совсем так. Контекст синхронизации есть только у GUI-приложений

          Ограничений на использование нет, в статье приводится пример с NUnit тестом. Если быть точным, выбор идет в таком порядке(MSDN):


          1. SynchronizationContext
          2. TaskScheduler
          3. ThreadPool
            0
            выбор идет в таком порядке(MSDN):
            вы не внимательно прочитали MSDN, там абзацем выше написано что такое поведение присуще TaskAwaiter, который появился только в NET.Framework 4.5! И далее написано именно то что я говорил, если SynchronizationContext.Current==null, то проверяем уж не задача ли это, и если нет — идем в стандартную обработку для потока (которые по умолчанию обычно живут в CLR ThreadPool).
            Ограничений на использование нет
            да какое ограничение, если SynchronizationContext.Current==null — то поневоле ничего с ним сделать нельзя :)
              0
              Именно это я и имел ввиду, что проверяется в том порядке, если null, то переходим к следующему. Конечно 4.5, ведь async с ним и добавили.
          0
          Вы можете получить тот же функционал, воспользовавшись акторной моделью: akka.net, orleans. В чем ваше преимущество?
            0
            Мне кажется некорректно сравнивать фреймворк и небольшую библиотеку.
            0
            но призывает немного по другому взглянуть на привычные вещи, позволяет писать безопасный и легко тестируемый с точки зрения многопоточности код.


            Вы где-то кроме программирования State Machine пытались применить это решение? Насколько я понял вы отказались от использования асинхронного кода в связке с .NET ThreadPool — в пользу создания создания отдельного потока на каждый контекст и последовательного выполнения задач в нем. Для кастомизации и тюнинга таких низкоуровневых решений требуеться намного больше знания многопоточности, чем для обычной синхронизации кода с использованием async\await, и которые не требуют девелопера считать сколько и когда им создавать потоков на приложение\event bus\операцию.
            • UFO just landed and posted this here

              Only users with full accounts can post comments. Log in, please.