Как стать автором
Обновить

Изучаем мультикаст операторы RxJS

Время на прочтение11 мин
Количество просмотров21K
Привет, Хабр! Представляю вашему вниманию перевод статьи «Understanding RxJS Multicast Operators» автора Netanel Basal.

Широковещательные или мультикаст операторы нередко кажутся самой сложной темой при изучении RxJS. В этой статье я попробую все доступно объяснить.

Мы рассмотрим внутреннее устройство мультикаст операторов и решаемые ими задачи.

Давайте начнем с описания основных строительных блоков RxJS.

Observable


В RxJS наблюдаемые объекты (далее «потоки») изначально являются холодными. Это значит, что каждый раз при подписке на поток выполняется коллбэк подписки.

Для лучшего понимания создадим такую реализацию:

class Observable {
  constructor(subscriptionFn) {
    this.subscriptionFn = subscriptionFn;
  }

  subscribe(observer) {
    return this.subscriptionFn(observer);
  }
}

Конструктор Observable принимает единственный параметр — коллбэк подписки
subscriptionFn. Он будет вызываться каждый раз, когда мы подписываемся на поток (subscribe()).

Иногда еще коллбэк подписки называют producer, так как она так же «производит» значения для подписчика (объект observer в нашем коде).

Метод subscribe() принимает на вход observer. Это объект с тремя собственными методами: next(), error(), complete(). В живом RxJS вместо объекта можно передать три функции.

Метод subscribe() при вызове вызывает функцию подписки передавая ей на вход observer.

Мы сейчас не упомянули метод unsubscribe, но следует иметь в виду, что каждая подписка предоставить способ ее уничтожения. Чаще всего подписка возвращает функцию (или объект с соответствующим методом), при выполнении которой уничтожается связь между потоком и его подписчиками.

Это все довольно просто. Давайте теперь перейдем ближе к реальности. Например завернем в поток нативный XHR API


function http(url) {
  // This function will be called when we call http().subscribe()
  const subscriptionFn = observer => {
    log('Observable execution: http');
    const xhr = new XMLHttpRequest();
    xhr.addEventListener('load', () => {
      if (xhr.readyState === 4 && xhr.status === 200) {
        observer.next(JSON.parse(xhr.responseText));
        observer.complete();
      }
    });
    xhr.open('GET', url);
    xhr.send();
    
    return () => xhr.abort()
  }
  
  return new Observable(subscriptionFn);
}

Мы написали функцию http, которая получает URL, выполняет http-запрос и возвращает поток, который эмитит полученный http-ответ.

Теперь, глядя на нашу реализацию, как вы думаете, что произойдет когда мы подпишемся на этот поток дважды?


// A small observer helper
const observer = tag => ({
  next(value) {
    console.log(`${tag}:`, value);
  }
});

http('https://jsonplaceholder.typicode.com/users')
  .subscribe(observer('subscriber-1'));

http('https://jsonplaceholder.typicode.com/users')
  .subscribe(observer('subscriber-2'));

Правильно, выполнится два http-запроса. Если еще раз взглянуть на реализацию класса Observable, мы увидим почему так. Каждый подписчик вызывает коллбэк подписки, который в свою очередь каждый раз выполняет http-запрос.



Операторы


Оператор это функция, которая принимает на вход поток, производит какие-либо действия, и возвращает поток.

Напишем наш первый собственный оператор.
function map(fn) {
  return source => {
    return new Observable(observer => {
      log('Observable execution: map');
      return source.subscribe({
        next(value) {
          observer.next(fn(value));
        }
      });
    });
  };
}

Функция map() возвращает оператор, который принимает исходный поток и возвращает поток, в котором все проходящие значения будут пропущены через функцию fn.

Т.е. внутри него обязательно присутствует подписка на входной поток.

Перед тем как использовать этот новый оператор, нам нужно как-то прикрепить его к потоку. Расширим наш класс Observable методом pipe()

class Observable {
  constructor(subscriptionFn) {
    this.subscriptionFn = subscriptionFn;
  }

  subscribe(observer) {
    return this.subscriptionFn(observer);
  }

  pipe(...operators) {
    return operators.reduce((source, next) => next(source), this);
  }
}

Простой метод, всего одна строка кода. pipe принимает массив операторов и вызывает их по очереди, передавая каждому на вход результат выполнения предыдущего.

Давайте же используем наш оператор:

http('https://jsonplaceholder.typicode.com/users')
  .pipe(map(res => res[0]))
  .subscribe(observer('subscriber'));

При вызове subscribe будет выполнена подписка на выходной поток из map(), и в свою очередь внутри map будет выполнена подписка на исходный поток.

http-поток эмитит значение, оно попадает в map. Затем, выполняется функция fn, поток из map эмитит значение в конечную подписку. Так работает observable chain — цепочка потоков.



Если мы подпишемся на цепочку дважды, каждая подписка в цепочке вызовется два раза.

const firstUser$ = http('https://jsonplaceholder.typicode.com/users')
    .pipe(map(res => res[0]));

firstUser$.subscribe(observer('subscriber-1'));
firstUser$.subscribe(observer('subscriber-2'));



А если нам не подходит такое поведение? Если мы хотим вызвать функцию подписки только один раз, сколько бы у нас не было подписок?

Например, что если мы хотим сделать один http-запрос и использовать результат всеми подписчиками? На этот случай нужен Subject.

Subjects


Subject это одновременно и поток и подписчик. Поток — потому что у него есть метод subscribe(), подписчик — потому что реализует интерфейс подписчика — методы next(), error(), complete().

Давайте напишем его.

class Subject extends Observable {
  constructor() {
    super();
    this.observers = [];
  }

  subscribe(observer) {
    this.observers.push(observer);
  }

  next(value) {
    this.observers.forEach(observer => observer.next(value));
  }

  error(error) {
    this.observers.forEach(observer => observer.error(error));
  }

  complete() {
    this.observers.forEach(observer => observer.complete());
  }
}

Subject может выполнять роль промежуточного звена между холодным потоком и множеством подписчиков.

Изменим наш пример следующим образом:

const subject = new Subject();
subject.subscribe(observer('subscriber1'));
subject.subscribe(observer('subscriber2'));

http('https://jsonplaceholder.typicode.com/users')
  .pipe(map(res => res[0]))
  .subscribe(subject);

При вызове subject.subscribe(someFn) выполняется лишь одна простая операция — добавление в массив subject.observers функции someFn.

Ну а затем, поскольку Subject ведет себя и как подписчик тоже, можно подписать его на исходный поток, т.е. когда исходный поток эмитит значение, вызывается subject.next(), что влечет за собой передачу этого значения каждому из подписчиков subject.

Теперь у нас оригинальный коллбэк подписки выполняется один раз, и только один http-запрос будет выполнен.



Подписчики, опоздавшие на вечеринку


Что произойдет, если исходный поток уже сработал до того, как мы подписались?

Не получится показать это на прошлом примере, так как http — асинхронный, даже если подписаться на него сразу после, значение все равно придет уже после подписки.

Давайте быстро создадим порождающую функцию of:


function of(...values) {
  return new Observable(observer => {
    log('Observable execution: of');
    values.forEach(value => observer.next(value));
  });
}

Поток, созданный посредством of(), эмитит значения синхронно, один за другим. Подпишемся на subject уже после того, как тот подписался на of.

const subject = new Subject();
of(1, 2, 3).subscribe(subject);

subject.subscribe(observer('subscriber1'));
subject.subscribe(observer('subscriber2'));

Наши подписчики не получили ничего. Почему? Наша реализация не поддерживает «опоздавших» подписчиков. Когда исходный поток от of() эмитит значения, подписчики еще не зарегистрированы, эти значения уйдут в никуда.

В актуальных примерах на Angular вполне может быть такое, что исходный поток сработал, но ваш компонент еще не присутствует на странице. И когда компонент появится, он подпишется на источник, но не получит уже пролетевших значений.

Один из путей решения проблемы это ReplaySubject. Набросаем его вариант и посмотрим как это работает.


class ReplaySubject extends Subject {
  constructor(bufferSize) {
    super();
    this.observers = [];
    this.bufferSize = bufferSize;
    this.buffer = [];
  }

  subscribe(observer) {
    this.buffer.forEach(val => observer.next(val));
    this.observers.push(observer);
  }

  next(value) {
    if (this.buffer.length === this.bufferSize) {
      this.buffer.shift();
    }

    this.buffer.push(value);
    this.observers.forEach(observer => observer.next(value));
  }
}

Концепция проста. Как и следует из названия, ReplaySubject это особый Subject, который может воспроизводить старые значения всем новым подписчикам.

Каждое выпущенное значение будет передано всем текущим подписчикам и сохранено для будущих, размер буфера bufferSize устанавливается в конструкторе.

Перепишем прошлый пример с помощью ReplaySubject.

const subject = new ReplaySubject(3);
of(1, 2, 3).subscribe(subject);

subject.subscribe(observer('subscriber1'));
subject.subscribe(observer('subscriber2'));

Результат изменился.

Несмотря на позднюю подписку мы поймали их всех.



Резюмируя, предназначение ReplaySubject — это рассылка значений всем подписчикам и кэширование их для будущих «опоздавших» подписчиков.

Перед тем, как идти дальше я рекомендую вам попробовать написать свою реализацию BehaviorSubject. Готовый код вы сможете найти в конце статьи.

Теперь мы наконец переходим к мультикаст операторам. Надеюсь рассмотренные выше примеры выше вам помогут быстрее их понять.

Multicast Operators


Multicast и Connect


Оператор multicast() использует Subject, чтобы эмитить исходный поток нескольким подписчикам.


import { interval, Subject, ConnectableObservable } from 'rxjs';
import { multicast } from 'rxjs/operators';

const connectableObservable = interval(1000).pipe(
  multicast(new Subject())
)

const observer1 = connectableObservable.subscribe(log);
const observer2 = connectableObservable.subscribe(log);

const connectableSubscription = (connectableObservable as ConnectableObservable<any>)
  .connect();

multicast возвращает объект ConnectableObservable, у которого есть метод connect. Его назначение — подписать полученный subject на исходный поток.

Метод connect позволяет нам самим определять, когда запустить на выполнение исходный поток. Тут есть момент, о котором нужно помнить — чтобы отписаться от источника нужно выполнить:

connectableSubscription.unsubscribe();

Мы не ограничены простым Subject. Вместо него можно использовать любой производный класс, например ReplaySubject:

import { interval, ReplaySubject, ConnectableObservable } from 'rxjs';
import { multicast } from 'rxjs/operators';

const connectableObservable = interval(1000).pipe(
  multicast(new ReplaySubject(1))
)

const observer1 = connectableObservable.subscribe(log);

setTimeout(() => {
  // Late subscriber
  connectableObservable.subscribe(log);
}, 3000)

const connectable = (connectableObservable as ConnectableObservable<any>).connect();

По этому коду можно догадаться, что произойдет под капотом.

Когда мы используем multicast, мы можем передать не только Subject, но и фабричную функцию, которая каждый раз возвращает новый Subject.

Переиспользовать уже завершенный subject нельзя, фабричная функция решает эту проблему.

interval(1000).pipe(
  multicast(() => new Subject())
)

RefCount


Когда мы используем оператор multicast(), мы сами отвечаем за вызов connect() для начала исполнения исходного observable. Плюс к тому мы еще должны следить за возможными утечками памяти, вручную отписываясь от ConnectableSubscription.

Автоматизация процесса позволила бы избежать ошибок и упростила код. Добрые разработчики RxJS подумали об этом за нас и создали refCount оператор.

refCount считает подписки и при появлении первой, он вызывает connect(), т.е. подписывается. Когда оно обратно уменьшиться до нуля, будет вызвана отписка.

const source = interval(1000).pipe(
  multicast(new Subject()),
  refCount()
)
 
// refCount === 1 => source.subscribe();
const observer1 = source.subscribe(log);

// refCount === 2
const observer2 = source.subscribe(log);

setTimeout(() => {
  // refCount - 1
  observer1.unsubscribe();
  // refCount - 1
  observer2.unsubscribe();
  // refCount === 0 => source.unsubcribe();
}, 3000)

Заметим, что после refCount мы получаем обычный observable, а не ConnectableObservable.

Publish и его варианты


multicast() + Subject + refCount() это довольно типичный случай в RxJS и разработчики сократили его до одного оператора.

Посмотрим какие у нас есть варианты.

  • publish() равнозначен multicast(() => new Subject())
    const connectableObservable = interval(1000).pipe(
      publish()
    )
    
    connectableObservable.connect();
    

  • publishBehavior() равнозначен multicast(new BehaviorSubject())
    const connectableObservable = interval(1000).pipe(
      publishBehavior(100)
    )
    
    connectableObservable.connect();
    

  • publishReplay() равнозначен multicast(() => new ReplaySubject(x))
    const connectableObservable = interval(1000).pipe(
      publishReplay(3)
    )
    
    connectableObservable.connect();
    

  • publishLast() равнозначен multicast(new AsyncSubject())
    const connectableObservable = interval(1000).pipe(
      take(2),
      publishLast()
    )
    
    connectableObservable.connect();
    

  • share() равнозначен multicast(() => new Subject()) + refCount()
    const source = interval(1000).pipe(
      share()
    )
    

  • shareReplay(bufferSize) это мультикаст оператор, который использует ReplaySubject. У него внутри нет multicast() и его результат это observable, а не ConnectableObservable. Он может быть использован как с refCount, так и без него. Вот оба варианта:

    interval(1000).pipe(
      shareReplay({ refCount: true, bufferSize: 1 })
    )
    
    interval(1000).pipe(
      shareReplay(1)
    )
    

Когда shareReplay вызван с { refCount: false } все равно что вызвать shareReplay(x).

В этом случае подсчета ссылок не будет. Это значит что пока исходный поток не будет завершен, shareReplay будет подписан на него, независимо есть у него самого конечные подписчики или нет. Все новые подписчики получат последние x значений.

shareReplay vs publishReplay + refCount


На первый взгляд shareReplay({ refCount: true, bufferSize: X }) идентичен publishReplay(X) + refCount() , однако это не совсем верно.

Посмотрим в чем сходство и в чем разница.

У них одинаковое поведение refCount — подписка и отписка от исходного потока на основании количества подписчиков. Еще у них одинаково реагируют когда исходный поток завершился — все новые подписчики получает X последних значений.

Однако, если же исходный поток еще не завешен, в этом случае когда у нас publishReplay(X) + refCount() — все новые подписчики получают X значений из буфера, а потом будут подписаны заново с помощью того же ReplaySubject.
А вот в случае если мы используем shareReplay({ refCount: true, bufferSize: 1 }) последних X значений они не получат, так как внутри оно создает новый ReplaySubject и использует его для переподписки на источник.

Примеры, иллюстрирующие это:

const source = interval(1000).pipe(
  publishReplay(1),
  refCount()
);

const one = source.subscribe(observer('subcriber-1'));

setTimeout(() => {
  one.unsubscribe();
 
  // This subscriber will get the last emitted values from the source
  const two = source.subscribe(observer('subcriber-2'));
}, 3000);

const source = interval(1000).pipe(
  shareReplay({ refCount: true, bufferSize: 1 })
);

const one = source.subscribe(observer('subcriber-1'));

setTimeout(() => {
  one.unsubscribe();
  
  // This subscriber will NOT get the last emitted values from the source
  const two = source.subscribe(observer('subcriber-2'));
}, 3000);





Актуальные примеры в Angular


Посмотрим как применять в боевых условиях изученные мультикаст операторы.

Используем share


Пусть у нас есть компонент, которому нужны данные из исходного потока. Это может быть запрос http, стейт или что-угодно. А еще нам нужно манипулирование данными, типа фильтрации, сортировки и т.п.

@Component({
  template: `
    <users-list [users]="allUsers$ | async"></users-list>
  `,
})
export class UsersPageComponent {
  allUsers$: Observable<User[]>;

  constructor(private http: HttpClient) {
  }

  ngOnInit() {
    this.allUsers$ = this.http.get('https://api/users').pipe(
      map(users => filter/sort),
    );
  }
}

А теперь нам нужен другой компонент, который показывает только первого пользователя. Если мы подпишемся на исходных поток как он есть, то:

@Component({
  template: `
    <user [user]="firstUser$ | async"></user>
    <users-list [users]="allUsers$ | async"></users-list>
  `,
})
export class UsersPageComponent {
  allUsers$: Observable<User[]>;
  firstUser$: Observable<User>;
  
  constructor(private http: HttpClient) {
  }

  ngOnInit() {
    this.allUsers$ = this.http.get('https://api/users').pipe(
      map(users => filter/sort),
    );
    
    this.firstUser$ = this.allUsers$.pipe(map(users => users[0]));
  }
}

И вот у нас два http-запроса, операции сортировки или фильтрации выполнятся дважды.
Применяем share:

@Component({
  template: `
    <user [user]="firstUser$ | async"></user>
    <users-list [users]="allUsers$ | async"></users-list>
  `,
})
export class UsersPageComponent {
  allUsers$: Observable<User[]>;
  firstUser$: Observable<User>;
  
  constructor(private http: HttpClient) {
  }

  ngOnInit() {
    this.allUsers$ = this.http.get('https://api/users').pipe(
      map(users => filter/sort),
      share()
    );
    
    this.firstUser$ = this.allUsers$.pipe(map(users => users[0]));
  }
}

Мы уже знаем, что он создает новый Subject, который подписывается на источник. Когда источник эмитит, subject передает это значение всем своим подписчикам.

Проблема решена, и когда мы подписались на firstUser$ — мы подписались на внутренний subject, а не на исходный поток напрямую.

Используем ShareReplay


ShareReplay применяется когда нужно эмитить, кэшировать и повторять последние X значений. Типичный пример — синглтон сервис, выполняющий http запрос.


@Injectable({ providedIn: 'root' })
export class BlogService {
  posts$ = this.http.get('https://jsonplaceholder.typicode.com/posts')
              .pipe(shareReplay(1));

  constructor(private http: HttpClient) {}
}

Тут уже неважно сколько компонентов сейчас или в будущем запросят данные, http запрос будет только один и результат будет сохранен во внутреннем буфере ReplaySubject.
Еще может встретиться случай, где нужно отменить незавершенный запрос, ведь подписчиков уже нет, тогда нужно будет применить refCount.

Полный код вы можете найти здесь.
Теги:
Хабы:
Всего голосов 7: ↑7 и ↓0+7
Комментарии0

Публикации

Истории

Работа

Ближайшие события

28 ноября
Конференция «TechRec: ITHR CAMPUS»
МоскваОнлайн
2 – 18 декабря
Yandex DataLens Festival 2024
МоскваОнлайн
11 – 13 декабря
Международная конференция по AI/ML «AI Journey»
МоскваОнлайн
25 – 26 апреля
IT-конференция Merge Tatarstan 2025
Казань