Pull to refresh

Реализуем кооперативную многозадачность на C#

.NETC#


Когда речь заходит о многозадачности в .Net, то в подавляющем большинстве случаев предполагается вытесняющая многозадачность на основе потоков операционной системы. Но в этой статье речь пойдёт о реализации кооперативной многозадачности, с помощью которой можно создать видимость одновременной работы нескольких методов, используя всего один единственный поток.


Вот наша простенькая заготовка:


static void Main()
{
    DoWork("A", 4);
    DoWork("B", 3);
    DoWork("C", 2);
    DoWork("D", 1);
}

static void DoWork(string name, int num)
{
    for (int i = 1; i <= num; i++)
    {
        Console.WriteLine($"Work {name}: {i}");
    }
    Console.WriteLine($"Work {name} is completed");
}

Статический метод вызывается последовательно 4 раза и выводит символы A, B, C, D заданное количество раз. Символы, очевидно, также выводятся последовательно, и наша задача здесь добиться того, что бы они выводились попеременно, но при этом сильно не меняя исходный код и не используя дополнительные потоки.


Заголовок этой статьи как бы намекает, что тут нужно реализовать кооперативную многозадачность, в которой исполняемый код время от времени сообщает, что: “Я пока работу приостанавливаю, и кто-нибудь еще может воспользоваться текущем потоком, чтобы сделать свои дела, но я надеюсь, что рано или поздно, мне это поток вернут обратно, ведь у меня еще куча дел”.


Вопрос: "Какая конструкция C# позволяет прервать работу метода на какое-то заранее неизвестное время?" Правильно! await! Давайте сделаем наш метод асинхронным и добавим await после вывода в консоль очередного символа:


static async ValueTask DoWork(string name, int num)
{
    for (int i = 1; i <= num; i++)
    {
        Console.WriteLine($"Work {name}: {i}");
        await /*Something*/
    }
    Console.WriteLine($"Work {name} is completed");
}

Этот await обозначает, что метод решил прерваться, чтобы другие вызовы тоже смогли вывести свои символы. Но что именно этот метод будет await-ить? Task.Delay(), Task.Yield() не подходят, так как они подразумевают переключение на другие потоки. Тогда создадим свой класс, который можно использовать с await, и который не будет иметь ничего общего с многопоточкой. Назовем его CooperativeBroker:


private class CooperativeBroker : ICooperativeBroker
{
    private Action? _continuation;

    public void GetResult() 
        => this._continuation = null;

    public bool IsCompleted 
        => false;//Preventing sync completion in async method state machine

    public void OnCompleted(Action continuation)
    {
        this._continuation = continuation;
        this.InvokeContinuation();
    }

    public ICooperativeBroker GetAwaiter() 
        => this;

    public void InvokeContinuation() 
        => this._continuation?.Invoke();
}

Компилятор C# преобразует исходный код асинхронных методов в виде конечного автомата, каждое состояние которого соответствует вызову await внутри этого метода. Код перехода в следующее состояние передается в виде делегата continuation в метод OnCompleted. В реальной жизни предполагается, что continuation будет вызван, когда будет завершена асинхронная операция, но в нашем случае никаких асинхронных операций нет и для работы программы надо бы было вызвать это continuation немедленно, но тогда методы опять будут работать последовательно, а мы этого не хотим. Лучше сохраним этот делегат на будущее и дадим поработать другим вызовам. Чтобы было где хранить делегаты давайте добавим класс CooperativeContext:


private class CooperativeBroker
{
    private readonly CooperativeContext _cooperativeContext;

    private Action? _continuation;

    public CooperativeBroker(CooperativeContext cooperativeContext)
        => this._cooperativeContext = cooperativeContext;

    ...

    public void OnCompleted(Action continuation)
    {
        this._continuation = continuation;
        this._cooperativeContext.OnCompleted(this);
    }

}

public class CooperativeContext
{
    private readonly List<CooperativeBroker> _brokers = 
        new List<CooperativeBroker>();

    void OnCompleted(CooperativeBroker broker)
    {
        ...
    }
}

где метод OnCompleted собственно и будет отвечать за поочередный вызов методов:


private void OnCompleted(CooperativeBroker broker)
{
    //Пропускает вызовы делегатов пока все брокеры не добавлены.
    if (this._targetBrokersCount == this._brokers.Count)
    {
        var nextIndex = this._brokers.IndexOf(broker) + 1;
        if (nextIndex == this._brokers.Count)
        {
            nextIndex = 0;
        }

        this._brokers[nextIndex].InvokeContinuation();
    }
}

Обратите внимание на первое условие – делегаты не вызываются до тех пор, пока все брокеры для всех кооперативных вызовов не будут добавлены в контекст (_targetBrokersCount — количество кооперативных вызовов). Если начать вызывать их сразу, то вызовы опять пойдут последовательно, так как наш контекст не сможет получить "продолжение" всех вызовов сразу.


Так как нам нужно заранее знать общее количество кооперативных вызовов, то перепишем эти вызовы следующим образом:


static void Main()
{
    CooperativeContext.Run(
        b => DoWork(b, "A", 4),
        b => DoWork(b, "B", 3),
        b => DoWork(b, "C", 2),
        b => DoWork(b, "D", 1)
    );
}

static async ValueTask DoWork(CooperativeBroker broker, string name, int num, bool extraWork = false)
{
    for (int i = 1; i <= num; i++)
    {
        Console.WriteLine($"Work {name}: {i}, Thread: {Thread.CurrentThread.ManagedThreadId}");
        await broker;
    }

    Console.WriteLine($"Work {name} is completed, Thread: {Thread.CurrentThread.ManagedThreadId}");
}

public class CooperativeContext
{
    public static void Run(params Func<CooperativeBroker, ValueTask>[] tasks)
    {
        CooperativeContext context = new CooperativeContext(tasks.Length);
        foreach (var task in tasks)
        {
            task(context.CreateBroker());
        }

        ...
    }

    ...

    private int _targetBrokersCount;

    private CooperativeContext(int maxCooperation)
    {
        this._threadId = Thread.CurrentThread.ManagedThreadId;
        this._targetBrokersCount = maxCooperation;
    }

    ...
}

Вроде бы можно уже и запускать, чтобы проверить как это работает, но осталась еще одна маленькая деталь – если один из вызовов завершается раньше других, то он перестает вызывать метод OnCompleted и вся наша цепочка прерывается. Что бы исправить эту ситуацию форсируем вызов оставшихся "продолжений", и заодно удалим ненужный уже брокер:


public class CooperativeContext
{
    public static void Run(params Func<ICooperativeBroker, ValueTask>[] tasks)
    {
        CooperativeContext context = new CooperativeContext(tasks.Length);
        foreach (var task in tasks)
        {
            task(context.CreateBroker());
        }

        // Программа приходит сюда когда один из методов завершен, но надо
        // закончить и остальные
        while (context._brokers.Count > 0)
        {
            context.ReleaseFirstFinishedBrokerAndInvokeNext();
        }
    }

    ...
    private void ReleaseFirstFinishedBrokerAndInvokeNext()
    {
        // IsNoAction означает что асинхронный метод завершен
        var completedBroker = this._brokers.Find(i => i.IsNoAction)!;

        var index = this._brokers.IndexOf(completedBroker);
        this._brokers.RemoveAt(index);
        this._targetBrokersCount--;

        if (index == this._brokers.Count)
        {
            index = 0;
        }

        if (this._brokers.Count > 0)
        {
            this._brokers[index].InvokeContinuation();
        }
    }    
}

private class CooperativeBroker : ICooperativeBroker
{
    ...
    public bool IsNoAction
        => this._continuation == null;
    ...
}

Вот теперь можно и запускать (чуть усложним нам задачу введя дополнительную работу):


static void Main()
{
    CooperativeContext.Run(
        b => DoWork(b, "A", 4),
        b => DoWork(b, "B", 3, extraWork: true),
        b => DoWork(b, "C", 2),
        b => DoWork(b, "D", 1)
    );
}

static async ValueTask DoWork(
    ICooperativeBroker broker, 
    string name, 
    int num, 
    bool extraWork = false)
{
    for (int i = 1; i <= num; i++)
    {
        Console.WriteLine(
               $"Work {name}: {i}, Thread: {Thread.CurrentThread.ManagedThreadId}");
        await broker;
        if (extraWork)
        {
            Console.WriteLine(
                   $"Work {name}: {i} (Extra), Thread: {Thread.CurrentThread.ManagedThreadId}");
            await broker;
        }
    }

    Console.WriteLine(
           $"Work {name} is completed, Thread: {Thread.CurrentThread.ManagedThreadId}");
}

Результат:


Work A: 1, Thread: 1
Work B: 1, Thread: 1
Work C: 1, Thread: 1
Work D: 1, Thread: 1
Work A: 2, Thread: 1
Work B: 1 (Extra), Thread: 1
Work C: 2, Thread: 1
Work D is completed, Thread: 1
Work A: 3, Thread: 1
Work B: 2, Thread: 1
Work C is completed, Thread: 1
Work A: 4, Thread: 1
Work B: 2 (Extra), Thread: 1
Work A is completed, Thread: 1
Work B: 3, Thread: 1
Work B: 3 (Extra), Thread: 1
Work B is completed, Thread: 1

Как видно, все вызовы выполнялись в одном и том же потоке, но при этом прерывались, чтобы дать поработать дуг другу, создавая таким образом многозадачность.




Честно сказать, я не могу сходу придумать, где такой подход пригодился бы, но наверняка такая задачка найдется, поэтому лучше знать, что C# позволяет делать и такое.


Исходный код вы можете найти на github.


[Update] Посмотрите комментарии от DistortNeo, он предлагает более практичный вариант решения этой задачи.

Tags:c#async/awaitmultithreading
Hubs: .NET C#
Total votes 17: ↑15 and ↓2+13
Views6.6K

Popular right now