Основы обработки асинхронных событий с помощью Rx в C#
Привет, Хабр!
Reactive Extensions, известные также как Rx.NET, представляют собой библиотеку для .NET, предназначенную для композиции асинхронных и событийно-ориентированных программ с помощью наблюдаемых последовательностей и операторов запросов в стиле LINQ.
Основная фича Rx заключается в управлении асинхронными данными так, как если бы они были синхронными коллекциями.
В этой статье мы рассмотрми, как работать с Rx в C#.
Немного теории
Observables – это основа Rx, представляющая потоки данных, которые можно наблюдать. Observables могут генерировать данные с течением времени, будь то из массивов, событий или тех же асинхронных запросов.
Observers – объекты, которые подписываются на Observables и реагируют на данные, поступающие из них. Observer реализует три метода, которые вызываются соответственно при получении данных, ошибки или завершении потока. С помощью метода Subscribe
, Observer подключается к Observable и начинает получать уведомления.
Операторы выполняю основную роль манипуляции и управлении потоками данных. Они позволяют трансформировать, фильтровать, комбинировать и даже создавать новые потоки на основе существующих.
Синтаксис Rx
Для создания Observable последовательностей в Rx используется метод Observable.Create
. Этот метод позволяет определить логику создания потока данных, которая будет исполняться при подписке на Observable:
IObservable<int> observable = Observable.Create(observer => {
observer.OnNext(1);
observer.OnNext(2);
observer.OnNext(3);
observer.OnCompleted();
return Disposable.Empty;
});
Операторы:
Select:
var observable = Observable.Range(1, 5);
var selected = observable.Select(x => x * 10);
selected.Subscribe(Console.WriteLine); // вывод: 10, 20, 30, 40, 50
Where:
var observable = Observable.Range(1, 10);
var filtered = observable.Where(x => x % 2 == 0);
filtered.Subscribe(Console.WriteLine); // вывод: 2, 4, 6, 8, 10
Aggregate:
var observable = Observable.Range(1, 5);
var aggregated = observable.Aggregate((acc, x) => acc + x);
aggregated.Subscribe(Console.WriteLine); // вывод: 15
Работа с множественными источниками данных:
Merge:
var first = Observable.Interval(TimeSpan.FromSeconds(1)).Take(3);
var second = Observable.Interval(TimeSpan.FromSeconds(2)).Take(2);
var merged = Observable.Merge(first, second);
merged.Subscribe(Console.WriteLine); // вывод: 0, 0, 1, 1, 2
Concat:
var first = Observable.Range(1, 3);
var second = Observable.Range(4, 3);
var concatenated = Observable.Concat(first, second);
concatenated.Subscribe(Console.WriteLine); // вывод: 1, 2, 3, 4, 5, 6
Zip:
var numbers = Observable.Range(1, 3);
var letters = Observable.From(new[] {'a', 'b', 'c'});
var zipped = Observable.Zip(numbers, letters, (n, l) => $"{n}{l}");
zipped.Subscribe(Console.WriteLine); // вывод: 1a, 2b, 3c
Управление памятью и избежание утечек с помощью Dispose
Часто при работе с Rx используют Dispose
для предотвращения утечек памяти. Когда вы подписываетесь на Observable, Rx.NET возвращает объект IDisposable
, который нужно корректно утилизировать после окончания использования подписки. Это используют в проектах, где подписки могут динамически создаваться и удаляться.
var subscription = observable.Subscribe(item => Console.WriteLine(item));
// после завершения использования подписки, не забываем освободить ресурсы
subscription.Dispose();
Примеры использования
Обработка кнопочных событий в пользовательском интерфейсе
Допустим, есть форма с кнопкой, и хочется реагировать на клики по этой кнопке, но только если клики происходят с интервалом не менее одной секунды, чтобы избежать двойного клика:
// подключение Rx и Windows Forms
using System;
using System.Windows.Forms;
using System.Reactive.Linq;
public class ReactiveForm : Form
{
private Button myButton;
public ReactiveForm()
{
myButton = new Button { Text = "Click me!" };
Controls.Add(myButton);
// создание потока событий клика
var clicks = Observable.FromEventPattern(
h => myButton.Click += h,
h => myButton.Click -= h);
// фильтрация событий с интервалом в 1 секунду
clicks
.Throttle(TimeSpan.FromSeconds(1))
.Subscribe(_ => Console.WriteLine("Button clicked!"));
}
}
Комбинирование данных из нескольких источников
Представим, что есть два источника данных, один из которых отправляет цены товаров, а другой - информацию о скидках. Нужно комбинировать эти потоки для расчета окончательной цены:
IObservable<decimal> prices = Observable.Return(100m); // пример потока цен
IObservable<decimal> discounts = Observable.Return(0.1m); // пример потока скидок
var finalPrice = prices.Zip(discounts, (price, discount) => price * (1 - discount));
finalPrice.Subscribe(price => Console.WriteLine($"Final price: {price}"));
Отслеживание состояния кнопки с задержкой
Если хочется отслеживать состояние кнопки, но с задержкой, чтобы предотвратить реакцию на случайные или кратковременные нажатия:
var buttonStates = Observable.FromEventPattern<StateChangedEventArgs>(button, "StateChanged");
buttonStates
.Select(evt => evt.EventArgs.NewState)
.DistinctUntilChanged() // игнорирование повторяющихся состояний
.Throttle(TimeSpan.FromMilliseconds(500)) // задержка для стабилизации состояния
.Subscribe(state => Console.WriteLine($"Button state stabilized to: {state}"));
Реактивное управление потоками данных API
Используем Rx для управления асинхронными вызовами API для получения погоды. Пример того, как можно организовать повторные запросы и обработку ошибок:
IObservable<string> weatherData = Observable.Interval(TimeSpan.FromMinutes(1))
.SelectMany(_ => Observable.FromAsync(() => GetWeatherAsync())) // асинхронный вызов API
.Retry(3) // повтор при ошибке до 3 раз
.Catch<string, Exception>(ex => Observable.Return("Error fetching weather data"));
weatherData.Subscribe(data => Console.WriteLine($"Current weather: {data}"),
error => Console.WriteLine($"An error occurred: {error}"));
Reactive Extensions позволяет писать более чистый и реактивный код.
Статья подготовлена в преддверии старта специализации C# Developer. На странице специализации вы можете подробнее узнать о программе, а также зарегистрироваться на бесплатные вебинары.