Основы реактивного программирования с использованием RxJS. Часть 3. Higher Order Observables

  • Tutorial


В данной статье мы рассмотрим, как можно в одном потоке обрабатывать другой, для чего это нужно, и как нам в этом помогут операторы Higher Order Observables (дальше HOO).

При работе с потоками часто возникает ситуация, когда необходимо в качестве значения одному потоку передать результаты работы другого. Например, мы хотим выполнить ajax запрос и его ответ обработать в текущем потоке, или запустить несколько параллельных запросов, реализовать pooling. Думаю, многие привыкли решать подобные задачи, используя такой механизм как promise. Но можно ли решать их используя RxJS? Конечно, и все куда проще чем вы думаете!

Серия статей “Основы реактивного программирования с использованием RxJS”:


Примечание: для того, чтобы понять теоретическую часть статьи, вам не обязательно читать предыдущие статьи, необходимо лишь знать, что такое observable, операторы и пайпы. В практической части мы будем дорабатывать пример из второй статьи, который вы можете найти по ссылке.

Проблема


Давайте представим следующую задачу: нам необходимо каждую секунду узнавать, доступен ли сервер. Как мы можем ее решить?

Для начала создадим поток с помощью метода timer:

timer(0, 1000).subscribe({
  next: console.log
});

Метод timer очень похож по принципу работы на interval. Но в отличие от него позволяет задать таймаут запуска потока, который передается первым параметром. Вторым параметром указывается интервал, через который будет генерироваться новое значение. Если второй параметр не указывать, то таймер сгенерирует только одно значение и завершит поток.

Так как сервера у нас с вами нет, то предлагаю просто написать функцию, эмулирующую запрос, на сервер:

const makeRequest = () => {
  return timer(1000).pipe(
    mapTo('success')
  )
}

Что делает этот метод? Он возвращает поток, созданный с помощью метода timer, который испускает значение по прошествию одной секунды и завершается. Так как метод timer генерирует только число, то с помощью оператора mapTo мы заменяем его на строку “success”.

Вот как выглядит поток, который создается методом makeRequest:



Теперь у нас стоит выбор: вызывать метод makeRequest внутри потока или возложить эту обязанность на наблюдателя?

Первый подход более предпочтительный, так как в этом случае мы сможем использовать весь потенциал RxJS с его операторами и избавим нашего наблюдателя от лишних обязанностей. Воспользуемся методом timer, чтобы выполнять запросы по интервалу:

timer(0, 1000).pipe(
  map(() => makeRequest())
).subscribe({
  next: console.log
});

Когда мы запустим подобный код, то увидим, что в console.log нам приходит не сообщение с текстом “success”, а объект типа Observable:



Ответ вполне ожидаем, ведь в map мы возвращаем поток. Чтобы поток заработал, нужно на него подписаться. Что же, давайте посмотрим, как не надо делать:

timer(0, 1000).pipe(
  map(() => makeRequest())
).subscribe({
  next: observable => observable.subscribe({
    next: console.log
  });
});

Проблема примера выше в том, что мы получаем подписку в подписке. А что если мы захотим сделать более одного запроса в цепочке? Или как быть, если в определенный момент нам нужно будет отписаться от потока внутри? В таком случае наш код все больше будет напоминать “лапшу”. Для решения данной проблемы в RxJS есть специальные операторы, которые носят название HOO.

HOO


HOO — это особый тип операторов, которые в качестве значений принимают потоки. Одним из таких операторов является метод mergeAll.

Когда на вход mergeAll поступает поток, он подписывается на него. Поток, на который оператор подписался, носит название внутренний. Поток, из которого оператор получает в качестве значений другие потоки, называют внешним.

Когда внутренний поток генерирует значение, то mergeAll проталкивает это значение во внешний поток. Таким образом, мы избавляемся от необходимости выполнять подписку вручную. Если мы отпишемся от внешнего потока, то mergeAll автоматически отпишется от внутреннего.

Давайте посмотрим, как можно переписать наш пример с помощью mergeAll:

timer(0, 1000).pipe(
  map(() => makeRequest())
  mergeAll()
).subscribe({
  next: console.log
});

В примере выше внешний поток создан оператором timer. А потоки, которые создаются в операторе map — внутренние. Каждый созданный поток попадает в оператор mergeAll.



Комбинация map + mergeAll используются очень часто, поэтому в RxJS существует метод mergeMap:

timer(0, 1000).pipe(
  mergeMap(() => makeRequest())
).subscribe({
  next: console.log
});

Когда внешний поток генерирует значение, оператор mergeMap вызывает переданную в него функцию обратного вызова (callback), которая генерирует новый поток. Затем mergeMap подписывается на сгенерированный поток.



Особенность mergeAll/mergeMap оператора в том, что, если к нему спускается еще один поток, то он так же подписывается на него. Таким образом, во внешний поток у нас могут попадать значения сразу из нескольких внутренних. Посмотрим следующий пример:

  timer(0, 1000)

Вот так внешний поток будет выглядеть без оператора mergeMap:



А вот так с mergeMap:

timer(0, 1000).pipe(
  mergeMap(() => interval(1000)) 
)



Каждую секунду у нас создается новый внутренний поток и mergeMap на него подписывается. Таким образом, у нас одновременно работают множество внутренних потоков, значения из которых попадают во внешний:





Примечание: будьте аккуратны с применением mergeMap, каждый новый внутренний поток будет работать до тех пор, пока вы не отпишитесь от внешнего. В примере выше количество внутренних потоков растет каждую секунду, в конце концов потоков может стать настолько много, что компьютер не справится с нагрузкой.

concatAll/concatMap


Метод mergeMap отлично подходит, когда вам не важен порядок исполнения внутренних потоков, но как быть, если у вас появилась в этом необходимость? Допустим, мы хотим, чтобы следующий запрос на сервер выполнился только после получения ответа от предыдущего?

Для подобных целей подойдет HOO оператор concatAll/concatMap. Данный оператор, подписавшись на внутренний поток, ждет, пока тот не завершится, и только потом подписывается на следующий.

Если во время выполнения одного потока к нему спускается новый, то он помещается в очередь до тех пор, пока предыдущий не завершится.

// поток, генерирующий 1 по прошествии одной секунды
const firstInnerObservable = timer(1000).pipe(
  mapTo(1)
);

// поток, генерирующий 2 по прошествии половины секунды
const secondInnerObservable = timer(500).pipe(
  mapTo(2)
);

of(
  firstInnerObservable,
  secondInnerObservable
).pipe(
  concatAll()
).subscribe({
  next: console.log
});

В примере выше мы создаем два потока с помощью метода timer. Для наглядности я использовал оператор mapTo для вывода разных значений. Первый поток сгенерирует 1, второй — 2. Внешний поток создается с помощью метода of, который на вход принимает два вышеобъявленных observable.

Оператор concatAll сначала получает firstInnerObservable, подписывается на него и ждет, когда он завершится, и только после завершения первого подпишется на secondInnerObservable. Вот как будет выглядеть внешний поток:



Если же мы заменим concatAll на mergeAll, то поток будет выглядеть так:

of(
  firstInnerObservable,
  secondInnerObservable
).pipe(
  mergeAll()
).subscribe({
  next: console.log
});



switchAll/switchMap


Данный оператор отличается от предыдущих тем, что когда он получает новый поток, то сразу отписывается от предыдущего и подписывается на новый.

Возьмем пример выше и заменим concatAll на switchAll, и посмотрим как будет вести себя внешний поток:

of(
  firstInnerObservable,
  secondInnerObservable
).pipe(
  switchAll()
).subscribe({
  next: console.log
});



Во внешний поток попало только значение из второго внутреннего потока. Все потому что switchMap отписался от первого, когда получил второй поток.

Когда это бывает нужно? Например, при реализации поиска данных. Если ответ с сервера еще не пришел, а мы уже отправили новый запрос, то нам нет смысла ждать предыдущий.

exhaust/exhaustMap


exhaust — полная противоположность оператору switchAll, при этом его поведение похоже на concatAll. Данный метод, подписавшись на поток, ждет, когда он завершится. Если к нему спускается новый поток, то он просто отбрасывается.

of(
  firstInnerObservable,
  secondInnerObservable
).pipe(
  exhaust()
).subscribe({
  next: console.log
});



В примере выше мы не получили двойку, потому что в этот момент оператор ожидал завершения первого потока, и просто отбросил второй.

Думаю, у многих возник вопрос, когда подобное поведение может понадобиться? Хороший пример — форма авторизации. Нет смысла отправлять несколько запросов на сервер, пока не выполнился текущий.

Дорабатываем приложение


Вспоминаем пример из второй статьи. В нем мы реализовали поиск по GitHub и использовали оператор mergeMap для отправки запросов к серверу. Теперь мы знаем особенности данного оператора, действительно ли он подходит в нашем случае?

fromEvent(input, 'keyup').pipe(
  debounceTime(700),
  map(event => event.target.value),
  filter(val => val.length > 2),
  distinctUntilChanged(),
  mergeMap(value => {
    return from(getUsersRepsFromAPI(value)).pipe(
      catchError(err => of([]))
    )
  })
).subscribe({
  next: reps => recordRepsToList(reps)
})

Давайте предположим, что сервер GitHub будет сильно перегружен, тогда обработка нашего ответа будет занимать много времени. Что в этом случае может пойти не так?

Допустим, пользователь ввел какие-то данные, не дождался ответа и ввел новые. В этом случае мы отправим уже второй запрос на сервер. При этом никто не гарантирует, что ответ на первый запрос придет раньше.

Так как оператору mergeMap все равно в какой последовательности обрабатывать внутренние потоки, то в случае, когда первый запрос выполнится позже второго, мы затрем актуальные данные. Поэтому предлагаю заменить метод mergeMap на switchMap:

fromEvent(input, 'keyup').pipe(
  debounceTime(700),
  map(event => event.target.value),
  filter(val => val.length > 2),
  distinctUntilChanged(),
  switchMap(value => {
    return from(getUsersRepsFromAPI(value)).pipe(
      catchError(err => of([]))
    )
  })
).subscribe({
  next: reps => recordRepsToList(reps)
})

Теперь, если пользователь введет новые данные, то switchMap отпишется от предыдущего потока и подпишется на новый.

Стоит отметить, что наш http запрос будет продолжать висеть до тех пор, пока сервер не даст на него ответ. Но, так как мы отписались от внутреннего потока, ответ не попадет во внешний поток.

Примечание: если вы работаете с Angular и применяете HttpClient для работы с http, то можете не беспокоится об отмене самого запроса. HttpClient умеет это делать за вас при отписке.

Отменяем http


В fetch api есть возможность отмены http запроса с применением AbortController. Данный функционал при комбинировании с оператором switchMap позволит экономить трафик пользователя.

Давайте перепишем немного наш пример. И создадим метод, который будет оборачивать вызов fetch в observable:

const createCancellableRequest = (url) => {
  // создаем контроллер для возможности отмены
  const controller = new AbortController();
  const signal = controller.signal;

  return new Observable(observer => {

    fetch(url, { signal })
      .then(response => {
        if (response.ok) {
          return response.json();
        }

        throw new Error('Ошибка');
    })
    // передаем успешный ответ наблюдателю
    .then(result => observer.next(result))
    // завершаем поток
    .then(() => observer.complete())
    // в случае ошибки, оповещаем об этом наблюдателя
    .catch(error => observer.error(error));

    // функция, вызывающаяся при отписке
    return () => {
      // отменяем запрос
      controller.abort();
    };

  });
};

Также поменяем метод getUsersRepsFromApi:

const getUsersRepsFromAPI = (username) => {
  const url = `https://api.github.com/users/${ username }/repos`;
  return createCancellableRequest(url);
}

Теперь метод возвращает не promise, а observable. Поэтому уберем обертку from в switchMap:

switchMap(value => {
  return getUsersRepsFromAPI(value).pipe(
    catchError(err => of([])
  )
)

Примечание: в RxJS версии 6.5 добавили оператор fromFetch, который под капотом сам вызывает метод abort, поэтому вам больше не нужно писать свой “велосипед”.

Вот и все! Весь код примера можно найти здесь.

Заключение


Сегодня мы рассмотрели, что такое HOO и несколько очень полезных операторов из данной категории. Конечно, это были далеко не все из них. Для более детальной и подробной информации рекомендую посетить документацию по RxJS.

В следующей статье я планирую рассмотреть, в чем разница между Hot и Cold observables.

Напоследок: не используйте подписку в подписке, ведь есть HOO!
  • +13
  • 3,2k
  • 5
Поделиться публикацией

Комментарии 5

    +3

    Спасибо за статью. Хотя в ней разобран пример, который разбирается наверно в каждой второй статье про rxjs.


    Давайте добавим к инпуту с текстом еще пару кнопок: для отмены и повтора запроса. Отменять надо моментально, а повторять только когда завершился предыдущий. Пусть еще будет прогресс-бар, отображающий ход загрузки. Как бы вы реализовали?

      0
      Мой пример изначально был немного сложнее, с показом ошибки и выводом сообщения о том, что результат пустой. Но тогда статья очень сильно разрастается. Я же хотел максимально доступно объяснить как работают эти операторы, и немного показать это на практике.
      Но за идею спасибо! Думаю это можно будет показать в отдельной статье.
      +1

      Вы, кстати, забыли в примере с fetch вызвать complete.
      Ещё оставлю это для тех, кто хочет понять switchMap раз и навсегда https://youtu.be/rUZ9CjcaCEw

        0
        Действительно. Спасибо, поправил.
        0
        Простите, но что это за чушь? HOO? нет такого понятия, есть понятие функций высшего порядка. Так и тут это HOF и никакие не HOO!

        Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

        Самое читаемое