Как стать автором
Обновить

Основы Dart Streams

Время на прочтение 8 мин
Количество просмотров 45K
Автор оригинала: Thomas Burkhart

Это вторая часть моей серии по поводу Flutter Architecture:



Потоки являются основным строительным блоком RxVMS, их понимание является абсолютно необходимым условием для работы с этой библиотекой, так что мы подробнее остановимся на них в этом посте.


Оказалось, что включение Rx в этот пост сделало бы его слишком длинным, поэтому я разделил его на две части.


Пусть течет


Я читаю множество комментов, что дескать потоки, и особенно Rx, слишком сложны для понимания и, как следствие, для использования.


Мне бы хотелось, чтобы вы знали, что я не считаю себя гуру Rx. Освоить всю мощь его нелегко, и я признаю, что продолжаю учиться. Но позвольте мне с самого начала исправить одно заблуждение: вам не нужно быть волшебником Rx, чтобы начать получать массу преимуществ от использования потоков и этой технологии. Я приложу максимум усилий, чтобы объяснить вам потоки наиболее доступным образом.


Что такое потоки?


На мой взгляд наилучшей аналогией потоков является лента конвейера. Вы можете положить что-либо на один его конец и это "что-либо" автоматически перенесется на другой. В отличие от физического конвейера потоки манипулируют объектами данных, перенося их автоматически от начала — но куда? Как и в реальном конвейере, если нет ничего такого, что бы уловило данные на другом конце, они просто "упадут" и исчезнут (это, конечно не вполне верно для Dart Streams, но лучше всего обращаться с потоками, как будто это именно так).



Дабы избежать потери данных, вы можете установить "ловушку" на выходе потока. Так вы сможете улавливать данные и производить нужные манипуляции с ними всякий раз, когда объекты данных будут достигать конца потока.



Помните:


  • Если ловушка не установлена, данные просто пропадут навсегда и не будет способа получить их снова (опять же, не совсем так с Dart Streams, но вам лучше притворяться, что это так)
  • После отправки данных в поток вам не нужно приостанавливать выполнение программы и ждать, пока они достигнут конца, все это происходит в фоне.
  • Ловушка может принять данные в любое время, совсем необязательно сразу после отправки (но волноваться не следует, потоки на самом деле очень быстрые). Представьте, что вы не знаете, как быстро или как долго движется "ремень конвейера". Это означает, что помещение чего-либо в поток полностью отделено от реакции на элемент на другом конце. Ваша ловушка сработает и поймает предмет, когда он попадет туда. (Некоторые из вас уже могут понять, что это хорошо вписывается в реактивный способ, которым Flutter обновляет свои виджеты)
  • Вы можете установить ловушку задолго до того, как начнется работа и появится первый элемент
  • Поток работает по принципу FIFO. Данные всегда приходят в порядке их размещения в поток.

А что такое Rx?


Rx, сокращенно от Reactive Extensions (реактивные расширения), — это потоки "на стероидах". Это концепция, очень похожая на Streams, которая была изобретена для .Net framework командой Microsoft. Так как .Net уже имел тип Stream, который используется для файлового ввода-вывода, они назвали Rx-потоки Observables и создали множество функций для манипулирования данными, проходящими через них. Dart имеет Streams, встроенные в его языковую спецификацию, которые уже предлагают большую часть этой функциональности, но не все. Вот почему был разработан пакет RxDart; он основан на Dart Streams, но расширяет их функциональность. Я рассмотрю Rx и RxDart в следующей части этой серии.


Немного терминов


Dart Streams и Rx используют некоторую терминологию, которая может выглядеть страшновато, поэтому вот перевод. Сначала идет термин Dart, потом Rx.


  • Stream/Observable. Это "конвейер", описанный ранее. Stream может быть преобразован в Observable и везде, где ожидается Stream, можно присваивать Observable. Так что не путайтесь, если я буду смешивать эти термины в процессе объяснения
  • listen/subscribe — установка ловушки-слушателя
  • StreamController/Subject. "Левая" сторона конвейерной ленты, где вы помещаете данные в Stream. Они немного отличаются по своим свойствам и характеристикам, но служат одной и той же цели
  • Emitting an item/data. Момент, когда данные появляются на выходе из "конвейера"

Создание потока


Если вы собираетесь продолжать изучать тему, пожалуйста, клонируйте этот проект с примерами. Я буду использовать систему тестов Dart/Flutter.


Для создания потока вы создаете StreamController


var controller = new StreamController<String>();

controller.add("Item1"); // Отправляем первый элемент в поток

Шаблонный тип (в данном случае String) передаваемый при создании StreamController определяет тип объектов, которые мы можем отправлять в поток. Это может быть ЛЮБОЙ тип! Вы можете создать при желании StreamController<List<MyObject>>() и поток будет передавать лист целиком вместо одного объекта.


Установка ловушки


Если вы запускали указанный тест, то ничего не смогли увидеть, ведь ничто не отловило нашу строку на выходе потока. Теперь установим ловушку:


var controller = new StreamController<String>();

controller.stream.listen((item) => print(item)); // ловушка

controller.add("Item1");
controller.add("Item2");
controller.add("Item3");

Теперь ловушка установлена, используя метод .listen(). Запись выглядит controller.stream.listen, но если вы прокрутите это задом наперед, словно какой-нибудь альбом из 60-х, то проявится истинный смысл написанного: "слушать поток данного контроллера"


Вам необходимо передать в метод .listen() некию функцию, чтобы как-то манипулировать с пришедшими данными. Функция должна принимать параметр типа, указанного при создании StreamController, в данном случае это String.


Если вы запустите вышеуказанный код, то увидите


Item1
Item2
Item3

По моему, наибольшей проблемой для новичков в Streams является то, что можно определить реакцию для испускаемого элемента задолго до того, как первый элемент будет помещен в поток, запуская вызов этой реакции.


Завершение прослушивания


Код выше упустил маленькую, но важную часть. listen() возвращает StreamSubscription — объект подписки на поток. Вызов его метода .cancel() завершает подписку, освобождая ресурсы, и предупреждая вызов вашей прослушивающей функции после того, как это стало ненужным.


var controller = new StreamController<String>();

StreamSubscription subscription = controller.stream.listen((item) => print(item)); // This is the Trap

controller.add("Item1");
controller.add("Item2");
controller.add("Item3");

// Это сделано для того, чтобы среда тестирования не убила этот процесс  
// до того, как все объекты из Stream были обработаны
await Future.delayed(Duration(milliseconds: 500));

subscription.cancel;

Подробности о слушателях


Функция для listen() может быть как лямбдой, так и простой функцией.


void myPrint(String message) {
  print(message);
}

StreamSubscription subscription = controller.stream.listen((item) => print(item)); // использование лямбда-функции

StreamSubscription subscription2 = controller.stream.listen(myPrint); // использование обычной функции

StreamSubscription subscription3 = controller.stream.listen((item) {
    print(item);
    print(item.toUpperCase);
}); // лямбда-блок

Важное замечание: большинство Dart-потоков позволяют только разовую подписку, то есть на них нельзя подписываться повторно после завершения подписки — это вызовет исключение. Это их отличие от других реализаций Rx.


Полная сигнатура listen() выглядит так:


 /* excerpt from the API doc
   * The [onError] callback must be of type `void onError(error)` or
   * `void onError(error, StackTrace stackTrace)`. If [onError] accepts
   * two arguments it is called with the error object and the stack trace
   * (which could be `null` if the stream itself received an error without
   * stack trace).
   * Otherwise it is called with just the error object.
   * If [onError] is omitted, any errors on the stream are considered unhandled,
   * and will be passed to the current [Zone]'s error handler.
   * By default unhandled async errors are treated
   * as if they were uncaught top-level errors.
   *
   * If this stream closes and sends a done event, the [onDone] handler is
   * called. If [onDone] is `null`, nothing happens.
   *
   * If [cancelOnError] is true, the subscription is automatically canceled
   * when the first error event is delivered. The default is `false`.
    */
  StreamSubscription<T> listen(void onData(T event),
      {Function onError, void onDone(), bool cancelOnError});

Это означает, что вы можете сделать больше, чем просто передать один обработчик для отправленных данных. Вы также можете иметь обработчик для ошибок, и другой для закрытия потока со стороны контроллера (onDone). Исключения, которые вызываются изнутри Stream, будут вызывать onError(), если вы его предоставите, в противном случае они просто проглатываются, и вы никогда не узнаете, что что-то пошло не так.


Пример Flutter-потоков


Чтобы облегчить понимание следующих глав, я сделал отдельную ветку репозитория.
Пожалуйста, склонируйте ее


В качестве первого примера я взял хорошо известное приложение-счетчик, которое вы получаете при создании нового проекта Flutter, и немного его реорганизовал. Я добавил класс модели для хранения состояния приложения, которое в основном является значением счетчика:


class Model {
    int _counter = 0;
    StreamController _streamController = new StreamController<int>();

    Stream<int> get counterUpdates => _streamController.stream;

    void incrementCounter() {
        _counter++;
        _streamController.add(_counter);
    }
}

здесь вы можете увидеть очень типичный шаблон: вместо публикации всего StreamController, мы просто публикуем его свойство Stream.


Чтобы сделать Модель доступной для UI, я сделал ее статическим полем в объекте App, потому что не хотел вводить InheritedWidget или ServiceLocator. Для простого примера это сойдет с рук, но я бы не стал делать это в настоящем приложении!


Добавим в main.dart:


class _MyHomePageState extends State<MyHomePage> {
  int _counter = 0;
  StreamSubscription streamSubscription;

  @override
  void initState() {
    streamSubscription = MyApp.model.counterUpdates.listen((newVal) => setState(() {
          _counter = newVal;
        }));

    super.initState();
  }

  // Хотя этот State не будет уничтожен, пока приложение работает, 
  // хороший стиль требует освобождать подписки явно
  @override
  void dispose() {
      streamSubscription?.cancel();
      super.dispose();
    }

initState() хорошее место для установки слушателя, и, будучи добропорядочными гражданами Darts, мы всегда освобождаем подписку в dispose(), верно?


В дереве виджетов нам просто нужно адаптировать обработчик onPressed кнопки FAB (кнопка с плавающим действием).


floatingActionButton: new FloatingActionButton(
            onPressed: MyApp.model.incrementCounter,
    tooltip: 'Increment',
    child: new Icon(Icons.add),
    ),

Этим способом мы создали чистое разделение между View и Model, используя Stream.


Применяем StreamBuilder


Источник


Вместо использования initState() и setState() для наших нужд Flutter поставляется с удобным виджетом StreamBuilder. Как вы уже догадались, он принимает функцию Stream и метод-строитель, вызывающийся всякий раз, когда Stream выдает новое значение. И теперь нам не нужны явные инициализация и освобождение:


body: new Center(
  child: new Column(
    mainAxisAlignment: MainAxisAlignment.center,
    children: <Widget>[
      new Text(
        'You have pushed the button this many times:',
      ),
      StreamBuilder<int>(
          initialData: 0,
          stream: MyApp.model.counterUpdates,
          builder: (context, snappShot) {
            String valueAsString = 'NoData';

            if (snappShot != null && snappShot.hasData) {
              valueAsString = snappShot.data.toString();
            }

            return Text(
              valueAsString,
              style: Theme.of(context).textTheme.display1,
            );
          }),
    ],
  ),
),

Мы почти закончили, обещаю. Вот три вещи, которые полезно знать:


  • большое преимущество использования StreamBuilder по сравнению с первым решением состоит в том, что вызов setState() в listen() всегда перестраивает всю страницу, тогда как StreamBuilder будет вызывать только свой builder
  • Переменная snapShot содержит самые последние данные, полученные из Stream. Всегда проверяйте, что она содержит действительные данные, прежде чем ее использовать
  • Исходя из принципов инициализации во время, StreamBuilder не может получить значение во время самого первого кадра. Чтобы обойти это, мы передаем значение для initialData, которое используется для первой сборки, то есть для первого кадра экрана. Если мы не передадим initialData, наш билдер в первый раз будет вызван с недопустимыми данными. Альтернативой использованию initialData является возврат виджета- плейсхолдера, если snapShot невалиден, который отображается до тех пор, пока мы не получим действительные данные, например:


    // Предположим, что наш поток базируется на некотором обновлении в базе данных
    StreamBuilder<int>(
    stream: MyApp.model.databaseUpdates,
    builder: (context, snappShot) {
    
    if (snappShot != null && snappShot.hasData) {
        return Text(
          snappShot.data.toString(),
          style: Theme.of(context).textTheme.display1,
        );
    }
    
    // Пока мы не получим действительные данные, мы крутим Spinner
    return CircularProgressIndicator ();
    })


В следующем посте мы рассмотрим, как преобразовать данные в наших потоках и сделать это на лету. Большое спасибо Scott Stoll за чтение корректуры и важные отзывы.



Теги:
Хабы:
+5
Комментарии 6
Комментарии Комментарии 6

Публикации

Истории

Работа

Ближайшие события

Московский туристический хакатон
Дата 23 марта – 7 апреля
Место
Москва Онлайн
Геймтон «DatsEdenSpace» от DatsTeam
Дата 5 – 6 апреля
Время 17:00 – 20:00
Место
Онлайн
PG Bootcamp 2024
Дата 16 апреля
Время 09:30 – 21:00
Место
Минск Онлайн
EvaConf 2024
Дата 16 апреля
Время 11:00 – 16:00
Место
Москва Онлайн