Реализация Kotlin Flow на C#

    image


    Всем привет!


    Последние годы я занимаюсь разработкой под Андроид на Котлине. Не так давно, за неимением RxJava на Kotlin multiplatform, мы начали использовать корутины и flow – холодные стримы для Котлина из коробки. До Андроида я много лет провёл с C#, и там свои корутины есть уже очень давно, только их там так называть не принято. Но вот про аналог flow на async/await я не слышал. Основной инструмент для реактивного программирования – Rx.Net (собственно, здесь rx и родился). Вот я и решил поностальгировать и попробовать напилить велосипед.


    Далее подразумевается, что читатель имеет представление о штуках, про которые говорилось в предыдущем абзаце. Для нетерпеливых — сразу ссылка на репозиторий.


    Дисклеймер: данный код не претендует на использование в продакшене. Это — концепт, не более того. Что-то может работать не совсем так, как задумывалось.


    IFlow и IFlowCollector


    Что ж, начнём с того, что перепишем в лоб интерфейсы Flow и FlowCollector на C#.
    Было:


    interface Flow<out T> {
        suspend fun collect(collector: FlowCollector<T>)
    }
    interface FlowCollector<in T> {
        suspend fun emit(value: T)
    }

    Стало:


        public interface IFlow<out T>
        {
            Task Collect(IFlowCollector<T> collector);
        }
        public interface IFlowCollector<in T>
        {
            Task Emit(T item);
        }
    

    Полагаю, отличия понятны и объясняются разной реализацией асинхронности.


    Чтобы воспользоваться этими интерфейсами, их надо реализовать. Вот что получилось:


        internal class Flow<T> : IFlow<T>
        {
            private readonly Func<IFlowCollector<T>, Task> _emitter;
    
            public Flow(Func<IFlowCollector<T>, Task> emitter)
            {
                _emitter = emitter;
            }
    
            public Task Collect(IFlowCollector<T> collector)
            {
                return _emitter(collector);
            }
    
        }
    
        internal class FlowCollector<T> : IFlowCollector<T>
        {
            private readonly Func<T, Task> _handler;
    
            public FlowCollector(Func<T, Task> handler)
            {
                _handler = handler;
            }
    
            public Task Emit(T item)
            {
                return _handler(item);
            }
    
        }

    В конструктор flow передаём функцию, которая будет эмитить значения. А в конструктор коллектора – функцию, которая будет обрабатывать каждое эмитированное значение.


    Использовать это можно так


    var flow = new Flow<int>(async collector =>
                {
                    await collector.Emit(1);
                    await Task.Delay(1000);
                    await collector.Emit(2);
                    await Task.Delay(1000);
                    await collector.Emit(3);
                });
                var collector = new FlowCollector<int>(async item => Console.WriteLine(item));
                await flow.Collect(collector);

    Думаю, в коде выше всё понятно. Сначала мы создаём Flow, затем создаём коллектор (обработчик каждого элемента). Затем запускаем Flow, «подписав» на него коллектор. Если добавить немного сахара (см. гитхаб), то получим что-то вроде этого:


    await Flow<int>(async collector =>
                {
                    await collector.Emit(1);
                    await Task.Delay(1000);
                    await collector.Emit(2);
                    await Task.Delay(1000);
                    await collector.Emit(3);
                })
                .Collect(Console.WriteLine);

    На Котлине это выглядит вот так:


    scope.launch{
       flow{
            emit(1)
        delay(1000)
        …
       }.collect{ printl(it) }
    }

    Лично мне в варианте на Шарпе больше всего не нравится необходимость явно указывать тип элемента при создании флоу. Но дело тут не в том, что вывод типов в Котлине сильно круче. Функция flow выглядит так:


    public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)

    Как мы видим, параметр block помечен аннотацией BuilderInference, которая и подсказывает компилятору, что тип надо взять из этого параметра. Кто-нибудь знает, можно ли напилить подобное для C# на Roslyn?


    CancellationToken


    В rx есть подписка, от которой можно отписаться. В Kotlin Flow за отмену отвечает Job, которую возвращает билдер, либо Coroutine Scope. Нам тоже определённо необходим инструмент, позволяющий Flow завершиться досрочно. В C# для отмены асинхронных операций используется, не побоюсь этого слова, паттерн Cancellation Token. CancellationToken – это класс, объект которого предоставляет асинхронной операции информацию о том, что она отменена. Он прокидывается в асинхронную операцию при старте, и эта операция сама смотрит за его состоянием. А меняется состояние извне.


    Короче, нам надо прокинуть CancellationToken в наши Flow и FlowCollector.


       public interface IFlow<out T>
        {
            Task Collect(IFlowCollector<T> collector, CancellationToken cancellationToken = default);
        }
    
        public interface IFlowCollector<in T>
        {
            Task Emit(T item, CancellationToken cancellationToken = default);
        }

    Реализацию пастить сюда не буду – см. гитхаб.
    Тест теперь будет выглядеть вот так:


                var cts = new CancellationTokenSource();
    
                var flowTask = Flow<int>(async (collector, cancellationToken) =>
                {
                    await collector.Emit(1);
                    await Task.Delay(2000, cancellationToken);
                    await collector.Emit(2);
                    await Task.Delay(2000, cancellationToken);
                    await collector.Emit(3);
                })
                .Collect(item => Log(item), cts.Token);
    
                var cancelationTask = Task.Run(async () =>
                {
                    await Task.Delay(3000);
                    cts.Cancel();
                });
    
                await flowTask;

    Суть такова. Параллельно Flow запускаем операцию, которая через 3 секунды его отменит. В результате Flow не успевает эмитировать третий элемент и завершается с TaskCanceledException, что и является требуемым поведением.


    Немного практики


    Давайте попробуем использовать то, что получилось, на практике. Например, обернём какой-нибудь event в наш Flow. В Rx.Net для этого даже существует библиотечный метод FromEventPattern.


    Чтобы не связываться с реальным UI, я написал класс ClicksEmulator, который генерирует условные нажатия на кнопку мыши через случайные интервалы времени.


        public class ClicksEmulator
        {
            public enum Button { Left, Right }
    
            public class ClickEventArgs : EventArgs
            {
    //…
                public int X { get; }
                public int Y { get; }
                public Button Button { get; }
            }
    
            public event EventHandler<ClickEventArgs> ButtonClick;
    
            public async Task Start(CancellationToken cancellationToken = default)         {…  }
    
        }

    Я опустил реализацию, т.к. она здесь не очень важна. Главное – это event ButtonClick, который мы хотим превратить во Flow. Для это напишем метод-расширение


    public static IFlow<ClicksEmulator.ClickEventArgs> Clicks(this ClicksEmulator emulator)
            {
                return FlowFactory.Flow<ClicksEmulator.ClickEventArgs>(async (collector, cancellationToken) =>
                {
                    void clickHandler(object sender, ClicksEmulator.ClickEventArgs args) => collector.Emit(args);
    
                    emulator.ButtonClick += clickHandler;
                    cancellationToken.Register(() =>
                    {
                        emulator.ButtonClick -= clickHandler;
                    });
    
                    await Task.Delay(-1, cancellationToken);
                });
            }

    Сначала мы объявляем обработчик события, который ничего не делает, кроме передачи аргумента события в коллектор. Затем подписываемся на события и регистрируем отписку в случае отмены (завершения) flow. Ну и далее бесконечно ждём и слушаем события ButtonClick, пока cancellationToken не выстрелит.


    Если вы использовали callbackFlow или channelFlow в Котлине или создавали холодные Observable из listener’ов в Rx, то вы отметите, что структура кода во всех случаях очень схожа. Это прекрасно, но возникает вопрос – чем Flow в данном случае лучше, чем сырой event? Вся сила реактивных стримов – в операторах, которые выполняют разные преобразования над ними: фильтрацию, маппинг и многие другие, более сложные. Но у нас их пока нет. Давайте попробуем что-нибудь с этим сделать.


    Filter, Map, OnNext


    Начнем с одного из самых простых операторов — Filter. Он, как это очевидно из названия, будет фильтровать элементы flow в соответствии с заданным предикатом. Это будет extension-метод, применяемый к оригинальному flow и возвращающий flow только с отфильтрованными элементами. Получается, нам надо брать каждый элемент из оригинального flow, проверять, и эмитить дальше, если предикат возвращает true. Так и сделаем:


      public static IFlow<T> Filter<T>(this IFlow<T> source, Func<T, bool> predicate) =>
                FlowFactory.Flow<T>((collector, cancellationToken) =>
                    source.Collect(item =>
                    {
                        if (predicate(item))
                            collector.Emit(item);
                    }, cancellationToken)
                );

    Теперь, если нам нужны нажатия только на левую кнопку мыши, можно написать так:


    emulator
                    .Clicks()
                    .Filter(click => click.Button == ClicksEmulator.Button.Left)
                    .Collect(item => Log($"{item.Button} {item.X} {item.Y}"), cts.Token);
    

    По аналогии напишем операторы Map и OnNext. Первый преобразует каждый элемент исходного flow в другой с помощью переданной функции-маппера. Второй будет возвращать flow с теми же элементами, что и оригинальный, но выполняя на каждом какое-то действие Action (обычно логирование).

            public static IFlow<R> Map<T, R>(this IFlow<T> source, Func<T, R> mapper) =>
               FlowFactory.Flow<R>((collector, cancellationToken) =>
                   source.Collect(
                            item => collector.Emit(mapper(item)),
                            cancellationToken
                       )
               );
    
            public static IFlow<T> OnNext<T>(this IFlow<T> source, Action<T> action) =>
               FlowFactory.Flow<T>((collector, cancellationToken) =>
                   source.Collect(item =>
                   {
                       action(item);
                       collector.Emit(item);
                   }, cancellationToken)
               );

    И пример использования:


    emulator
                    .Clicks()
                    .OnNext(click => Log($"{click.Button} {click.X} {click.Y}"))
                    .Map(click => click.Button == ClicksEmulator.Button.Left ? 0 : 1)                
                    .Collect(item => Log($"{item}"), cts.Token);

    Вообще для реактивных стримов придумано очень много операторов, их можно найти, например, здесь.


    И ничего не мешает реализовать любые из них для IFlow.


    Те, кто знаком с Rx.Net, знают, что там, помимо новых и специфичных операторов для IObservable, используются методы-расширения из Linq-to-objects, и это позволяет рассматривать стримы как “коллекции событий” и манипулировать ими с помощью привычных Linq-методов. Почему бы вместо того, чтобы писать операторы самим, не попробовать поставить IFlow на рельсы Linq?


    IAsyncEnumerable


    В C# 8 завезли асинхронную версию IEnumerable — IAsyncEnumerable — интерфейс коллекции, по которой можно итерироваться асинхронно. Принципиальная разница между IAsyncEnumerable и реактивными стримами (IObservable и IFlow ) вот в чём. IAsyncEnumerable, как и IEnumerable — это pull-модель. Мы итерируемся по коллекции сколько и когда нам надо и сами тянем из неё элементы. Стримы — это push. Мы подписываемся на события и “реагируем” на них, когда они приходят — на то они и реактивные. Однако от pull-модели можно добиться push-like поведения. Это называется long polling https://en.wikipedia.org/wiki/Push_technology#Long_polling. Суть такая: мы, итерируясь по коллекции, запрашиваем очередной её элемент и ждём сколь угодно долго, пока коллекция нам его не вернёт, т.е. пока очередное событие не наступит. IAsyncEnumerable, в отличие от IEnumerable, позволит нам ждать асинхронно. Короче, нам надо как-то натянуть IAsyncEnumerable на IFlow.


    Как известно, за возврат текущего элемента коллекции IAsyncEnumerable и переход к следующему элементу отвечает интерфейс IAsyncEnumerator. При этом нам надо брать элементы из IFlow, а этим занимается IFlowCollector. Получается вот такой объект, реализующий эти интерфейсы:


    internal class FlowCollectorEnumerator<T> : IFlowCollector<T>, IAsyncEnumerator<T>
        {
            private readonly SemaphoreSlim _backpressureSemaphore = new SemaphoreSlim(0, 1);
            private readonly SemaphoreSlim _longPollingSemaphore = new SemaphoreSlim(0, 1);
    
            private bool _isFinished;
    
            public T Current { get; private set; }
    
            public async ValueTask DisposeAsync() { }
    
            public async Task Emit(T item, CancellationToken cancellationToken)
            {
                await _backpressureSemaphore.WaitAsync(cancellationToken);
                Current = item;
                _longPollingSemaphore.Release();
            }
    
            public async Task Finish()
            {
                await _backpressureSemaphore.WaitAsync();
                _isFinished = true;
                _longPollingSemaphore.Release();
            }
    
            public async ValueTask<bool> MoveNextAsync()
            {
                _backpressureSemaphore.Release();
                await _longPollingSemaphore.WaitAsync();
                return !_isFinished;
            }
        }
    

    Основное здесь методы — Emit, Finish и MoveNextAsync.
    Emit в начале ждёт момента, когда очередной элемент из коллекции будет запрошен. Т.е. не эмитит элемент, пока он не потребуется. Это называется backpressure, отсюда и имя семофора. Затем выставляется текущий item и сообщается, что long polling запрос может получить результат.
    MoveNextAsync вызывается, когда из коллекции тянут очередной элемент. Он отпускает _backpressureSemaphore и ждёт, когда Flow запушит очередной элемент. Затем он возвращает признак того, закончилась ли коллекция. Этот флаг выставляет метод Finish.


    Finish работает по тому же принципу, что и Emit, только вместо очередного элемента выставляет признак конца коллекции.


    Теперь надо этот класс заиспользовать.


    public static class AsyncEnumerableExtensions
        {
            public static IAsyncEnumerable<T> CollectEnumerable<T>(this IFlow<T> flow, CancellationToken cancellationToken = default)
            {
                var collector = new FlowCollectorEnumerator<T>();
                flow
                    .Collect(collector, cancellationToken)
                    .ContinueWith(_ => collector.Finish(), cancellationToken);
                return new FlowEnumerableAdapter<T>(collector);
            }
        }
    
        internal class FlowEnumerableAdapter<T> : IAsyncEnumerable<T>
        {
            private readonly IAsyncEnumerator<T> _enumerator;
    
            public FlowEnumerableAdapter(IAsyncEnumerator<T> enumerator)
            {
                _enumerator = enumerator;
            }
    
            public IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default)
            {
                return _enumerator;
            }
        }
    

    Extension-метод CollectEnumerable для IFlow создаёт FlowCollectorEnumerator и подписывает на него flow, по завершению которого вызывается метод Finish(). И возвращает FlowEnumerableAdapter, который является простейшей реализацией IAsyncEnumerable, использующей FlowCollectorEnumerator в качестве IEnumerator.
    Пробуем, что получилось.


    var clicks = emulator
       .Clicks()
       .OnNext(item => Log($"{item.Button} {item.X} {item.Y}"))
       .CollectEnumerable(cts.Token)
       .Where(click => click.Button == ClicksEmulator.Button.Right)
       .Select(click => click.Y < 540 ? "TOP" : "LEFT");
    
    await foreach (var click in clicks)
    {
       Log($"Clicked at: {click}");
    }

    Здесь мы получаем Flow clicks(), каждый клик логируем, затем превращаем IFlow в IAsyncEnumerable. Далее применяет известные Linq-операторы: оставляем только клики правой кнопкой и получаем, в какой части экрана они сделаны.


    Далее рассмотрим пример посложнее. Будем заменять правый клик на двойной левый. Т.е. нам надо будет мапить каждый элемент не в какой-то другой, а в коллекцию. Либо во Flow, преобразуемый в коллекцию.


    var clicks = emulator
       .Clicks()
       .OnNext(item => Log($"Original: {item.Button} {item.X} {item.Y}"))
       .CollectEnumerable(cts.Token)
       .Select(click => click.Button == ClicksEmulator.Button.Left
           ? Flow<ClicksEmulator.ClickEventArgs>(collector => collector.Emit(click))
           : Flow<ClicksEmulator.ClickEventArgs>(async collector =>
           {
               var changedClick =
                   new ClicksEmulator.ClickEventArgs(click.X, click.Y, ClicksEmulator.Button.Left);
               await collector.Emit(changedClick);
               await Task.Delay(200);
               await collector.Emit(changedClick);
           })
       )
       .SelectMany(flow => flow.CollectEnumerable());
    
    await foreach (var click in clicks)
    {
       Log($"Changed: {click.Button} {click.X} {click.Y}");
    }
    

    Для этого в Linq существует оператор SelectMany. Его аналог в реактивных стримах — FlatMap. Сначала мапим каждый клик в IFlow: для левого клика — Flow с одним этим кликом, для правого — Flow из двух левых кликов с задержкой между ними. А затем в SelectMany превращаем IFlow в IAyncEnumerable.


    И это работает! Т.е. многие операторы не обязательно реализовывать для IFlow — можно использовать Linq.


    Заключение


    Rx.Net — был и остаётся главным инструментом при работе с асинхронными последовательностями событий на C#. Но это довольно большая библиотека по объёмы кода. Как мы увидели, похожую функциональность можно получить значительно проще — всего лишь два интерфейса плюс некоторая обвязка. Это возможно благодаря использованию возможностей языка — async/await. Когда зарождался Rx, эту фичу в C# ещё не завезли.


    Спасибо за внимание!

    • +15
    • 5,2k
    • 8
    Поделиться публикацией
    AdBlock похитил этот баннер, но баннеры не зубы — отрастут

    Подробнее
    Реклама

    Комментарии 8

      +1
      про аналог flow на async/await я не слышал

      Возможно ошибаюсь, но разве TPL Dataflow не аналог?

        0

        Не приходилось иметь дело с этой штукой. После беглого знакомства кажется, что похоже, но на сколько — не готов судить.

        +1

        Чтобы явно не указывать тип при вызове конструктора, можно написать factory метод.
        class Flow/T/{} class Flow{ static Flow/T/ Create/T/(...) }


        (Угловые скобки есть парсер, поэтому использовал слэши)

          0

          А там и есть фабричный метод FlowFactory. Flow, но тип вывести в данном случае всё равно не получантся.

          +1
          IAsyncEnumerable, как и IEnumerable — это pull-модель

          Я бы сказал, что IAsyncEnumerable выглядит как pull-модель (делаем foreach), но ведёт себя как push-модель: внутренность foreach является коллбеком, который может быть вызван в том числе в другом потоке.


          Вот тут я превращаю push-модель (события) в IAsyncEnumerable: https://ptupitsyn.github.io/Ignite-Async-Streams/

            +1
            Я бы сказал, что IAsyncEnumerable выглядит как pull-модель (делаем foreach), но ведёт себя как push-модель: внутренность foreach является коллбеком, который может быть вызван в том числе в другом потоке.


            Я всё же так бы не сказал. Pull — это pull, не важно, на каком потоке.
            П.С. В вашей статье отметился)
            0
            Не так давно, за неимением RxJava на Kotlin multiplatform

            А как же github.com/badoo/Reaktive
              0

              Мы смотрели, но все же инструмент от JetBrains вызывает значительно больше доверие.

            Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

            Самое читаемое