Как стать автором
Обновить

Изучаем mergeMap, switchMap, concatMap и exhaustMap

Уровень сложностиСредний
Время на прочтение4 мин
Количество просмотров35K

Небольшая история

Я Angular разработчик. Это моя первая статья и таких я собираюсь написать много. Пришла эта идея мне в голову потому что иногда, пытаясь найти какую-то информацию в интернете о принципах работы какого-либо инструмента разработки, я не нахожу абсолютно ничего. Это либо ненавистные обожаемые мною доки, в которых написаны очень краткие принципы работы для знающих людей, либо stackoverflow, где кто-то норовит написать кучу слов ни о чём или без практической составляющей, либо просто статьи, не отражающие сути. Но иногда можно наткнуться на знающего человека, который за одну минуту объяснит тебе всю суть и от счастья хочется прыгать. Я решил писать обо всех таких моментах, которые мне очень сложно давались в понимании из-за отсутствия адекватной информации или моей криворукости. Я буду писать подобные статьи прежде всего для себя и если кому-то это поможет - я буду безумно рад, что какому-нибудь камраду не пришлось долго мучиться и понимать смысл того или иного инструмента для разработки. Статьи будут максимально краткими и по делу. Погнали!


Общее

1) У нас есть метод в сервисе нашего Angular приложения

export interface RxjsExample {
    id: number;
    name: string;
}

getRxJxExampleById(id: number): Observable<RxjsExample> {
    return this.httpClient.get<RxjsExample>(`${this.basePath}/users/${id}`)
        .pipe(
            catchError(err => {
                console.log(err)
                return EMPTY;
            })
        );
}

Тут всё элементарно: происходит запрос на backend и, в зависимости от id, мы получаем разные ответы в виде

{
  id: number;
  name: string;
}

2) Есть backend нашего приложения

mergeConcatSwitchExhaustedRoutes.get('/users/:id', async (req: Request, res: Response): Promise<void> => {
    const id = req.params.id;

    if (id === '1') {
        await wait(3000);
        res.status(200).send({
            id: 1,
            name: 'Первый'
        });
    }
    if (id === '2') {
        await wait(1000);
        res.status(200).send({
            id: 2,
            name: 'Второй'
        });
    }
    if (id === '3') {
        await wait(5000);
        res.status(200).send({
            id: 3,
            name: 'Третий'
        });
    }
    if (id === '4') {
        await wait(2000);
        res.status(200).send({
            id: 4,
            name: 'Четвёртый'
        });
    }
});

const wait = (ms: number): Promise<void > => {
    return new Promise(resolve => setTimeout(resolve, ms));
}

Просто роут на бэке, тут в зависимости от id мы возвращаем ответ с определённым ожиданием. Это сделано специально для понимания операторов, описываемых ниже


switchMap

(Злюка)

Начнём с самого простого и понятного оператора switchMap

switchMapExample(): void {
    from([1, 2, 3, 4])
        .pipe(
            switchMap((id) => this.rxjsExampleService.getRxJxExampleById(id)),
            tap((result) => console.log(result))
        )
        .subscribe();
}

Итак, мы эмитим четыре значения подряд, это наши idшки, по ним мы делаем запрос на бэк, как и было сказано выше

Оператор switchMap, получая новое значение, подписывается на новый поток (Observable), тут же отписавшись от предыдущего

Алгоритм:

  • Эмит 1 -> пошёл запрос на сервер для id = 1

  • Эмит 2 -> отписались от выполнения 1, подписались на id = 2

  • Эмит 3 -> отписались от выполнения 2, подписались на id = 3

  • Эмит 4 -> отписались от выполнения 3, подписались на id = 4

  • Дождались ответа от сервера по id = 4

// Ответ
{
    "id": 4,
    "name": "Четвёртый"
}

mergeMap

(Добряш)

mergeMapExample(): void {
    from([1, 2, 3, 4])
        .pipe(
            mergeMap((id) => this.rxjsExampleService.getRxJxExampleById(id)),
            tap((result) => console.log(result))
        )
        .subscribe();
}

Оператор mergeMap, получив следующий эмит, подпишется на новый поток (Observable), не отписываясь от предыдущего. Каждый из них выполниться в рандомном порядке. То есть, в данном случае, мы получим 4 независимых потока, которые выполнятся в любом порядке, независимо от времени ответа от сервера

Алгоритм:

  • Эмит 1 -> ждём ответа от id = 1 (3 секунды) (параллельно)

  • Эмит 2 -> ждём ответа от id = 2 (1 секунда) (параллельно)

  • Эмит 3 -> ждём ответа от id = 3 (5 секунд) (параллельно)

  • Эмит 4 -> ждём ответа от id = 4 (2 секунды) (параллельно)

Все запросы идут параллельно, никто никого не ждёт

// Первый ответ
{
    "id": 2,
    "name": "Второй"
}
// Второй ответ
{
    "id": 4,
    "name": "Четвёртый"
}
// Третий ответ
{
    "id": 1,
    "name": "Первый"
}
// Четвёртый ответ
{
    "id": 3,
    "name": "Третий"
}

Дополнительная информация:

mergeMap может принять второй параметр concurrent, в коде ниже это число 2

mergeMapExample(): void {
    from([1, 2, 3, 4])
        .pipe(
            mergeMap((id) => this.rxjsExampleService.getRxJxExampleById(id), 2),
            tap((result) => console.log(result))
        )
        .subscribe();
}

concurrent ограничивает количество параллельно выполняющихся потоков. В данной ситуации, если у нас есть два сэмиченных выполняющихся значения, то mergeMap будет ждать выполнения хотя бы одного из них, чтобы начать выполнять следующий, третий.

Аналогия их жизни: огромная очередь на кассу в магазине, а касс 2 штуки, соотвественно, они параллельно могут обслужить только двух клиентов и, когда одна из них освободится - следующий клиент подойдёт к освободившейся быстрее всего кассе.

А знаете как будет работать mergeMap, если concurrent будет равен 1? Как следующий оператор:)


concatMap

(Перфекционист)

concatMapExample(): void {
    from([1, 2, 3, 4])
        .pipe(
            concatMap((id) => this.rxjsExampleService.getRxJxExampleById(id)),
            tap((result) => console.log(result))
        )
        .subscribe();
}

Оператор concatMap, получив следующий эмит, подпишется на него, не отписываясь от предыдущего и выполнит их в строго заданном порядке, независимо от того как долго он будет получать ответ от сервера по каждому id

Алгоритм

  • Эмит 1 -> ждём ответа от id = 1 (3 секунды) пока не выполнится

  • Эмит 2 -> ждём ответа от id = 2 (1 секунда) пока не выполнится

  • Эмит 3 -> ждём ответа от id = 3 (5 секунд) пока не выполнится

  • Эмит 4 -> ждём ответа от id = 4 (2 секунды) пока не выполнится

// Первый ответ
{
    "id": 1,
    "name": "Первый"
}
// Второй ответ
{
    "id": 2,
    "name": "Второй"
}
// Третий ответ
{
    "id": 3,
    "name": "Третий"
}
// Четвёртый ответ
{
    "id": 4,
    "name": "Четвёртый"
}

exhaustMap

(У меня есть задача, отвалите)

exhaustMapExample(): void {
    from([1, 2, 3, 4])
        .pipe(
            exhaustMap((id) => this.rxjsExampleService.getRxJxExampleById(id)),
            tap((result) => console.log(result))
        )
        .subscribe();
}

Оператор exhaustMap, получив первый эмит, забивает на все последующие эмиты, пока не выполнит запрос по первому. То есть idшки 2, 3 и 4 просто пройдут мимо и никогда не выполнятся

Алгоритм:

  • Эмит 1 -> ждём ответа от id = 1 (3 секунды) пока не выполнится

  • Эмит 2 -> не выполняется, ещё первый не выполнился, пролетел

  • Эмит 3 -> не выполняется, ещё первый не выполнился, пролетел

  • Эмит 4 -> не выполняется, ещё первый не выполнился, пролетел

// Первый и единственный ответ
{
    "id": 1,
    "name": "Первый"
}

Для тех, кто хочет сам потыкать на кнопки и посмотреть как работают операторы вот проекты на github

Фронт часть

Бэк часть

Это всё. Надеюсь, статья была для вас полезной.

Спасибо :)

Теги:
Хабы:
Всего голосов 12: ↑10 и ↓2+11
Комментарии7

Публикации

Истории

Работа

Ближайшие события

7 – 8 ноября
Конференция byteoilgas_conf 2024
МоскваОнлайн
7 – 8 ноября
Конференция «Матемаркетинг»
МоскваОнлайн
15 – 16 ноября
IT-конференция Merge Skolkovo
Москва
22 – 24 ноября
Хакатон «AgroCode Hack Genetics'24»
Онлайн
28 ноября
Конференция «TechRec: ITHR CAMPUS»
МоскваОнлайн
25 – 26 апреля
IT-конференция Merge Tatarstan 2025
Казань