Самое краткое введение в Reactive Programming

    Цель данной статьи – показать на примере зачем нужно reactive programming, как оно связано с функциональным программированием, и как с его помощью можно писать декларативный код, который легко адаптировать к новым требованиям. Кроме того, хочется сделать это максимально кратко и просто на примере приближенном к реальному.


    Возьмем такую задачу:
    Есть некий сервис c REST API и endpointом /people. При POST-запросе на этот endpoint'a создается новая сущность. Написать функцию которая принимает массив объектов вида { name: 'Max' } и создают набор сущностей посредством API(по-английски, это называется batch-операция).


    Давайте решим эту задачу в императивном стиле:


    const request = require('superagent')
    
    function batchCreate(bodies) {
      const calls = []
      for (let body of bodies) {
        calls.push(
          request
            .post('/people')
            .send(body)
            .then(r => r.status)
        )
      }
      return Promise.all(calls)
    }
    

    Давайте, для сравнения, перепишем этот кусочек кода в функциональном стиле. Для простоты, под функциональным стилем мы будем понимать:


    1. Применение функциональных примитивов(.map, .filter, .reduce) вместо императивных циклов(for, while)
    2. Код организован в "чистые" функции – они зависят только от своих аргументов и не зависят от состояния системы

    Код в функциональном стиле:


    const request = require('superagent')
    
    function batchCreate(bodies) {
      const calls = bodies.map(body =>
        request
          .post('/people')
          .send(body)
          .then(r => r.status)
      )
      return Promise.all(calls)
    }

    Мы получили кусок кода такого же размера и стоит признаться что не понятно чем этот кусок лучше предыдущего.
    Для того чтобы понять чем второй кусок кода лучше – нужно начать менять код, представим что к оригинальной задаче появилось новое требование:
    У сервиса который мы вызываем появилось ограничение на количество запросов в промежуток времени: за секунду один клиент может выполнить не более пяти запросов. Выполнение большего количества запросов приведет к тому что сервис будет возвращать 429 HTTP ошибку(too many requests).


    В этом месте, пожалуй, стоит остановиться и попробовать решить задачу самому, %username%


    Возьмем за основу наш функциональный код и попробуем его изменить. Основная проблема "чистого" функционального программирования состоит в том, что оно ничего "не знает" — о среде выполнения и вводе-выводе(в английском для этого есть выражение side effects), но на практике мы постоянно с ними работаем.
    Чтобы заполнить этот пробел на помощь приходит Reactive Programming — набор подходов пытающихся решить проблему side effects. Самой известной реализацией этой парадигмы является библиотека Rx, использующая концепцию reactive streams


    Что такое reactive streams? Если очень кратко, то это подход позволяющий применить функциональные примитивы(.map, .filter, .reduce) к чему-то распределенному по времени.


    Например, мы передаем по сети некий набор комманд – нам не нужно дожидаться пока мы получим весь набор, мы представляем его как reactive stream и можем с ним работать. Тут возникают еще два важных концепта:


    • поток может быть бесконечным или как угодно долго распределенным по времени
    • передающая сторона передает команду только в том случае, если принимающая готова ее обработать(backpressure)

    Целью этой статьи является поиск легких путей, поэтому, мы возьмем библиотеку Highland, которая старается решить ту же задачу что и Rx, но намного проще в освоении. Идея лежащая внутри проста: давайте возьмем за основу Node.js streams и будем “переливать” данные из одного Stream в другой.


    Приступим: начнем с простого — сделаем наш код "реактивным" без добавления нового функционала


    const request = require('superagent')
    const H = require(‘highland’)
    
    function batchCreate(bodies) {
       return H(bodies)
        .flatMap(body =>
          H(request
            .post('localhost:3000/people')
            .send(body)
            .then(r => r.status)
          )
        )
        .collect()
        .toPromise(Promise)
    }

    На что стоит обратить внимание:


    • H(bodies) – мы создаем stream из массива
    • .flatMap и callback который он принимает. Идея довольно проста — мы заворачиваем Promise в конструктор потока чтобы получить поток с одним значением(или ошибкой. важно понимать что это именно значение, а не Promise).
      В результате это нам дает поток потоков — при помощи flatMap мы сглаживаем это в один поток значений которым мы можем оперировать(кто сказал монада?)
    • .collect – нам нужен для того чтобы собрать все значения в одной "точке" в массив
    • .toPromise – вернет нам Promise, который будет fulfilled в момент когда у нас будет значение из потока

    Теперь давайте попробуем реализовать наше требование:


    const request = require('superagent')
    const H = require('highland')
    
    function batchCreate(bodies) {
       return H(bodies)
        .flatMap(body =>
          H(request
            .post('localhost:3000/people')
            .send(body)
            .then(r => r.status)
          )
        )
        .ratelimit(5, 1000)
        .collect()
        .toPromise(Promise)
    }

    Благодаря концепту backpressure – это всего лишь одна строчка .ratelimit в данной парадигме. В Rx это занимает приблизительно столько же места.


    Ну вот и все, интересно ваше мнение:


    • получилось ли у меня достичь декларируемого в начале статьи результата?
    • можно ли достичь аналогичного результата используя императивный подход?
    • заинтересовались ли вы Reactive programming?

    P.S.: вот тут можно найти еще одну мою статью про Reactive Programming

    Средняя зарплата в IT

    120 000 ₽/мес.
    Средняя зарплата по всем IT-специализациям на основании 6 371 анкеты, за 1-ое пол. 2021 года Узнать свою зарплату
    Реклама
    AdBlock похитил этот баннер, но баннеры не зубы — отрастут

    Подробнее

    Комментарии 40

    • НЛО прилетело и опубликовало эту надпись здесь
        +2
        Реактивное программирование — парадигма программирования, ориентированная на потоки данных и распространение изменений.

        где здесь потоки данных и распространение изменений?
        Статья демонстрирует возможности метода ratelimit, такой метод можно и в какой-нибудь lodash запихать, работать будет ничуть не хуже.

          0
          а можно попросить вас показать пример метода, который вы «запихаете» в lodash? Это не комментарий в стиле – сами напишите статью. Просто попробуйте мысленно реализовать этот метод, и, мне кажется, вы поймете «где здесь потоки данных и распространение изменений?»
            0
            И прошу не забывать о том что request – асинхронный и мы не знаем сколько времени может занять выполненние запроса(может быть больше 1 секунды)
              0

              Набросал:


              <script>
              
              function LimitQueue(limit, timeout) {
                  this.limit = limit;
                  this.timeout = timeout;
              
                  this.jobs = [];
                  this.pending = 0;
              
                  this.results = [];
              
                  this._run = this._run.bind(this);
              }
              
              LimitQueue.prototype.push = function(jobs) {
                  Array.prototype.push.apply(this.jobs, jobs);
              };
              
              LimitQueue.prototype._run = function(results) {
                  if (results) {
                      this.pending = 0;
                      Array.prototype.push.apply(this.results, results);
              
                      setTimeout(this._run, this.timeout);
              
                      if (!this.jobs.length) {
                          this.callback(this.results);
                      }
              
                      return;
                  }
              
                  let promises = [];
              
                  for (;;) {
                      if (this.pending == this.limit || !this.jobs.length) {
                          Promise.all(promises).then(this._run);
                          break;
                      }
              
                      promises.push(this.jobs.shift()());
                      this.pending++;
                  }
              };
              
              LimitQueue.prototype.onDone = function(cb) {
                  this.callback = cb;
                  this._run();
              };
              
              // ========
              
              let q = new LimitQueue(3, 1000);
              
              q.push(
                  [{ name: 'Max1' }, { name: 'Max2' }, { name: 'Max3' }, { name: 'Max4' }, { name: 'Max5' }].map(
                      data => () => {
                          console.log(1, data.name);
              
                          return new Promise((resolve, reject) => {
                              console.log(2, data.name);
              
                              setTimeout(() => {
                                  console.log(3, data.name);
                                  resolve(data.name);
                              }, Math.ceil(Math.random() * 3000));
                          });
                      }
                  )
              );
              
              q.onDone(results => {
                  console.log(4, results);
              });
              
              </script>

              В консоли следующее:


              1 "Max1"
              2 "Max1"
              1 "Max2"
              2 "Max2"
              1 "Max3"
              2 "Max3"
              [задержка]
              3 "Max3"
              [задержка]
              3 "Max1"
              [задержка]
              3 "Max2"
              [задержка 1000ms]
              1 "Max4"
              2 "Max4"
              1 "Max5"
              2 "Max5"
              [задержка]
              3 "Max5"
              [задержка]
              3 "Max4"
              4 (5) ["Max1", "Max2", "Max3", "Max4", "Max5"]

              Завернуть всё это в красивый интерфейс уже не проблема и как видите нет никаких потоков.


              UPD: накосячил конечно немного, но суть я думаю понятна.

                0
                вот тут есть код условного сервер. Почему если я меняю код вашего емулятора промисов на реальный реквест, то вижу только 5 запросов начинает происходить что-то странное?
                  0
                  Потому что их там всего 5 и они разделены на пачки по 3. Сперва отправляются 3 запроса, после ответа на все (там случайная задержка подставляется) отправляются ещё 2, после ответа на оба выводятся все результаты. В общем, в выводе всё видно.
                    0
                    спасибо. Меня сбило с толку что почему-то зацикливается вывод по окончании
                    4 [ 201, 201, 201, 201, 201 ]
                    4 [ 201, 201, 201, 201, 201 ]
                    4 [ 201, 201, 201, 201, 201 ]
                    4 [ 201, 201, 201, 201, 201 ]
                    4 [ 201, 201, 201, 201, 201 ]
                      0

                      да)), про это я и написал:


                      UPD: накосячил конечно немного, но суть я думаю понятна.

                      Поотлаживать конечно не помешает, считайте, что это псевдокод демонстрирующий как это делать без потоков.

                        0
                        что будет с вашим кодом, если произойдет следующее:
                        1 вызов займет 1с
                        2 вызов займет 5с
                        3 вызов займет 1с
                        через сколько времени начнется:
                        4-й вызов?
                        5-й вызов?
                          0
                          Через 5, там Promise.all внутри, почитайте код, там же мало совсем, 3 минуты разобраться ;)
                            0
                            я спросил как раз потому что почитал. а как по вашему – через сколько должен? если у нас рейтлимит на 1 секунду.
                              0
                              Ошибся в комментарии, через 6 конечно, в коде этот момент правильно сделан.
                                0
                                Вы меня не правильно поняли, для того чтобы вызывать Апи с максимально возможной скоростью – через сколько должен происходить 4-й вызов при условии что первые два прошли за секунду?
                                  0
                                  В общем, самый долгий запрос из пачки плюс задержка. Если самый долгий 1 сек. и задержка 1 сек., то 2 секунды.
                                    0
                                    ок, смотрите – мы ушли в некоторые интересные дебри. С моей точки зрения – эта стратегия далеко не оптимальна. вам интересно продлжать этот разговор? я спрашиваю чтобы у вас не сложилось впечатление что я к вам «прикапываюсь»
                                      0
                                      Вы считаете, что складывать задержку и самый долгий запрос не нужно, а просто использовать самое долгое из них? Да, так оптимальнее. Доработать пример до этого минутное дело.
                                        0
                                        Я считаю что в каждое окно времени у вас должно быть пять «активных» запросов, тоесть не нужно дожидаться окончания самого длинного запроса, если началось новое «окно»
                                        вот сервер с реальным тротлингом, там же код batchCreate и логи для сервера и create

                                        Вот как выглядит кусочек лога(это нет тоже лог что в гисте!)
                                        app finished 3 +53ms
                                        app finished 2 +572ms
                                        app finished 1 +11ms
                                        app finished 0 +1s
                                        app start 5 +1ms
                                        app start 6 +1ms
                                        app start 7 +0ms
                                        app start 8 +0ms
                                        app finished 4 +76ms
                                        app start 9 +0ms < — свободное окно и пошел вызов, не дожидаясь окончания всех долгоиграющих запросов
                                        app finished 6 +188ms
                                        app finished 8 +1s
                                        app finished 5 +74ms
                                        app finished 9 +244ms
                                        app finished 7 +121ms

                                        понимайте о чем я? сможете так доизменить свой код? прошу заметить в коде batchCreate измененно две строчки(имею ввиду по сути, а не добавлен логгинг)
                                          0
                                          понимайте о чем я?

                                          понимаю, так ещё оптимальнее, но не вижу проблемы написать и это без потоков, навскидку, мне кажется, даже кода меньше получится (чем сейчас в примере выше). Только давайте по очереди? Мне просто не особо интересно такие задачки решать. Попробуйте сами, а я уже попробую сделать ещё проще.

                                            0
                                            я уже сделал. пример по ссылке именно так работает – просто в следствии рандомного времени задержки, это не всегда видно. теперь интересно посмотреть на ваш код
                                              0
                                              Вы использовали готовый метод сделанный именно под вашу задачу, на это не нужно много времени и умения, я же предлагаю вам решить эту интереснейшею задачу на чистом js. Мне кажется будет честно, если мы будем тратить одинаковое время впустую, а иначе вы начнёте перечислять другие возможности highland или конкретно этого метода, а мне сидеть и переписывать всё это на чистом js? Нет уж), либо признайте, что потоки в данном примере не обязательны, либо давайте по очереди.
                                                0
                                                извините, но по-моему, это ваше утвержение
                                                но не вижу проблемы написать и это без потоков, навскидку, мне кажется, даже кода меньше получится

                                                Я как раз тут проблему вижу. По-этому, если вам нечего больше сказать – давайте не будем занимать время друг-друга. Спасибо
                                                  0

                                                  всё правильно, это мои слова, для меня это действительно не сложная задача, но я же не говорил, что хочу потратить ещё пол часа своего времени.


                                                  Я как раз тут проблему вижу

                                                  в чём проблема, в задаче? Вы действительно считаете её нерешаемой без потоков, серьёзно? Что там в потоках по вашему, божественный эфир? Или проблема в вас? Тогда это тем более в ваших интересах, потренируетесь. А то на собеседованиях уже страшно просить что-то простейшее написать.

                                                    0
                                                    Извините, вы похоже вместо кода начали генерить холивар. Доброй ночи, не растраивайтесь, конечно можете
                                                      0
                                                      Код я генерирую обычно за ЗП, здесь я вам ничего не должен, а холивар здесь только от вашего нежелания признать очевидное.
                  0
                  и это на самом деле очень круто, что вы это написали. Вы согласны с тем что эта задача не настолько тривиальна, как кажется на первый взгляд?
                  Завернуть всё это в красивый интерфейс – давайте исходить из того что в красивый интерефейс можно что угодно завернуть
                    0
                    Вы согласны с тем что эта задача не настолько тривиальна, как кажется на первый взгляд?

                    честно, нет, для меня сложность сразу ясна.


                    давайте исходить из того что в красивый интерефейс можно что угодно завернуть

                    вы же не хотите сказать, что внутри highland что-то более простое чем я предложил?

                      0
                      нет, конечно. Но Highland(Rx, lodash, orm, whatever) – позволяет решать много задач. вы предлагаете написать штуку которая решает одну задачу и завернуть ее в красивый интерфейс.
                        0
                        вы предлагаете написать штуку которая решает одну задачу

                        почему нет?


                        Хотя вопрос риторический, мне не нужно объяснять зачем объединять что-то в библиотеки, речь не про это, я говорю, что приведённый вами пример в статье никак не показывает плюсов реактивного программирования, а его ключевая часть легко переносится в другие библиотеки никак не связанные с РП. Если бы мне так объясняли РП я бы сказал "круто", но никогда бы не стал его использовать для себя.

                          0
                          буду рад если вы укажите ссылки на лучшие примеры. Большой курсеровский курс прошу не предлагать – в силу того что в он напорядок больше размера данной статьи
                            0
                            если честно, я не совсем понимаю что тут еще показывать
                            в вашем коде навскидку 70 cтрок
                            в моем 10
                            это если еще опустить факт того что там есть пассажи вида:
                            for (;;) {
                            if (this.pending == this.limit || !this.jobs.length) {
                            которые нужно вдумчиво читать

                            С Аргументом «вы же не хотите сказать, что внутри highland что-то более простое чем я предложил?» я не знаю как спорить – ним можно отбросить любую парадигму программирования и заменить ее на на набор функций
                              0
                              в вашем коде навскидку 70 cтрок
                              в моем 10

                              причём тут количество строк? Ок, представьте другую библиотеку внутри которой ratelimit на основе моего кода. Представьте в lodash завтра появляется ratelimit реализованный без потоков. Как теперь докажете необходимость РП?


                              буду рад если вы укажите ссылки на лучшие примеры

                              я делал несколько докладов про РП и как-то начинал писать статью на эту тему. И каждый раз основной проблемой был именно хороший пример, глядя на который сразу становилось бы ясно, что это и зачем. Фишка РП в том, что плюсы становятся особенно видны именно в сложных ситуациях, а такое в статью или доклад не запихаешь. Мне нечего вам предложить в виде одного примера, но я похоже в скором времени ещё раз попробую написать такую статью. Может со второй попытки что-то получится.

                                0
                                >Как теперь докажете необходимость РП?
                                исходя из вашей логики, можно отменить «все» аргументируя тем, что это можно на машином коде написать.

                                Эта ветка ушла в холивар – это не интересно. предлагаю закончить. продолжить там где код
                                  0
                                  можно отменить «все»

                                  я как раз не предлагаю отменять никаких абстракций, я предлагаю представить, что такой метод есть ещё в какой-то библиотеке, но реализован без потоков. Выше мы как раз обсуждаем насколько возможно сделать его без потоков.

                0
                Вопрос «в лоб»: а как при таком подходе контролировать ratelimit в условиях, когда сервис api одновременно запущен в 2..n экземплярах? Отказ от масштабируемости?)
                  0
                  throtlling в простейшем случае выполняется по апи — вам это в принципе не нужно.
                  В более сложном – вам не обойтись без какой-то мастер ноды которая будет раздавать джобы слейвам, но это выходит за рамки этой статьи.
                    0
                    в комментарии выше: вместо апи — айпи.
                  +3

                  Мне кажется в статье было куда больше смысла, если бы:


                  .ratelimit(5, 1000)

                  был написан "руками" с пояснениями. Тогда это была бы демонстрация "на пальцах", как можно удобно работать со stream-ми асинхронных данных. Вместо этого мы видим статью, которая сводится к "в highland для этого есть готовый удобный метод". Зачем? :)


                  И мне кажется в статье под названием "Самое краткое введение в Reactive Programming" должно быть простейшее описание паттерна Observer. А не реактивные библиотечные потоки с тыщей мутных методов :)

                    –1
                    >простейшее описание паттерна Observer
                    мне так не кажется. Проще думать об этом как именно о потоках – отсюда «качаю», сюда «заливаю». Потом человек уже сам почитает про все остальное. Оставить статью краткой и рассказать о всем – заведомо не достижимая цель.
                      +1

                      Ну вот смотрите. Приходит человек читать про реактивное программирование. Он не знает что такое stream-ы и протокола их работы. Он не знает где эти странные штуки могут быть удобными. Он не знает паттерна наблюдатель. Он весь во внимании. Ведь это краткое введение, ему сейчас всё объяснят. Что в итоге он видит в этой статье? Хм… Ну что есть какие-то сторонние библиотеки, которые позволяют записать привычные вещи в странном виде. Он хочет пояснений — а зачем так? Что тут под капотом происходит? А вы ему пример с rateLimit. Он думает — вот, интересный пример, сейчас мне всё расскажут и покажут. Вы говорите ему, мол есть готовый метод в 1 строку. Почувствуй силу rxjshighland, о юный падаван! Он в ответ — эээээ, что? На этом статья заканчивается. Что нового для себя подчерпнул человек? В чём был "message"? :) Смотрите, как я могу?


                      Даже зная как работают nodejs-streams, observer-ы и пр. штуки, может быть совершенно не очевидно, в каких реальных жизненных обстоятельствах, эти непривычные подходы могут оказаться к месту. Тут ведь как с каррированием. Раз в неделю кто-нибудь пишет статью про то, что это такое. Однако людей, которые нашли ему применение в своих реальных задачах можно пересчитать по пальцам одной руки (ладно, тут я несколько утрирую).

                  Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

                  Самое читаемое