Пишем свой SynchronizationContext

Началась эта история довольно давно, когда я впервые попытался работать с UI не из UI-потока. И когда я начал ловить различные “глюки”, я понял, что делать это нужно осторожно. Позднее я столкнулся с этим в дотнет мире и именно в тот момент я впервые познакомился с SynchronizationContext. Но тогда, почитав про устройство этого объекта, я посчитал, что этих знаний мне достаточно. Сделать это можно, например, здесь: SynchronizationContext — когда MSDN подводит.

Вспомнил про SynchronizationContext я только с выходом c# 5 и его async/await, т.к. этот механизм взаимодействует как раз с этим самым контекстом синхронизации. Делается это для того, чтобы после асинхронной операции, код мог выполняться в вызывающем асинхронную операцию потоке, что очень удобно при работе с UI. Но запустив этот небольшой код в UI-потоке и любом другом:

Debug.WriteLine(Thread.CurrentThread.ManagedThreadId);
await Task.Run(() => Debug.WriteLine(Thread.CurrentThread.ManagedThreadId));
Debug.WriteLine(Thread.CurrentThread.ManagedThreadId);

Мы увидим, что код возвращается в исходный поток только при запуске в UI-потоке. Все дело в том, что контекст синхронизации задан только в UI-потоке (кроме wcf и т.д.). В голову сразу же приходит мысль, нужно просто задать контекст синхронизации нужному потоку. Но здесь нас ждет проблема, стандартная реализация SynchronizationContext не дает нам нужных возможностей. Она позволяет продолжать исполнять код в текущем потоке или в потоке из пула. После того, как я не нашел реализации, которую можно просто скопировать, запустить и увидеть желаемый результат, я решил попробовать реализовать свою и представить, как бы оно могла выглядеть на деле. Об этом и пойдет речь ниже.

Для выполнения кода в SynchronizationContext предусмотрены два виртуальных метода Send (синхронное выполнение) и Post (асинхронное). Поэтому наследуемся от SynchronizationContext и переопределяем нужные методы.

CustomSynchronizationContext
class CustomSynchronizationContext : SynchronizationContext, IDisposable
{
    private readonly AutoResetEvent _eventReset;
    private readonly Queue<KeyValuePair<SendOrPostCallback, object>> _workItems;
    private readonly Thread _thread;

    public CustomSynchronizationContext()
    {
        _eventReset = new AutoResetEvent(false);
        _workItems = new Queue<KeyValuePair<SendOrPostCallback, object>>();
        _thread = new Thread(DoWork);
        _thread.Start(this);
    }

    private void DoWork(object obj)
    {
        SynchronizationContext.SetSynchronizationContext(obj as SynchronizationContext);

        while (true)
        {
            while (_workItems.Count > 0)
            {
                var item = _workItems.Dequeue();
                item.Key(item.Value);
            }

            _eventReset.Reset();
            _eventReset.WaitOne();
        }
    }

    public override void Post(SendOrPostCallback d, object state)
    {
        _workItems.Enqueue(new KeyValuePair<SendOrPostCallback, object>(d, state));
        _eventReset.Set();
    }

    public void Dispose()
    {
        _eventReset.Dispose();
        _thread.Abort();
    }
}


Запускаем.

static void Main(string[] args)
{
    var syncContext = new CustomSynchronizationContext();
    Console.WriteLine(Thread.CurrentThread.ManagedThreadId);
    syncContext.Post(o => Console.WriteLine(Thread.CurrentThread.ManagedThreadId), null);
}

И ожидаемо видим разные потоки. Что здесь происходит? Во-первых, для удобства, создаем и присваиваем поток внутри контекста, а не контекст потоку. Так мы будем уверены, что никто кроме нас не сможет влиять на этот поток. Во-вторых, заводим очередь, в которой будем хранить делегаты для выполнения в созданном потоке. В-третьих, “прикостыливаем” AutoResetEvent, чтобы поток не завершался и не зацикливался без дела. Ну и IDisposable. Обратите внимание, что при удалении контекста, здесь же будет попытка ликвидировать поток. Т.е. такой код:

static void Main(string[] args)
{
    using (var syncContext = new CustomSynchronizationContext())
    {
        Console.WriteLine(Thread.CurrentThread.ManagedThreadId);
        syncContext.Post(o => Console.WriteLine(Thread.CurrentThread.ManagedThreadId), null);
    }
}

скорее всего выведет информацию только об изначальном потоке. Возможно это не то, что хотелось бы, но для демонстрационного примера, думаю, сойдет. К тому же это легко исправить.

Что с обработкой исключений? Проверим.

static void Main(string[] args)
{
    var syncContext = new CustomSynchronizationContext();
    try
    {
        syncContext.Post(o => { throw new Exception("TestException"); }, null);
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex.Message);
    }
}

Ожидаемо падает в нашем “особенном” потоке. Самое время вспомнить, что у нас есть еще и метод Send, который отвечает за синхронное выполнение. Это должно позволить дождаться завершения выполнения делегата и получить исключение. Попробуем.

CustomSynchronizationContext (финальная демонстрационная версия)
class CustomSynchronizationContext : SynchronizationContext, IDisposable
{
    private readonly AutoResetEvent _workerResetEvent;
    private readonly ConcurrentQueue<WorkItem> _workItems;
    private readonly Thread _thread;

    public CustomSynchronizationContext()
    {
        _workerResetEvent = new AutoResetEvent(false);
        _workItems = new ConcurrentQueue<WorkItem>();
        _thread = new Thread(DoWork);
        _thread.Start(this);
    }

    private void DoWork(object obj)
    {
        SynchronizationContext.SetSynchronizationContext(obj as SynchronizationContext);

        while (true)
        {
            WorkItem workItem;
            while (_workItems.TryDequeue(out workItem))
                workItem.Execute();
           
            //Note: race condition here
            _workerResetEvent.Reset();
            _workerResetEvent.WaitOne();
        }
    }

    public override void Send(SendOrPostCallback d, object state)
    {
        if (Thread.CurrentThread == _thread)
            d(state);
        else
        {
            using (var resetEvent = new AutoResetEvent(false))
            {
                var wiExecutionInfo = new WorkItemExecutionInfo();
                _workItems.Enqueue(new SynchronousWorkItem(d, state, resetEvent, ref wiExecutionInfo));
                _workerResetEvent.Set();

                resetEvent.WaitOne();
                if (wiExecutionInfo.HasException)
                    throw wiExecutionInfo.Exception;
            }
        }
    }

    public override void Post(SendOrPostCallback d, object state)
    {
        _workItems.Enqueue(new AsynchronousWorkItem(d, state));
        _workerResetEvent.Set();
    }

    public void Dispose()
    {
        _workerResetEvent.Dispose();
        _thread.Abort();
    }

    private class WorkItemExecutionInfo
    {
        public bool HasException => Exception != null;
        public Exception Exception { get; set; }
    }

    private abstract class WorkItem
    {
        protected readonly SendOrPostCallback SendOrPostCallback;
        protected readonly object State;

        protected WorkItem(SendOrPostCallback sendOrPostCallback, object state)
        {
            SendOrPostCallback = sendOrPostCallback;
            State = state;
        }

        public abstract void Execute();
    }

    private class SynchronousWorkItem : WorkItem
    {
        private readonly AutoResetEvent _syncObject;
        private readonly WorkItemExecutionInfo _workItemExecutionInfo;

        public SynchronousWorkItem(SendOrPostCallback sendOrPostCallback, object state, AutoResetEvent resetEvent,
            ref WorkItemExecutionInfo workItemExecutionInfo) : base(sendOrPostCallback, state)
        {
            if (workItemExecutionInfo == null)
                throw new NullReferenceException(nameof(workItemExecutionInfo));

            _syncObject = resetEvent;
            _workItemExecutionInfo = workItemExecutionInfo;
        }

        public override void Execute()
        {
            try
            {
                SendOrPostCallback(State);
            }
            catch (Exception ex)
            {
                _workItemExecutionInfo.Exception = ex;
            }
            _syncObject.Set();
        }
    }

    private class AsynchronousWorkItem : WorkItem
    {
        public AsynchronousWorkItem(SendOrPostCallback sendOrPostCallback, object state)
            : base(sendOrPostCallback, state)
        {
        }

        public override void Execute()
        {
            SendOrPostCallback(State);
        }
    }
}


Здесь для удобства вводим класс WorkItem, который будет выполнять код (делегат) нужным нам способом. От него наследуем еще два SynchronousWorkItem и AsynchronousWorkItem, по названиям понятно в чем их различие. В реализациях отличие только в том, что в синхронной версии реализовано ожидание (AutoResetEvent) и поглощение исключения, которое далее будет брошено в изначальном потоке. Теперь KeyValuePair<SendOrPostCallback, object> можно поменять на WorkItem, ну и поменяем простую очередь на конкурентную. Также в методе Send добавляем проверку текущего потока и если он вдруг окажется “нашим”, то просто запустим делегат здесь же.

Снова проверяем.

static void Main(string[] args)
{
    var syncContext = new CustomSynchronizationContext();
    try
    {
        syncContext.Send(o => { throw new Exception("TestException"); }, null);
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex.Message);
    }
}

Теперь исключение успешно обработано. Ну и пришло время запустить самый первый пример кода, который упоминался в статье, на потоке с только что созданным контекстом синхронизации.

static void Main(string[] args)
{
    var syncContext = new CustomSynchronizationContext();
    syncContext.Post(TestAsyncMethod, null);
}

async static void TestAsyncMethod(object obj)
{
    Console.WriteLine(Thread.CurrentThread.ManagedThreadId);
    await Task.Run(() => Console.WriteLine(Thread.CurrentThread.ManagedThreadId));
    Console.WriteLine(Thread.CurrentThread.ManagedThreadId);
}

Мой вывод:

9
10
9
  • +17
  • 12.9k
  • 7
Share post

Similar posts

Comments 7

      +2
      Скажите, а зачем вам возвращатся в тот поток из которого вызывали, если это не UI поток?
        +2
        Ну разве что кто-то запустит непотокобезопасный код. Мне просто было интересно все это воспроизвести, практической пользы от этого мало.
          +1
          Чтобы await в консольном приложении работал, как ожидается, например.
            +2
            Я сталкивался со следующей проблемой, когда надо было сделать несколько асинхронных вызовов к базе с помощью EF в ASP.NET приложении.

            Суть проблемы в том, что контекст EF не является потокобезопасным, и более того он генерирует исключение, если обращение к контексту было создано из другого потока. В тоже время асинхронность вызова в ASP.NET подразумевает использование AspNetSynchronizationContext, который в свою очередь использует пул потоков, а это значит, что вызовы к контексту EF могут происходить из разных потоков. Поэтому в такой ситуации собственный SynchronizationContext не повредил бы.
            +1
            Мне кажется в предложенном решении есть race condition. Если новая задача прилетит между выполнением двух строк, то прилетевшая задача не выполнится пока не придёт следующая задача:

                        _eventReset.Reset();
                        _eventReset.WaitOne();
            
              0
              Да есть, только на строчку выше, я не зря описал это «прикостыливанием»… Думаю для демо сойдет.

            Only users with full accounts can post comments. Log in, please.