Привет, Хабр!

Reactive Extensions, известные также как Rx.NET, представляют собой библиотеку для .NET, предназначенную для композиции асинхронных и событийно-ориентированных программ с помощью наблюдаемых последовательностей и операторов запросов в стиле LINQ.

Основная фича Rx заключается в управлении асинхронными данными так, как если бы они были синхронными коллекциями.

В этой статье мы рассмотрми, как работать с Rx в C#.

Немного теории

Observables – это основа Rx, представляющая потоки данных, которые можно наблюдать. Observables могут генерировать данные с течением времени, будь то из массивов, событий или тех же асинхронных запросов.

Observers – объекты, которые подписываются на Observables и реагируют на данные, поступающие из них. Observer реализует три метода, которые вызываются соответственно при получении данных, ошибки или завершении потока. С помощью метода Subscribe, Observer подключается к Observable и начинает получать уведомления.

Операторы выполняю основную роль манипуляции и управлении потоками данных. Они позволяют трансформировать, фильтровать, комбинировать и даже создавать новые потоки на основе существующих.

Синтаксис Rx

Для создания Observable последовательностей в Rx используется метод Observable.Create. Этот метод позволяет определить логику создания потока данных, которая будет исполняться при подписке на Observable:

IObservable<int> observable = Observable.Create(observer => {
    observer.OnNext(1);
    observer.OnNext(2);
    observer.OnNext(3);
    observer.OnCompleted();
    return Disposable.Empty;
});

Операторы:

Select:

var observable = Observable.Range(1, 5);
var selected = observable.Select(x => x * 10);
selected.Subscribe(Console.WriteLine); // вывод: 10, 20, 30, 40, 50

Where:

var observable = Observable.Range(1, 10);
var filtered = observable.Where(x => x % 2 == 0);
filtered.Subscribe(Console.WriteLine); // вывод: 2, 4, 6, 8, 10

Aggregate:

var observable = Observable.Range(1, 5);
var aggregated = observable.Aggregate((acc, x) => acc + x);
aggregated.Subscribe(Console.WriteLine); // вывод: 15

Работа с множественными источниками данных:

Merge:

var first = Observable.Interval(TimeSpan.FromSeconds(1)).Take(3);
var second = Observable.Interval(TimeSpan.FromSeconds(2)).Take(2);
var merged = Observable.Merge(first, second);
merged.Subscribe(Console.WriteLine); // вывод: 0, 0, 1, 1, 2

Concat:

var first = Observable.Range(1, 3);
var second = Observable.Range(4, 3);
var concatenated = Observable.Concat(first, second);
concatenated.Subscribe(Console.WriteLine); // вывод: 1, 2, 3, 4, 5, 6

Zip:

var numbers = Observable.Range(1, 3);
var letters = Observable.From(new[] {'a', 'b', 'c'});
var zipped = Observable.Zip(numbers, letters, (n, l) => $"{n}{l}");
zipped.Subscribe(Console.WriteLine); // вывод: 1a, 2b, 3c

Управление памятью и избежание утечек с помощью Dispose

Часто при работе с Rx используют Dispose для предотвращения утечек памяти. Когда вы подписываетесь на Observable, Rx.NET возвращает объект IDisposable, который нужно корректно утилизировать после окончания использования подписки. Это используют в проектах, где подписки могут динамически создаваться и удаляться.

var subscription = observable.Subscribe(item => Console.WriteLine(item));
// после завершения использования подписки, не забываем освободить ресурсы
subscription.Dispose();

Примеры использования

Обработка кнопочных событий в пользовательском интерфейсе

Допустим, есть форма с кнопкой, и хочется реагировать на клики по этой кнопке, но только если клики происходят с интервалом не менее одной секунды, чтобы избежать двойного клика:

// подключение Rx и Windows Forms
using System;
using System.Windows.Forms;
using System.Reactive.Linq;

public class ReactiveForm : Form
{
    private Button myButton;

    public ReactiveForm()
    {
        myButton = new Button { Text = "Click me!" };
        Controls.Add(myButton);

        // создание потока событий клика
        var clicks = Observable.FromEventPattern(
            h => myButton.Click += h,
            h => myButton.Click -= h);

        // фильтрация событий с интервалом в 1 секунду
        clicks
            .Throttle(TimeSpan.FromSeconds(1))
            .Subscribe(_ => Console.WriteLine("Button clicked!"));
    }
}

Комбинирование данных из нескольких источников

Представим, что есть два источника данных, один из которых отправляет цены товаров, а другой - информацию о скидках. Нужно комбинировать эти потоки для расчета окончательной цены:

IObservable<decimal> prices = Observable.Return(100m); // пример потока цен
IObservable<decimal> discounts = Observable.Return(0.1m); // пример потока скидок

var finalPrice = prices.Zip(discounts, (price, discount) => price * (1 - discount));
finalPrice.Subscribe(price => Console.WriteLine($"Final price: {price}"));

Отслеживание состояния кнопки с задержкой

Если хочется отслеживать состояние кнопки, но с задержкой, чтобы предотвратить реакцию на случайные или кратковременные нажатия:

var buttonStates = Observable.FromEventPattern<StateChangedEventArgs>(button, "StateChanged");

buttonStates
    .Select(evt => evt.EventArgs.NewState)
    .DistinctUntilChanged() // игнорирование повторяющихся состояний
    .Throttle(TimeSpan.FromMilliseconds(500)) // задержка для стабилизации состояния
    .Subscribe(state => Console.WriteLine($"Button state stabilized to: {state}"));

Реактивное управление потоками данных API

Используем Rx для управления асинхронными вызовами API для получения погоды. Пример того, как можно организовать повторные запросы и обработку ошибок:

IObservable<string> weatherData = Observable.Interval(TimeSpan.FromMinutes(1))
    .SelectMany(_ => Observable.FromAsync(() => GetWeatherAsync()))  // асинхронный вызов API
    .Retry(3)  // повтор при ошибке до 3 раз
    .Catch<string, Exception>(ex => Observable.Return("Error fetching weather data"));

weatherData.Subscribe(data => Console.WriteLine($"Current weather: {data}"),
                      error => Console.WriteLine($"An error occurred: {error}"));

Reactive Extensions позволяет писать более чистый и реактивный код.

Статья подготовлена в преддверии старта специализации C# Developer. На странице специализации вы можете подробнее узнать о программе, а также зарегистрироваться на бесплатные вебинары.