Как стать автором
Обновить

Комментарии 38

Вижу следующую проблему в реализации: всё резко перестанет работать, если управление перейдёт в другой поток, например, если вы сделаете await Task.Delay(10).


Самое правильное решение, на мой взгляд — это написать кастомный однопоточный планировщик (SynchronizationContext). Занимает около 30 строчек. После чего можно спокойно использовать Task.Yield, а не изобретать велосипед.

Я бы не стал об этом писать, если бы сам не попробовал. У меня получается в итоге так:


Work A: 1, Thread: 1
Work B: 1, Thread: 1
Work C: 1, Thread: 1
Work D: 1, Thread: 1

Надо добавить в конец Thread.Sleep(...) поскольку ваш Delay никто не ждет и программа завершается раньше.
У меня вышло так:


Заголовок спойлера
Work A: 1, Thread: 1
Work B: 1, Thread: 1
Work C: 1, Thread: 1
Work D: 1, Thread: 1
Work D is completed, Thread: 4
Work B: 1 (Extra), Thread: 5
Work C: 2, Thread: 7
Work A: 2, Thread: 6
Work B: 2, Thread: 5
Work A: 3, Thread: 4
Work B: 2 (Extra), Thread: 10
Work B: 3, Thread: 10
Work C is completed, Thread: 11
Work A: 4, Thread: 11
Work B: 3 (Extra), Thread: 10
Work B is completed, Thread: 10
Work A is completed, Thread: 10

А надо бы ждать, а не придумывать костыль в виде Thread.Sleep. Короче, вот по-быстрому написал на коленке:


CooperativeContext.cs
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace CooperativeMultitasking
{
    public sealed class CooperativeContext : SynchronizationContext
    {
        ConcurrentQueue<Action> queue = new ConcurrentQueue<Action>();
        volatile int taskCount;
        AutoResetEvent evt = new AutoResetEvent(true);

        public void Post(Action action)
        {
            queue.Enqueue(action);

            if (!ReferenceEquals(Current, this))
                evt.Set();            
        }

        public void PostTask(Func<Task> func)
        {
            Interlocked.Increment(ref taskCount);

            Post(async () =>
            {
                await func();

                Interlocked.Decrement(ref taskCount);
            });
        }

        public override void Post(SendOrPostCallback d, object? state)
        {
            Post(() => d(state));
        }

        public void Run()
        {
            var oldContext = Current;

            try
            {
                SetSynchronizationContext(this);

                while (true)
                {
                    if (queue.TryDequeue(out var action))
                    {
                        action();
                    }
                    else if (taskCount == 0)
                    {
                        break;
                    }
                    else
                    {
                        evt.WaitOne();
                    }
                }
            }
            finally
            {
                SetSynchronizationContext(oldContext);
            }
        }
    }
}

Program.cs
using System;
using System.Threading;
using System.Threading.Tasks;

namespace CooperativeMultitasking
{
    class Program
    {
        static void Main()
        {
            var context = new CooperativeContext();
            context.PostTask(() => DoWork("A", 4));
            context.PostTask(() => DoWork("B", 4, extraWork: true));
            context.PostTask(() => DoWork("C", 2));
            context.PostTask(() => DoWork("D", 1));
            context.Run();
        }

        static async Task DoWork(string name, int delay, bool extraWork = false)
        {
            for (int i = 1; i <= delay; i++)
            {
                Console.WriteLine($"Work {name}: {i}, Thread: {Thread.CurrentThread.ManagedThreadId}");

                await Task.Delay(10);
                await Task.Yield();

                if (extraWork)
                {
                    Console.WriteLine($"Work {name}: {i} (Extra), Thread: {Thread.CurrentThread.ManagedThreadId}");
                    await Task.Yield();
                }
            }

            Console.WriteLine($"Work {name} is completed, Thread: {Thread.CurrentThread.ManagedThreadId}");
        }
    }
}

Результат
Work A: 1, Thread: 1
Work B: 1, Thread: 1
Work C: 1, Thread: 1
Work D: 1, Thread: 1
Work B: 1 (Extra), Thread: 1
Work B: 2, Thread: 1
Work A: 2, Thread: 1
Work D is completed, Thread: 1
Work C: 2, Thread: 1
Work B: 2 (Extra), Thread: 1
Work B: 3, Thread: 1
Work A: 3, Thread: 1
Work B: 3 (Extra), Thread: 1
Work C is completed, Thread: 1
Work B: 4, Thread: 1
Work A: 4, Thread: 1
Work B: 4 (Extra), Thread: 1
Work B is completed, Thread: 1
Work A is completed, Thread: 1

AutoReset WaitOne это обращение к ядру и перформанс тут сильно проседает. Мое решение вообще никак не связанно с многопоточкой и async/await используется просто как синтаксический сахар

AutoReset WaitOne это обращение к ядру и перформанс тут сильно проседает

Этот вызов произойдёт только в одном случае: если есть незавершённые фоновые задачи. В остальных случаях дело до него не дойдёт.


Если же это вам так мозолит глаза, тогда можете удалить весь связанный с AutoResetEvent код:


Будет примерно так
namespace CooperativeMultitasking
{
    public sealed class CooperativeContext : SynchronizationContext
    {
        Queue<Action> queue = new Queue<Action>();

        public void Post(Action action)
        {
            queue.Enqueue(action);    
        }

        public override void Post(SendOrPostCallback d, object? state)
        {
            Post(() => d(state));
        }

        public void Run()
        {
            var oldContext = Current;

            try
            {
                SetSynchronizationContext(this);

                while (queue.Count > 0)
                {
                    queue.Dequeue().Invoke();
                }
            }
            finally
            {
                SetSynchronizationContext(oldContext);
            }
        }
    }
}

Код становится совсем простым, не правда ли?

В целом да, соглашусь. Хоть это все и прибито гвоздями к таскам.

Хоть это все и прибито гвоздями к таскам.

Тоже не совсем так. SynchronizationContext к таскам вообще не имеет никакого отношения. Task.Yield, как это ни странно, тоже — вы можете вместо него свою реализацию подсунуть:


Пример
partial class Async
{
    public static YieldAwaitable Yield() => new(Scheduler);

    public readonly struct YieldAwaitable
    {
        readonly IAsyncScheduler scheduler;

        public YieldAwaitable(IAsyncScheduler scheduler)
        {
            this.scheduler = scheduler;
        }

        public Awaiter GetAwaiter() => new(scheduler);

        public readonly struct Awaiter
            : INotifyCompletion
        {
            readonly IAsyncScheduler scheduler;

            public Awaiter(IAsyncScheduler scheduler)
            {
                this.scheduler = scheduler;
            }

            public bool IsCompleted => false;

            public void OnCompleted(Action continuation) => scheduler.Post(continuation);

            public void GetResult()
            {
            }
        }
    }
}

Примечание: IAsyncScheduler — это обёртка над SynchronizationContext, чтобы не делать двойное преобразование Action → SendOrPostCallback → Action.


Прибитость гвоздями к таскам вылезает, когда вы используете синтаксический сахар async/await. Функция должна возвращать Task. Правда, потом в C# разрешили использовать и свои реализации, используя атрибут AsyncMethodBuilder: ValueTask — один из примеров (правда, под капотом там все равно Task сидит).

Прибитость гвоздями к таскам вылезает, когда вы используете синтаксический сахар async/await. Функция должна возвращать Task. Правда, потом в C# разрешили использовать и свои реализации, используя атрибут AsyncMethodBuilder: ValueTask — один из примеров (правда, под капотом там все равно Task сидит).

Эту статью я задумывал как логическое продолжение цикла про то, как можно использовать этот MetodBuilder. Вот например Монада «Maybe» через async/await в C# (без Task-oв!) или Writing “Lazy Task” Using New Features of C# 7 В данном случае Method Builder не пригодился, но акцент я хотел сделать именно на конструкциях языка C#, нежели на практической стороне вопроса.

НЛО прилетело и опубликовало эту надпись здесь
Она может возвращать вообще все что угодно у чего есть метод GetAwaiter

Нет. await-тить можно всё, что угодно, что имеет метод GetAwaiter. А вот сама асинхронная функция должна возвращать хитрый тип, который должен следовать определённым правилам.

НЛО прилетело и опубликовало эту надпись здесь

Тем не менее, кооперативная многозадачность через async/await реализуется проще всего.


потом ждать непонятно чего

Ждать, пока не отработают другие задачи в очереди.


Мне, правда, тоже не очень понятно, затем ждать исключительно вычислительный код, но оставим это на совести автора.

НЛО прилетело и опубликовало эту надпись здесь
У меня вышло так:

То есть у вас функции в итоге разбегаются по разным потокам? Забавно.
И где же в таком случае кооперативная многозадачность?

Изначально я вообще хотел в этом случае исключение кидать, поскольку не надо там Delay использовать, подход вообще не про это.

В реальной жизни предполагается, что continuation будет вызван, когда будет завершена асинхронная операция, но в нашем случае никаких асинхронных операций нет

А куда, собственно, делись все асинхронные операции?

Их там никогда и не было. Все работает синхронно.

Ну и зачем такая "многозадачность" нужна, когда любое чтение из файла или сокета останавливает все задачи?

Реальная вытесняющая многозадачность требует дополнительных усилили на синхронизацию доступа к общим ресурсам. В кооперативной этого не требуется.

Кооперативная многозадачность не исключает работу с файлами, сокетами и таймерами. Кооперативность означает лишь то, что не планировщик решает, когда надо прервать задачу ради другой задачи, а сама задача отдаёт управление.


Я в комментарии выше привёл пример полнофункционального планировщика. С ним вы можете спокойно использовать асинхронную работу с таймерами, сокетами. При этом все задачи будут работать поочерёдно в одном потоке.

НЛО прилетело и опубликовало эту надпись здесь
А зачем такие сложности, если в C# есть генераторы?
yield return не предназначен для решения подобных задач.
Если пытаться использовать IEnumerable для создания кооперативной многозадачности — получатся корутины из Unity.
НЛО прилетело и опубликовало эту надпись здесь
НЛО прилетело и опубликовало эту надпись здесь

Идея была в том, чтобы внести минимальные изменения в исходный код. Читабельность в вашем примере заметно пострадала, а ведь это самый простой случай. Попробуйте добавить еще пару циклов и условных операторов и тогда станет ясно, что async здорово выручает.


Но, все-таки, непонятно к чему тут вообще таски и async/await

Тасков здесь и нет, я намерено использовал ValueTask, который хоть и связан с Тасками, но может быть легко заменен своей реализаций (через Method Builder), и тогда зависимости от System.Threading.Tasks вообще не будет.


async/await преобразует исходный метод в виде конечного автомата и больше он ничего не делает.

НЛО прилетело и опубликовало эту надпись здесь

Я думаю, что смущение вызывает ключевое слово “async”, но никакой асинхронности здесь нет.
Эта конструкция была изначально введена для упрощения работы с тасками, но поскольку Таск я является, по сути, монадой, то async/await удивительным образом подошёл и для других монад, что я показывал в своих предыдущих статьях. Асинхронность — это только частный случай.

Таски это, как бы, асинхронность, а кооперативная многозадачность непонятно каким к ней вообще боком.

Вообще-то как раз понятно: кооперативная многозадачность отличается от вытесняющей наличием в коде явной передачи управления планировщику, и оператор await как раз такой передачей управления и занимается. Так что таски как раз являются формой кооперативной многозадачности (точнее, смешанной многозадачности, но чистую кооперативную тоже можно сделать при желании).

Если что-то пойдет не так, что увидим в стектрейсе? Портянку внутренних деталей реализации? Как сложно отлаживать async/await?

Это синхронный однопоточный код и все исключения ловятся в нем как и в любом другом синхронном однопоточном коде. (try/catch/finally). Стектрейсы будут не самые простые, но вполне анализируемые.

НЛО прилетело и опубликовало эту надпись здесь

Я помню это ощущение из питона, когда ничего не потокобезопасно, а тебе пофиг — из-за GIL все в один поток, и можно не думать о разделении ресурсов. Ну и добавление асинхронщины ничего не ломает.
В шарпе же можно ограничить тредпул одним потоком, или свой шедулер как выше предложили, необязательно хакать именно async/await, но прикольно, да

Вот только это автоматически ограничит производительность приложения, так что лучше всё-таки в один поток всё не засовывать.


когда ничего не потокобезопасно, а тебе пофиг — из-за GIL все в один поток, и можно не думать о разделении ресурсов

А вот это, кстати, неправда: GIL далеко не от всех проблем спасает.

Производительность разная бывает, от задачи зависит. А от чего gil не спасет? Ну кроме ситуаций когда из питона запускают модуль на каком-нибудь c++, который создает отдельный тред и что-нибудь шатает в общей памяти?

Не совсем я понял проблему, Task.Yield необязательно переключает потоки, зависит от того есть или нет SynchronizationContext и какой TaskScheduler используется. Можно даже не писать своего, а использовать готовый SynchronizationContext из UI фреймворка, он будет все выполнять на одном треде.

Зарегистрируйтесь на Хабре, чтобы оставить комментарий

Публикации

Истории