Как стать автором
Обновить

RxDart: магические трансформации потоков

Время на прочтение13 мин
Количество просмотров24K
Автор оригинала: Thomas Burkhart

Добро пожаловать — это третья часть моей серии статей об архитектуре Flutter.



На этот раз мы совершим небольшое погружение в магическое царство реактивных расширений (Rx). Я сосредоточусь на наиболее используемых функциях Rx и объясню их применение. Если вы не читали предыдущий пост, сейчас для этого самое время, прежде чем двигаться дальше.


RxDart — это реализация концепции Rx для языка Dart, за что следует сказать спасибо Frank Pepermans и Brian Egan. Если ранее вы использовали Rx в других языках, то наверняка заметите разницу в именовании ряда функций, но это вряд ли вызовет у вас затруднения.


Код для тестирования находится здесь.


До сих пор мы использовали потоки как способ передачи данных из одного места в другое в нашем приложении, но они могут сделать гораздо больше. Давайте взглянем на некоторые функции, которые Rx добавляет в Streams.


Создание Observables


Как указывалось ранее, Observables — это Rx-разновидности потоков с большими возможностями. Есть несколько интересных способов их создания:


Из потока


Любой Stream может быть конвертирован в Observable путем передачи его в конструктор:


var controller = new StreamController<String>();

var streamObservable = new Observable(controller.stream);

streamObservable.listen(print);

Повторяющиеся события


var timerObservable = Observable.periodic(Duration(seconds: 1), (x) => x.toString() );

timerObservable.listen(print);

Этим способом будет сконструирован Observable, выводящий значения с определенным периодом. Так можно заменить таймер.


Из одиночного значения


Порой API ожидает Stream/Observable там, где у вас просто значение. Для таких случаев Observable имеет фабрику.


var justObservable = Observable<int>.just(42);

justObservable.listen(print);

// будет выведено значение: 42

Из Future


  Future<String> asyncFunction() async {
    return Future.delayed(const Duration(seconds: 1), () => "AsyncRsult");
  }

  test('Create Observable from Future', () async {
    print('start');

    var fromFutureObservable = Observable.fromFuture(asyncFunction());

    fromFutureObservable.listen(print);

Создание Observable из Future будет ждать завершения Future и выдавать значение его результата или null, если значение не возвращается. Еще один способ создания потока из Future — это вызов toStream() для любого Future.


Вы можете задаться вопросом, какой смысл преобразовывать Future в Observable/Stream вместо того, чтобы просто ждать его. Будьте уверены, это станет понятным, когда мы исследуем доступные функции для манипулирования данными, пока они "в потоке".


Subjects


Subjects являются заменой StreamController в RxDart, и именно так они и реализуются где-то в недрах библиотеки.


Но их поведение слегка отличается от базовых StreamControllers:


  • вы можете применять listen() напрямую на Subject, без обращения к свойству Stream
  • доступно любое количество подписок, и все слушатели получают одни и те же данные одновременно
  • имеются три разновидности Subjects, которые объясняются ниже с примерами:

PublishSubjects


PublishSubjects ведут себя словно StreamControllers, за исключением возможности множества слушателей:


var subject = new PublishSubject<String>();

subject.listen((item) => print(item)); 

subject.add("Item1");

// Добавим еще подписчика
subject.listen((item) => print(item.toUpperCase())); 

subject.add("Item2");
subject.add("Item3");

// Защита от завершения до приема всех данных 
await Future.delayed(Duration(seconds: 5));

// Завершение всех подписок
subject.close;

Запустите этот код и вы получите:


Item1
ITEM2
Item2
ITEM3
Item3

Понятно, что второй слушатель, опоздавший на вечеринку (мы будем называть их поздними подписчиками), пропустил первый пункт. Чтобы избежать этого, можно использовать BehaviourSubject


BehaviourSubject


С BehaviourSubject каждый новый подписчик получит сперва последнее принятое значение:


var subject = new BehaviorSubject<String>();

subject.listen((item) => print(item)); 

subject.add("Item1");
subject.add("Item2");

subject.listen((item) => print(item.toUpperCase())); 

subject.add("Item3");

На выходе


Item1
ITEM2
ITEM3
Item2
Item3

Вы можете видеть, что Item1 потерян для второго подписчика, но он получает Item2. Вы можете быть удивлены тем, что второй подписчик получает Item3 до того, как первый подписчик получает Item2. Это потому, что последовательность обслуживания подписчиков не гарантирована, хотя все подписчики получают данные в правильном порядке. BehaviourSubject кэширует только последний полученный элемент для поздних подписчиков. Если вам нужно кэшировать больше элементов, вы можете использовать ReplaySubject. В большинстве случаев это не нужно.


Манипулирование данными на лету



Истинная сила Rx заключается в том, что она позволяет обрабатывать данные в процессе передачи по потоку. Каждый из Rx-методов возвращает новый поток с результирующими данными (как на иллюстрации), значит, вы можете связать их вместе в один конвейер обработки, и это делает Rx чрезвычайно мощным инструментом.


Map


Если есть какая-либо операция Stream, которую я больше всего не хочу пропустить, то это map(). Что делает map(), так это то, что она принимает каждый передаваемый элемент данных и применяет к нему некую функцию, после чего помещает результат в результирующий поток. Простой пример:



var subject = new PublishSubject<String>();

subject.map((item) => item.toUpperCase()).listen(print);

subject.add("Item1");
subject.add("Item2");
subject.add("Item3");

Результат:


ITEM1
ITEM2
ITEM3

Но map не обязана возвращать тот же тип данных, который она получает в качестве входных. Следующий пример будет принимать целые числа вместо строк. Дополнительно мы будем связывать два преобразования:


var subject = new PublishSubject<int>();

subject.map((intValue) => intValue.toString())
    .map((item) => item.toUpperCase())
    .listen(print);

subject.add(1);
subject.add(2);
subject.add(3);

или что-то вроде этого:



class DataClass{}

class WrapperClass {
  final DataClass wrapped;

  WrapperClass(this.wrapped); 
}

var subject = new PublishSubject<WrapperClass>();

subject.map<WrapperClass>((a) => new WrapperClass(a));

Одно из наиболее полезных применений .map — это когда вы получаете данные в формате из некоторого REST API или из базы данных и хотите, чтобы они преобразовывались в ваши собственные объекты:


class User {
  final String name;
  final String adress;
  final String phoneNumber;
  final int age;

  // в реальных проектах я бы рекомендовал какой-нибудь 
  // библиотечный сериализатор
  factory User.fromJson(String jsonString) {
    var jsonMap = json.decode(jsonString);

    return User(
      jsonMap['name'],
      jsonMap['adress'],
      jsonMap['phoneNumber'],
      jsonMap['age'],
    );
  }

  User(this.name, this.adress, this.phoneNumber, this.age);

  @override
  String toString() {
    return '$name - $adress - $phoneNumber - $age';
  }
}

void main() {
  test('Map', () {
    // каки-то данные
    var jsonStrings = [
      '{"name": "Jon Doe", "adress": "New York", "phoneNumber":"424242","age": 42 }',
      '{"name": "Stephen King", "adress": "Castle Rock", "phoneNumber":"123456","age": 71 }',
      '{"name": "Jon F. Kennedy", "adress": "Washington", "phoneNumber":"111111","age": 66 }',
    ];

    // симулируем некий json-поток, получаемый из внешнего API/DB.
    var dataStreamFromAPI = new PublishSubject<String>();

    dataStreamFromAPI
        .map<User>((jsonString) => User.fromJson(jsonString)) // json -> User
        .listen((user) => print(user.toString()));

    // Симулируем входные данные
    dataStreamFromAPI.add(jsonStrings[0]);
    dataStreamFromAPI.add(jsonStrings[1]);
    dataStreamFromAPI.add(jsonStrings[2]);
  });

Замечу, не только Streams, но и любой Iterable предлагает функцию map, которую вы можете использовать для преобразований в списках.


Where


Если вас интересуют только определенные значения, встречающиеся в потоке, вы можете использовать функцию .where() вместо использования оператора if в вашем слушателе, это более выразительно и проще для чтения:


var subject = new PublishSubject<int>();

subject.where((val) => val.isOdd)
    .listen( (val) => print('This only prints odd numbers: $val'));

subject.where((val) => val.isEven)
.listen( (val) => print('This only prints even numbers: $val'));

subject.add(1);
subject.add(2);
subject.add(3);

//выводит:
This only prints odd numbers: 1
This only prints even numbers: 2
This only prints odd numbers: 3

Debounce


Это одна из маленьких жемчужин Rx! Представьте, что у вас есть поле поиска, которое осуществляет вызов API REST, если его текст изменен. Выполнение вызова API для каждого нажатия клавиши обходится дорого. Таким образом, вы хотели бы сделать вызов только если пользователь делает паузу на мгновение. Именно для этого используется функция debounce(), которая проглотит все входящие события, если за ними не последует пауза.


var subject = new PublishSubject<String>();

subject.debounce(new Duration(milliseconds: 500)).listen((s) => print(s));

subject.add('A');
subject.add('AB');

await Future.delayed(Duration(milliseconds: 200));

subject.add("ABC");
// Пока выводе нет

await Future.delayed(Duration(milliseconds: 700));

// а сейчас мы получим наше последнее значение: 'ABC'

Поэтому, если вы преобразуете обработчик TextField.onChanged в Observable, то получите элегантное решение.


Expand


Если ваш исходный Stream испускает массивы объектов, а вы хотите обрабатывать каждый объект самостоятельно, вы можете использовать .expand, который сделает именно это:


image


Вы увидите применение этого метода ниже, в примере FireStore.


Merge


Если у вас есть несколько разных потоков, но вы хотите обрабатывать их объекты вместе, вы можете использовать .mergeWith (в других реализациях Rx просто merge), который принимает массив потоков и возвращает один объединенный поток.


image


.mergeWith не гарантирует соблюдение какого-либо порядка в потоках при их объединении. Данные испускаются в порядке входа.


Например, если у вас есть два компонента, которые сообщают об ошибках через поток, и вы хотите, чтобы они вместе отображались в диалоге, вы можете сделать это следующим образом (псевдокод):


@override
initState() {
  super.initState();

  component1.errors.mergeWith([component2.errors])
    .listen( (error) async => await showDialog(error.message));
}

или если вы хотите комбинированное отображение сообщений из нескольких социальных сетей, это может выглядеть так (псевдокод):


final observableTwitter = getTwitterStream().map((data) => new MyAppPost.fromTwitter(data));
final observableFacebook = getFacebookStream().map((data) => new MyAppPost.fromFaceBook(data));
final postStream = observableTwitter.mergeWith([observableFacebook]);

ZipWith


zipWith также объединяет один поток с другим. Но, в отличие от .mergeWith, он не отправляет данные, как только получает элемент от одного из своих исходных потоков. Он ожидает, пока не прибудут элементы из обоих исходных потоков, а затем объединяет их, используя предоставленную zipper-функцию:


image


Сигнатура zipWith выглядит страшновато, но сейчас мы рассмотрим ее:


// R : тип результирующего Stream/Observable
// S : тип второго Stream/Observable
// zipper: функция-сшивка
Observable<R> zipWith<S, R>(Stream<S> other, R zipper(T t, S s))

Весьма упрощенный пример:


new Observable.just(1) // .just() создает Observable, испускающий одно значение
    .zipWith(new Observable.just(2), (one, two) => one + two)
    .listen(print); // печатает 3

Более практичное применение — если вам нужно дождаться двух асинхронных функций, которые возвращают Future, и вы хотите обработать данные, как только будут возвращены оба результата. В этом слегка надуманном примере мы представляем два REST API: один возвращает User, другой — Product в виде строк JSON, и мы хотим дождаться обоих вызовов, прежде чем вернуть объект Invoice.


class Invoice {
  final User user;
  final Product product;

  Invoice(this.user, this.product);

  printInvoice() {
    print(user.toString());
    print(product.toString());
  }
}

// Симуляция HTTP вызова, возвращающего Product, как JSON
Future<String> getProduct() async {
  print("Started getting product");
  await Future.delayed(Duration(seconds: 2));
  print("Finished getting product");
  return '{"name": "Flux compensator", "price": 99999.99}';
}

// Симуляция HTTP вызова, возвращающего User, как JSON
Future<String> getUser() async {
  print("Started getting User");
  await Future.delayed(Duration(seconds: 4));
  print("Finished getting User");
  return '{"name": "Jon Doe", "adress": "New York", "phoneNumber":"424242","age": 42 }';
}

void main() {
  test('zipWith', () async {
    var userObservable =
        Observable.fromFuture(getUser()).map<User>((jsonString) => User.fromJson(jsonString));

    var productObservable = Observable.fromFuture(getProduct())
        .map<Product>((jsonString) => Product.fromJson(jsonString));

    Observable<Invoice> invoiceObservable = userObservable.zipWith<Product, Invoice>(
        productObservable, (user, product) => Invoice(user, product));

    print("Start listening for invoices");
    invoiceObservable.listen((invoice) => invoice.printInvoice());

    // предотвращает раннее завершение теста в целях тестирования
    await Future.delayed(Duration(seconds: 5));
  });
}

Глядя на вывод, вы можете увидеть, как это выполняется асинхронно


Started getting User
Started getting product
Start listening for invoices
Finished getting product
Finished getting User
Jon Doe - New York - 424242 - 42
Flux compensator - 99999.99

CombineLatest


combineLatest тоже объединяет значения потоков, но немного по-другому, чем merge и zip. Он прослушивает большее количество потоков и выдает объединенное значение всякий раз, когда приходит новое значение из одного из потоков. Интересно то, что он генерирует не только измененное значение, но и последние полученные значения всех других исходных потоков. Посмотрите внимательно на эту анимацию:


image


До того, как combineLates выдаст свое первое значение, все исходные потоки должны получить на вход хотя бы один элемент.


В отличие от методов, которые использовались ранее, combineLatest — является статическим. Кроме того, поскольку Dart не допускает перегрузки операторов, существуют версии combLastest в зависимости от количества исходных потоков: combineLatest2...combineLatest9


Хорошее применение combineLatest, например, если у вас есть два Observable<bool>, которые сигнализируют о том, что некоторые части вашего приложения заняты, и вы хотите отобразить спиннер "Занято", если один из них занят. Это может выглядеть следующим образом (псевдокод):


class Model {
  Observable<bool> get isBusy => 
    Observable.combineLatest2(isBusyOne,isBusyTwo, (b1, b2) => b1 || b2);

  PublishSubject<bool> isBusyOne;
  PublishSubject<bool> isBusyTwo;
}

В вашем UI вы можете использовать isBusy с StreamBuilder для отображения Spinner, если полученное значение истинно.


combineLatest очень подходящая функция в комбинации с потоками FireStore snapshots.


Представьте, что вы хотите создать приложение, которое отображает новостную ленту вместе с прогнозом погоды. Сообщения тикеров и данные о погоде хранятся в двух разных коллекциях FireStore. Оба обновляются независимо. Вы хотите отобразить обновления данных с помощью StreamBuilder. С combineLatest это легко:


class WeatherForecast {
  final String forecastText;
  final GeoPoint location;

  factory WeatherForecast.fromMap(Map<String, dynamic> map) {
    return WeatherForecast(map['forecastText'], map['location']);
  }

  WeatherForecast(this.forecastText, this.location);
}

class NewsMessage {
  final String newsText;
  final GeoPoint location;

  factory NewsMessage.fromMap(Map<String, dynamic> map) {
    return NewsMessage(map['newsText'], map['location']);
  }

  NewsMessage(this.newsText, this.location);
}

class CombinedMessage {
  final WeatherForecast forecast;
  final NewsMessage newsMessage;

  CombinedMessage(this.forecast, this.newsMessage);
}

class Model {
  CollectionReference weatherCollection;
  CollectionReference newsCollection;

  Model() {
    weatherCollection = Firestore.instance.collection('weather');
    newsCollection = Firestore.instance.collection('news');
  }

  Observable<CombinedMessage> getCombinedMessages() {
    Observable<WeatherForecast> weatherForecasts = weatherCollection
        .snapshots()
        .expand((snapShot) => snapShot.documents)
        .map<WeatherForecast>((document) => WeatherForecast.fromMap(document.data));

    Observable<NewsMessage> news = newsCollection
        .snapshots()
        .expand((snapShot) => snapShot.documents)
        .map<NewsMessage>((document) => NewsMessage.fromMap(document.data));

    return Observable.combineLatest2(
        weatherForecasts, news, (weather, news) => CombinedMessage(weather, news));
  }
}

В вашем UI это выглядело бы как-то так: StreamBuilder<CombinedMessage>(stream: model.getCombinedMessages(),...).


Distinct


В описанном выше сценарии может случиться, что isBusyOne и isBusyTwo выдают одно и то же значение, что приведет к обновлению пользовательского интерфейса с теми же данными. Чтобы предотвратить это, мы можем использовать .distinct(). Он гарантирует, что данные передаются по потоку только в том случае, если значение нового элемента отличается от последнего. Таким образом, мы изменили бы код на:


  Observable<bool> isBusy => isBusyOne.mergeWith([isBusyTwo]).distinct();

и это также демонстрирует, что мы можем комбинировать наши функции в различные цепочки по желанию.


AsyncMap


Помимо map() есть также функция asyncMap, которая позволяет использовать асинхронную функцию в качестве map-функции. Давайте представим немного другую настройку для нашего примера FireStore. Теперь необходимый WeatherForecast зависит от местоположения NewsMessage и должен обновляться только при получении нового NewsMessage:


Observable<CombinedMessage> getDependendMessages() {

  Observable<NewsMessage> news = newsCollection.snapshots().expand((snapShot) {
    return snapShot.documents;
  }).map<NewsMessage>((document) {
    return NewsMessage.fromMap(document.data);
  });

  return news.asyncMap((newsEntry) async {
    var weatherDocuments =
        await weatherCollection.where('location', isEqualTo: newsEntry.location).getDocuments();
    return new CombinedMessage(
        WeatherForecast.fromMap(weatherDocuments.documents.first.data), newsEntry);
  });
}

Observable, возвращаемый getDependendMessages, будет генерировать новое CombinedMessage каждый раз, когда изменяется newsCollection.


Отладка Observables


Глядя на элегантные цепочки вызовов Rx кажется, что почти невозможно отладить выражение вроде этого:


Observable<NewsMessage> news = newsCollection
    .snapshots()
    .expand((snapShot) => snapShot.documents)
    .map<NewsMessage>((document) => NewsMessage.fromMap(document.data));

Но имейте в виду, что => — это только краткая форма для анонимной функции. Используя Convert to block body, вы получите:


Observable<NewsMessage> news = newsCollection
        .snapshots()
        .expand((snapShot) {
          return snapShot.documents;
        })
        .map<NewsMessage>((document) {
          return NewsMessage.fromMap(document.data);
        });

И теперь мы можем установить точку останова или добавить операторы печати на каждом этапе нашего "конвейера".


Остерегайтесь побочных эффектов


Если вы хотите извлечь выгоду из Rx, чтобы сделать ваш код более надежным, всегда держите в голове, что Rx — это преобразование данных при их перемещении "по конвейерной ленте". Поэтому никогда не вызывайте функции, которые изменяют какие-либо переменные/состояния вне конвейера обработки, пока вы не достигнете функции .listen.
Вместо того, чтобы делать так:


Observable.fromFuture(getProduct())
        .map<Product>((jsonString) { 
     var product = Product.fromJson(jsonString);
    database.save(product);
    setState((){ _product =  product });
    return product;
}).listen();

делайте так:


Observable.fromFuture(getProduct())
        .map<Product>((jsonString) => Product.fromJson(jsonString))
        .listen( (product) {
          database.save(product);  
          setState((){ _product =  product });
        });

Обязанность map() — преобразование данных в потоке, И НИЧЕГО БОЛЬШЕ! Если переданная функция отображения делает что-то еще, это будет рассматриваться как побочный эффект, плодящий потенциальные ошибки, который трудно обнаружить при чтении кода.


Некоторые мысли об освобождении ресурсов


Чтобы избежать утечек памяти, всегда вызывайте cancel() для подписок, dispose() для StreamControllers, close() для Subjects, как только они вам больше не нужны.


Заключение


Поздравляю, если вы остались со мной до этого момента. Теперь вы не только можете использовать Rx, чтобы облегчить свою жизнь, но и подготовиться к следующим постам, в которых мы углубимся в детали RxVMS.

Теги:
Хабы:
+6
Комментарии2

Публикации

Истории

Работа

iOS разработчик
23 вакансии
Swift разработчик
32 вакансии

Ближайшие события

Weekend Offer в AliExpress
Дата20 – 21 апреля
Время10:00 – 20:00
Место
Онлайн
Конференция «Я.Железо»
Дата18 мая
Время14:00 – 23:59
Место
МоскваОнлайн