Цель данной статьи – показать на примере зачем нужно reactive programming, как оно связано с функциональным программированием, и как с его помощью можно писать декларативный код, который легко адаптировать к новым требованиям. Кроме того, хочется сделать это максимально кратко и просто на примере приближенном к реальному.
Возьмем такую задачу:
Есть некий сервис c REST API и endpointом /people
. При POST-запросе на этот endpoint'a создается новая сущность. Написать функцию которая принимает массив объектов вида { name: 'Max' }
и создают набор сущностей посредством API(по-английски, это называется batch-операция).
Давайте решим эту задачу в императивном стиле:
const request = require('superagent')
function batchCreate(bodies) {
const calls = []
for (let body of bodies) {
calls.push(
request
.post('/people')
.send(body)
.then(r => r.status)
)
}
return Promise.all(calls)
}
Давайте, для сравнения, перепишем этот кусочек кода в функциональном стиле. Для простоты, под функциональным стилем мы будем понимать:
- Применение функциональных примитивов(.map, .filter, .reduce) вместо императивных циклов(for, while)
- Код организован в "чистые" функции – они зависят только от своих аргументов и не зависят от состояния системы
Код в функциональном стиле:
const request = require('superagent')
function batchCreate(bodies) {
const calls = bodies.map(body =>
request
.post('/people')
.send(body)
.then(r => r.status)
)
return Promise.all(calls)
}
Мы получили кусок кода такого же размера и стоит признаться что не понятно чем этот кусок лучше предыдущего.
Для того чтобы понять чем второй кусок кода лучше – нужно начать менять код, представим что к оригинальной задаче появилось новое требование:
У сервиса который мы вызываем появилось ограничение на количество запросов в промежуток времени: за секунду один клиент может выполнить не более пяти запросов. Выполнение большего количества запросов приведет к тому что сервис будет возвращать 429 HTTP ошибку(too many requests).
В этом месте, пожалуй, стоит остановиться и попробовать решить задачу самому, %username%
Возьмем за основу наш функциональный код и попробуем его изменить. Основная проблема "чистого" функционального программирования состоит в том, что оно ничего "не знает" — о среде выполнения и вводе-выводе(в английском для этого есть выражение side effects), но на практике мы постоянно с ними работаем.
Чтобы заполнить этот пробел на помощь приходит Reactive Programming — набор подходов пытающихся решить проблему side effects. Самой известной реализацией этой парадигмы является библиотека Rx, использующая концепцию reactive streams
Что такое reactive streams? Если очень кратко, то это подход позволяющий применить функциональные примитивы(.map, .filter, .reduce) к чему-то распределенному по времени.
Например, мы передаем по сети некий набор комманд – нам не нужно дожидаться пока мы получим весь набор, мы представляем его как reactive stream и можем с ним работать. Тут возникают еще два важных концепта:
- поток может быть бесконечным или как угодно долго распределенным по времени
- передающая сторона передает команду только в том случае, если принимающая готова ее обработать(backpressure)
Целью этой статьи является поиск легких путей, поэтому, мы возьмем библиотеку Highland, которая старается решить ту же задачу что и Rx, но намного проще в освоении. Идея лежащая внутри проста: давайте возьмем за основу Node.js streams и будем “переливать” данные из одного Stream в другой.
Приступим: начнем с простого — сделаем наш код "реактивным" без добавления нового функционала
const request = require('superagent')
const H = require(‘highland’)
function batchCreate(bodies) {
return H(bodies)
.flatMap(body =>
H(request
.post('localhost:3000/people')
.send(body)
.then(r => r.status)
)
)
.collect()
.toPromise(Promise)
}
На что стоит обратить внимание:
- H(bodies) – мы создаем stream из массива
- .flatMap и callback который он принимает. Идея довольно проста — мы заворачиваем Promise в конструктор потока чтобы получить поток с одним значением(или ошибкой. важно понимать что это именно значение, а не Promise).
В результате это нам дает поток потоков — при помощи flatMap мы сглаживаем это в один поток значений которым мы можем оперировать(кто сказал монада?) - .collect – нам нужен для того чтобы собрать все значения в одной "точке" в массив
- .toPromise – вернет нам Promise, который будет fulfilled в момент когда у нас будет значение из потока
Теперь давайте попробуем реализовать наше требование:
const request = require('superagent')
const H = require('highland')
function batchCreate(bodies) {
return H(bodies)
.flatMap(body =>
H(request
.post('localhost:3000/people')
.send(body)
.then(r => r.status)
)
)
.ratelimit(5, 1000)
.collect()
.toPromise(Promise)
}
Благодаря концепту backpressure – это всего лишь одна строчка .ratelimit в данной парадигме. В Rx это занимает приблизительно столько же места.
Ну вот и все, интересно ваше мнение:
- получилось ли у меня достичь декларируемого в начале статьи результата?
- можно ли достичь аналогичного результата используя императивный подход?
- заинтересовались ли вы Reactive programming?
P.S.: вот тут можно найти еще одну мою статью про Reactive Programming