
Всем привет!
Последние годы я занимаюсь разработкой под Андроид на Котлине. Не так давно, за неимением RxJava на Kotlin multiplatform, мы начали использовать корутины и flow – холодные стримы для Котлина из коробки. До Андроида я много лет провёл с C#, и там свои корутины есть уже очень давно, только их там так называть не принято. Но вот про аналог flow на async/await я не слышал. Основной инструмент для реактивного программирования – Rx.Net (собственно, здесь rx и родился). Вот я и решил поностальгировать и попробовать напилить велосипед.
Далее подразумевается, что читатель имеет представление о штуках, про которые говорилось в предыдущем абзаце. Для нетерпеливых — сразу ссылка на репозиторий.
Дисклеймер: данный код не претендует на использование в продакшене. Это — концепт, не более того. Что-то может работать не совсем так, как задумывалось.
IFlow и IFlowCollector
Что ж, начнём с того, что перепишем в лоб интерфейсы Flow и FlowCollector на C#.
Было:
interface Flow<out T> { suspend fun collect(collector: FlowCollector<T>) } interface FlowCollector<in T> { suspend fun emit(value: T) }
Стало:
public interface IFlow<out T> { Task Collect(IFlowCollector<T> collector); } public interface IFlowCollector<in T> { Task Emit(T item); }
Полагаю, отличия понятны и объясняются разной реализацией асинхронности.
Чтобы воспользоваться этими интерфейсами, их надо реализовать. Вот что получилось:
internal class Flow<T> : IFlow<T> { private readonly Func<IFlowCollector<T>, Task> _emitter; public Flow(Func<IFlowCollector<T>, Task> emitter) { _emitter = emitter; } public Task Collect(IFlowCollector<T> collector) { return _emitter(collector); } } internal class FlowCollector<T> : IFlowCollector<T> { private readonly Func<T, Task> _handler; public FlowCollector(Func<T, Task> handler) { _handler = handler; } public Task Emit(T item) { return _handler(item); } }
В конструктор flow передаём функцию, которая будет эмитить значения. А в конструктор коллектора – функцию, которая будет обрабатывать каждое эмитированное значение.
Использовать это можно так
var flow = new Flow<int>(async collector => { await collector.Emit(1); await Task.Delay(1000); await collector.Emit(2); await Task.Delay(1000); await collector.Emit(3); }); var collector = new FlowCollector<int>(async item => Console.WriteLine(item)); await flow.Collect(collector);
Думаю, в коде выше всё понятно. Сначала мы создаём Flow, затем создаём коллектор (обработчик каждого элемента). Затем запускаем Flow, «подписав» на него коллектор. Если добавить немного сахара (см. гитхаб), то получим что-то врод�� этого:
await Flow<int>(async collector => { await collector.Emit(1); await Task.Delay(1000); await collector.Emit(2); await Task.Delay(1000); await collector.Emit(3); }) .Collect(Console.WriteLine);
На Котлине это выглядит вот так:
scope.launch{ flow{ emit(1) delay(1000) … }.collect{ printl(it) } }
Лично мне в варианте на Шарпе больше всего не нравится необходимость явно указывать тип элемента при создании флоу. Но дело тут не в том, что вывод типов в Котлине сильно круче. Функция flow выглядит так:
public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)
Как мы видим, параметр block помечен аннотацией BuilderInference, которая и подсказывает компилятору, что тип надо взять из этого параметра. Кто-нибудь знает, можно ли напилить подобное для C# на Roslyn?
CancellationToken
В rx есть подписка, от которой можно отписаться. В Kotlin Flow за отмену отвечает Job, которую возвращает билдер, либо Coroutine Scope. Нам тоже определённо необходим инструмент, позволяющий Flow завершиться досрочно. В C# для отмены асинхронных операций используется, не побоюсь этого слова, паттерн Cancellation Token. CancellationToken – это класс, объект которого предоставляет асинхронной операции информацию о том, что она отменена. Он прокидывается в асинхронную операцию при старте, и эта операция сама смотрит за его состоянием. А меняется состояние извне.
Короче, нам надо прокинуть CancellationToken в наши Flow и FlowCollector.
public interface IFlow<out T> { Task Collect(IFlowCollector<T> collector, CancellationToken cancellationToken = default); } public interface IFlowCollector<in T> { Task Emit(T item, CancellationToken cancellationToken = default); }
Реализацию пастить сюда не буду – см. гитхаб.
Тест теперь будет выглядеть вот так:
var cts = new CancellationTokenSource(); var flowTask = Flow<int>(async (collector, cancellationToken) => { await collector.Emit(1); await Task.Delay(2000, cancellationToken); await collector.Emit(2); await Task.Delay(2000, cancellationToken); await collector.Emit(3); }) .Collect(item => Log(item), cts.Token); var cancelationTask = Task.Run(async () => { await Task.Delay(3000); cts.Cancel(); }); await flowTask;
Суть такова. Параллельно Flow запускаем операцию, которая через 3 секунды его отменит. В результате Flow не успевает эмитировать третий элемент и завершается с TaskCanceledException, что и является требуемым поведением.
Немного практики
Давайте попробуем использовать то, что получилось, на практике. Например, обернём какой-нибудь event в наш Flow. В Rx.Net для этого даже существует библиотечный метод FromEventPattern.
Чтобы не связываться с реальным UI, я написал класс ClicksEmulator, который генерирует условные нажатия на кнопку мыши через случайные интервалы времени.
public class ClicksEmulator { public enum Button { Left, Right } public class ClickEventArgs : EventArgs { //… public int X { get; } public int Y { get; } public Button Button { get; } } public event EventHandler<ClickEventArgs> ButtonClick; public async Task Start(CancellationToken cancellationToken = default) {… } }
Я опустил реализацию, т.к. она здесь не очень важна. Главное – это event ButtonClick, который мы хотим превратить во Flow. Для это напишем метод-расширение
public static IFlow<ClicksEmulator.ClickEventArgs> Clicks(this ClicksEmulator emulator) { return FlowFactory.Flow<ClicksEmulator.ClickEventArgs>(async (collector, cancellationToken) => { void clickHandler(object sender, ClicksEmulator.ClickEventArgs args) => collector.Emit(args); emulator.ButtonClick += clickHandler; cancellationToken.Register(() => { emulator.ButtonClick -= clickHandler; }); await Task.Delay(-1, cancellationToken); }); }
Сначала мы объявляем обработчик события, который ничего не делает, кроме передачи аргумента события в коллектор. Затем подписываемся на события и регистрируем отписку в случае отмены (завершения) flow. Ну и далее бесконечно ждём и слушаем события ButtonClick, пока cancellationToken не выстрелит.
Если вы использовали callbackFlow или channelFlow в Котлине или создавали холодные Observable из listener’ов в Rx, то вы отметите, что структура кода во всех случа��х очень схожа. Это прекрасно, но возникает вопрос – чем Flow в данном случае лучше, чем сырой event? Вся сила реактивных стримов – в операторах, которые выполняют разные преобразования над ними: фильтрацию, маппинг и многие другие, более сложные. Но у нас их пока нет. Давайте попробуем что-нибудь с этим сделать.
Filter, Map, OnNext
Начнем с одного из самых простых операторов — Filter. Он, как это очевидно из названия, будет фильтровать элементы flow в соответствии с заданным предикатом. Это будет extension-метод, применяемый к оригинальному flow и возвращающий flow только с отфильтрованными элементами. Получается, нам надо брать каждый элемент из оригинального flow, проверять, и эмитить дальше, если предикат возвращает true. Так и сделаем:
public static IFlow<T> Filter<T>(this IFlow<T> source, Func<T, bool> predicate) => FlowFactory.Flow<T>((collector, cancellationToken) => source.Collect(item => { if (predicate(item)) collector.Emit(item); }, cancellationToken) );
Теперь, если нам нужны нажатия только на левую кнопку мыши, можно написать так:
emulator .Clicks() .Filter(click => click.Button == ClicksEmulator.Button.Left) .Collect(item => Log($"{item.Button} {item.X} {item.Y}"), cts.Token);
По аналогии напишем операторы Map и OnNext. Первый преобразует каждый элемент исходного flow в другой с помощью переданной функции-маппера. Второй будет возвращать flow с теми же элементами, что и оригинальный, но выполняя на каждом какое-то действие Action (обычно логирование).
public static IFlow<R> Map<T, R>(this IFlow<T> source, Func<T, R> mapper) => FlowFactory.Flow<R>((collector, cancellationToken) => source.Collect( item => collector.Emit(mapper(item)), cancellationToken ) ); public static IFlow<T> OnNext<T>(this IFlow<T> source, Action<T> action) => FlowFactory.Flow<T>((collector, cancellationToken) => source.Collect(item => { action(item); collector.Emit(item); }, cancellationToken) );
И пример использования:
emulator .Clicks() .OnNext(click => Log($"{click.Button} {click.X} {click.Y}")) .Map(click => click.Button == ClicksEmulator.Button.Left ? 0 : 1) .Collect(item => Log($"{item}"), cts.Token);
Вообще для реактивных стримов придумано очень много операторов, их можно найти, например, здесь.
И ничего не мешает реализовать любые из них для IFlow.
Те, кто знаком с Rx.Net, знают, что там, помимо новых и специфичных операторов для IObservable, используются методы-расширения из Linq-to-objects, и это позволяет рассматривать стримы как “коллекции событий” и манипулировать ими с помощью привычных Linq-методов. Почему бы вместо того, чтобы писать операторы самим, не попробовать поставить IFlow на рельсы Linq?
IAsyncEnumerable
В C# 8 завезли асинхронную версию IEnumerable — IAsyncEnumerable — интерфейс коллекции, по которой можно итерироваться асинхронно. Принципиальная разница между IAsyncEnumerable и реактивными стримами (IObservable и IFlow ) вот в чём. IAsyncEnumerable, как и IEnumerable — это pull-модель. Мы итерируемся по коллекции сколько и когда нам надо и сами тянем из неё элементы. Стримы — это push. Мы подписываемся на события и “реагируем” на них, когда они приходят — на то они и реактивные. Однако от pull-модели можно добиться push-like поведения. Это называется long polling https://en.wikipedia.org/wiki/Push_technology#Long_polling. Суть такая: мы, итерируясь по коллекции, запрашиваем очередной её элемент и ждём сколь угодно долго, пока коллекция нам его не вернёт, т.е. пока очередное событие не наступит. IAsyncEnumerable, в отличие от IEnumerable, позволит нам ждать асинхронно. Короче, нам надо как-то натянуть IAsyncEnumerable на IFlow.
Как известно, за возврат текущего элемента коллекции IAsyncEnumerable и переход к следующему элементу отвечает интерфейс IAsyncEnumerator. При этом нам надо брать элементы из IFlow, а этим занимается IFlowCollector. Получается вот такой объект, реализующий эти интерфейсы:
internal class FlowCollectorEnumerator<T> : IFlowCollector<T>, IAsyncEnumerator<T> { private readonly SemaphoreSlim _backpressureSemaphore = new SemaphoreSlim(0, 1); private readonly SemaphoreSlim _longPollingSemaphore = new SemaphoreSlim(0, 1); private bool _isFinished; public T Current { get; private set; } public async ValueTask DisposeAsync() { } public async Task Emit(T item, CancellationToken cancellationToken) { await _backpressureSemaphore.WaitAsync(cancellationToken); Current = item; _longPollingSemaphore.Release(); } public async Task Finish() { await _backpressureSemaphore.WaitAsync(); _isFinished = true; _longPollingSemaphore.Release(); } public async ValueTask<bool> MoveNextAsync() { _backpressureSemaphore.Release(); await _longPollingSemaphore.WaitAsync(); return !_isFinished; } }
Основное здесь методы — Emit, Finish и MoveNextAsync.
Emit в начале ждёт момента, когда очередной элемент из коллекции будет запрошен. Т.е. не эмитит элемент, пока он не потребуется. Это называется backpressure, отсюда и имя семофора. Затем выставляется текущий item и сообщается, что long polling запрос может получить результат.
MoveNextAsync вызывается, когда из коллекции тянут очередной элемент. Он отпускает _backpressureSemaphore и ждёт, когда Flow запушит очередной элемент. Затем он возвращает признак того, закончилась ли коллекция. Этот флаг выставляет метод Finish.
Finish работает по тому же принципу, что и Emit, только вместо очередного элемента выставляет признак конца коллекции.
Теперь надо этот класс заиспользовать.
public static class AsyncEnumerableExtensions { public static IAsyncEnumerable<T> CollectEnumerable<T>(this IFlow<T> flow, CancellationToken cancellationToken = default) { var collector = new FlowCollectorEnumerator<T>(); flow .Collect(collector, cancellationToken) .ContinueWith(_ => collector.Finish(), cancellationToken); return new FlowEnumerableAdapter<T>(collector); } } internal class FlowEnumerableAdapter<T> : IAsyncEnumerable<T> { private readonly IAsyncEnumerator<T> _enumerator; public FlowEnumerableAdapter(IAsyncEnumerator<T> enumerator) { _enumerator = enumerator; } public IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default) { return _enumerator; } }
Extension-метод CollectEnumerable для IFlow создаёт FlowCollectorEnumerator и подписывает на него flow, по завершению которого вызывается метод Finish(). И возвращает FlowEnumerableAdapter, который является простейшей реализацией IAsyncEnumerable, использующей FlowCollectorEnumerator в качестве IEnumerator.
Пробуем, что получилось.
var clicks = emulator .Clicks() .OnNext(item => Log($"{item.Button} {item.X} {item.Y}")) .CollectEnumerable(cts.Token) .Where(click => click.Button == ClicksEmulator.Button.Right) .Select(click => click.Y < 540 ? "TOP" : "LEFT"); await foreach (var click in clicks) { Log($"Clicked at: {click}"); }
Здесь мы получаем Flow clicks(), каждый клик логируем, затем превращаем IFlow в IAsyncEnumerable. Далее применяет известные Linq-операторы: оставляем только клики правой кнопкой и получаем, в какой части экрана они сделаны.
Далее рассмотрим пример посложнее. Будем заменять правый клик на двойной левый. Т.е. нам надо будет мапить каждый элемент не в какой-то другой, а в коллекцию. Либо во Flow, преобразуемый в коллекцию.
var clicks = emulator .Clicks() .OnNext(item => Log($"Original: {item.Button} {item.X} {item.Y}")) .CollectEnumerable(cts.Token) .Select(click => click.Button == ClicksEmulator.Button.Left ? Flow<ClicksEmulator.ClickEventArgs>(collector => collector.Emit(click)) : Flow<ClicksEmulator.ClickEventArgs>(async collector => { var changedClick = new ClicksEmulator.ClickEventArgs(click.X, click.Y, ClicksEmulator.Button.Left); await collector.Emit(changedClick); await Task.Delay(200); await collector.Emit(changedClick); }) ) .SelectMany(flow => flow.CollectEnumerable()); await foreach (var click in clicks) { Log($"Changed: {click.Button} {click.X} {click.Y}"); }
Для этого в Linq существует оператор SelectMany. Его аналог в реактивных стримах — FlatMap. Сначала мапим каждый клик в IFlow: для левого клика — Flow с одним этим кликом, для правого — Flow из двух левых кликов с задержкой между ними. А затем в SelectMany превращаем IFlow в IAyncEnumerable.
И это работает! Т.е. многие операторы не обязательно реализовывать для IFlow — можно использовать Linq.
Заключение
Rx.Net — был и остаётся главным инструментом при работе с асинхронными последовательностями событий на C#. Но это довольно большая библиотека по объёмы кода. Как мы увидели, похожую функциональность можно получить значительно проще — всего лишь два интерфейса плюс некоторая обвязка. Это возможно благодаря использованию возможностей языка — async/await. Когда зарождался Rx, эту фичу в C# ещё не завезли.
Спасибо за внимание!
