Dart в совокупности с пакетом Async обладает неплохим функционалом в части работы со стримами. Однако ограничения всё ещё есть. Чтобы сделать стримы более удобными, используется пакет RxDart.
ReactiveX (Rx) появился в 2010 году для .NET, а после был портирован почти на все современные языки программирования и стал стандартом. Версию для Dart опубликовали в 2015 году, и на данный момент она входит в число Flutter Favorite пакетов — её максимально поддерживает комьюнити.
Меня зовут Виталий, я Flutter Team Lead в Surf, и эта небольшая статья станет первой в цикле публикаций на тему RxDart.
Тезисы
Subject— объект, на который можно подписаться и слушать переданные в него значения, аналогStreamControllerв Dart.PublishSubject—Subject, который является аналогом стандартного широковещательного контроллераStreamController.broadcast().ReplaySubject—Subject, который хранит все переданные ранее значения и при подписке возвращает сразу все прошлые значения.BehaviorSubject—Subject, который хранит в себе последнеепереданное значение, и при подписке на этотSubject, сразу возвращает слушателю это значение. Может быть инициализирован только с начальным значением.
Что предлагает Dart из «коробки»
Из «коробки» Dart предоставляет для работы со Stream класс StreamController, который позволяет управлять стримами.
Существует два вида подписки:
single subscription — может быть только один слушатель, который гарантированно получит все сообщения, поступившие после подписки на стрим;
broadcast — слушателей может быть много, но они также будут получать сообщения, которые попадают в стрим после подписки.
Для StreamController.broadcast() можно провести аналогию с радио — активным слушателям информация поставляется в «прямом эфире», и если подключиться к нему позже остальных, то послушать упущенное никак нельзя.
import 'dart:async'; void _firstListener(int value) => print('first: $value'); void _secondListener(int value) => print('second: $value'); void main() { // создаем контроллер final streamController = StreamController<int>.broadcast(); // добавляем первый слушатель, перед добавлением значения streamController.stream.listen(_firstListener); // добавляем значение streamController.add(1); // _firstListener выведет 'first: 1' // добавляем второй слушатель streamController.stream.listen(_secondListener); // ничего не выведется }
Что предлагает RxDart
Пакет добавляет три специализированных контроллера для работы со Stream — разновидности Subject:
PublishSubjectReplaySubjectBehaviorSubject
Рассмотрим их по отдельности.
PublishSubject

PublishSubject — широковещательный («broadcast» или «hot») контроллер, аналог стандартного широковещательного контроллера StreamController.broadcast(), о котором писали выше. Останавливаться здесь не будем.
ReplaySubject

ReplaySubject — также широковещательный контроллер, который стоит использовать, если слушателю нужно передать все прошлые переданные события. Для новых слушателей он «проигрывает» все прошлые события начиная с первого.
Аналог — прямой эфир трансляции, который можно перематывать на ранние моменты.
import 'package:rxdart/subjects.dart'; // ... _firstListener и _secondListener void main() { // создаём replay subject final replaySubject = ReplaySubject<int>(); replaySubject..add(1)..add(2)..add(3)..add(4); // добавляем первый слушатель, перед добавлением значения replaySubject.stream.listen(_firstListener); // _firstListener выведет: // first: 1 // first: 2 // first: 3 // first: 4 // добавляем второй слушатель replaySubject.stream.listen(_secondListener); // _secondListener выведет: // second: 1 // second: 2 // second: 3 // second: 4 }
BehaviorSubject

BehaviorSubject — широковещательный контроллер, который хранит в себе последнее значение или ошибку, и при подписке на этот стрим сразу возвращает слушателю последнее событие, переданное в контроллер.
Аналог — прямая трансляция без возможности перемотки. Когда подключаешься, начинаешь просмотр с самого последнего кадра.
import 'package:rxdart/subjects.dart'; // ... _firstListener и _secondListener void main() { // создаём behavior subject final behaviorSubject = BehaviorSubject<int>(); // добавляем первый слушатель, перед добавлением значения behaviorSubject.stream.listen(_firstListener); // добавляем значение behaviorSubject.add(1); // _firstListener выведет 'first: 1' // добавляем второй слушатель behaviorSubject.stream.listen(_secondListener); // _secondListener выведет 'second: 1' }
Опционально можно передать слушателю исходное значение при подписке — с помощью конструктора BehaviorSubject<T>.seeded, для Rx это более «нативный» способ объявления BehaviorSubject.
import 'package:rxdart/subjects.dart'; // ... _firstListener и _secondListener void main() { // создаём behavior subject final behaviorSubject = BehaviorSubject<int>.seeded(1); // добавляем первый слушатель, перед добавлением значения behaviorSubject.stream.listen(_firstListener); // _firstListener выведет 'first: 1' // добавляем второй слушатель behaviorSubject.stream.listen(_secondListener); // _secondListener выведет 'second: 1' }
BehaviorSubject<T>.seeded можно использовать, когда в стрим требуется передать исходное значение и слушателям необходимо «среагировать» на него.
Например, состояние корзины с товарами хранить в BehaviorSubject, а на экране корзины связать вывод её содержимого напрямую с состоянием в условном классе CartService.
import 'package:rxdart/subjects.dart'; class Product { final String title; const Product(this.title); } class CartState { final List<Product> products; const CartState({required this.products}); factory CartState.empty() => const CartState(products: []); } class CartService { final _cartState = BehaviorSubject<CartState>.seeded(CartState.empty()); Stream<CartState> get cartStateStreamed => _cartState.stream; void addProduct(Product product) { _cartState.add( CartState( products: [ ..._cartState.value.products, product, ], ), ); } } // где-то в приложении объявляем сервис для работы с корзиной final service = CartService(); // подписываемся на состояние корзины, для обновления счётчика товаров, // например в BottomAppBar, он будет обновляться при изменении состояния корзины service.cartStateStreamed.listen((cartState) { print('Число товаров: ${cartState.products.length}'); }); // добавляем товары service..addProduct(const Product("Капуста"))..addProduct(const Product("Картошка")); // чуть позже на экране содержимого корзины подписываемся на состояние и выводим названия товаров service.cartStateStreamed.listen((cartState) { print('Названия товаров: ${cartState.products.map((p) => p.title).join(',')}'); });
В некоторых случаях BehaviorSubject может заменить ValueNotifier, который не оповещает слушателей о своём последнем значении при подписке. BehaviorSubject позволяет аналогично обратиться к последнему значению стрима через геттер BehaviorSubject.value.
Заключение
Если вы хотите применять пакет RxDart в своих проектах и делать их более эффективными, не забывайте про документацию.
Также стоит ознакомиться с документацией для RxJS — пакета для JavaScript, актуального и для RxDart, делая скидку на отличный от стека Flutter язык. В этом пакете классная визуализация принципов Rx, так как Rx пакеты соблюдают общий контракт для всех методов и классов.
На этом не прощаемся — продолжим писать по теме и искать возможности повышения эффективности вашей работы.
Больше полезного про Flutter — в Telegram-канале Surf Flutter Team. Кейсы, лучшие практики, новости и вакансии в команду Flutter Surf в одном месте. Присоединяйтесь.
