
В последние годы реактивное программирование в целом, а технология ReactiveX в частности, обретает всё большую популярность среди разработчиков. Одни уже активно используют все преимущества этого подхода, а другие только “что-то слышали”. Со своей стороны я постараюсь помочь вам представить, насколько некоторые концепции реактивного программирования способны изменить взгляд на привычные, казалось бы, вещи.
Существует два принципиально различных способа организации больших систем: в соответствии с объектами и состояниями, которые живут в системе, и в соответствии с потоками данных, которые проходят через неё. Парадигма реактивного программирования предполагает легкость выражения потоков данных, а также распространение изменений благодаря этим потокам. Например, в императивном программировании операция присваивания означает конечность результата, тогда как в реактивном значение будет пересчитано при получении новых входных данных. Поток значений проходит в системе ряд трансформаций, которые необходимы для решения определенной задачи. Оперирование потоками позволяет системе быть расширяемой и асинхронной, а правильная реакция на возникающие ошибки – отказоустойчивой.
ReactiveX – библиотека, позволяющая создавать асинхронные и событийно-ориентированные программы, использующие наблюдаемые последовательности. Она расширяет шаблон Наблюдателя для поддержки последовательностей данных, добавляет операторы для их декларативного соединения, избавляя от необходимости заботиться о синхронизации и безопасности потоков, разделяемых структурах данных и неблокирующего I/O.
Одним из основных отличий библиотеки ReactiveX от функционального реактивного программирования является то, что она оперирует не непрерывно изменяющимися, а дискретными значениями, которые “испускаются” в течении длительного времени.
Стоит немного рассказать о том, что такое Observer, Observable, Subject. Модель Observable является источником данных и позволяет обрабатывать потоки асинхронных событий похожим образом с тем, который вы используете для коллекций данных, таких как массивы. И всё это вместо колбэков, а значит, код является более читабельным и менее склонным к ошибкам.
В ReactiveX наблюдатель (Observer) подписывается на Observable и впоследствии реагирует на элемент или последовательность элементов, которые тот отправляет. У каждого Observer, подписанного на Observable, вызывается метод Observer.on_next() на каждый элемент потока данных, после которого может быть вызван как Observer.on_complete(), так и Observer.on_error(). Часто Observable применяется таким образом, что он не начинает отдавать данные до тех пор, пока кто-нибудь не подписывается на него. Это так называемые “ленивые вычисления” – значения вычисляются только тогда, когда в них возникает потребность.

Бывают задачи, для решения которых нужно соединить Observer и Observable, чтобы принимать сообщения о событиях и сообщать о них своим подписчикам. Для этого существует Subject, имеющий, кроме стандартной, ещё несколько реализаций:
- ReplaySubject имеет возможность кэшировать все поступившие в него данные, а при появлении нового подписчика – отдавать всю эту последовательность сначала, работая далее в обычном режиме.
- BehaviorSubject хранит последнее значение, по аналогии с ReplaySubject отдавая его появившемуся подписчику. При создании он получает значение по умолчанию, которое будет получать каждый новый подписчик, если последнего значения еще не было.
- AsyncSubject также хранит последнее значение, но не отдает данные, пока не завершится вся последовательность.
Observable и Observer – только начало ReactiveX. Они не несут в себе всю мощь, которую являют собой операторы, позволяющие трансформировать, объединять, манипулировать последовательностями элементов, которые отдают Observable.
В документации ReactiveX описание операторов включает в себя использование Marble Diagram. К примеру, вот как эти диаграммы представляют Observable и их трансформации:

Глядя на диаграмму ниже, легко понять, что оператор map трансформирует элементы, отдаваемые Observable, путем применения функции к каждому из них.

Хорошей иллюстрацией возможностей ReactiveX является приложение RSS-агрегатора. Здесь возникает необходимость асинхронной загрузки данных, фильтрации и трансформации значений, поддержания актуального состояния путем периодического обновления.
В этой статье примеры для представления основных принципов ReactiveX написаны с использованием библиотеки rx для языка программирования Python. Вот так, например, выглядит абстрактная реализация наблюдателя:
class Observer(metaclass=ABCMeta): @abstractmethod def on_next(self, value): return NotImplemented @abstractmethod def on_error(self, error): return NotImplemented @abstractmethod def on_completed(self): return NotImplemented
Наше приложение в режиме реального времени будет обмениваться сообщениями с браузером посредством веб-сокетов. Возможность легко реализовать это предоставляет Tornado.
Работа программы начинается с запуска сервера. При обращении браузера к серверу открывается веб-сокет.
Код
import json import os import feedparser from rx import config, Observable from rx.subjects import Subject from tornado.escape import json_decode from tornado.httpclient import AsyncHTTPClient from tornado.platform.asyncio import AsyncIOMainLoop from tornado.web import Application, RequestHandler, StaticFileHandler, url from tornado.websocket import WebSocketHandler asyncio = config['asyncio'] class WSHandler(WebSocketHandler): urls = ['https://lenta.ru/rss/top7', 'http://wsrss.bbc.co.uk/russian/index.xml'] def open(self): print("WebSocket opened") # здесь будет основная логика нашего приложения def on_message(self, message): obj = json_decode(message) # Отправляет сообщение, которое получает user_input self.subject.on_next(obj['term']) def on_close(self): # Отписаться от Observable; по цепочке остановит работу всех observable self.combine_latest_sbs.dispose() print("WebSocket closed") class MainHandler(RequestHandler): def get(self): self.render("index.html") def main(): AsyncIOMainLoop().install() port = os.environ.get("PORT", 8080) app = Application([ url(r"/", MainHandler), (r'/ws', WSHandler), (r'/static/(.*)', StaticFileHandler, {'path': "."}) ]) print("Starting server at port: %s" % port) app.listen(port) asyncio.get_event_loop().run_forever()
Для обработки введенного пользователем запроса создается Subject, при подписке на который он отправляет значение по умолчанию (в нашем случае — пустую строку), а затем раз в секунду отправляет то, что введено пользователем и удовлетворяет условиям: длина 0 или больше 2, значение изменилось.
# Subject одновременно и Observable, и Observer self.subject = Subject() user_input = self.subject.throttle_last( 1000 # На заданном временном промежутке получать последнее значение ).start_with( '' # Сразу же после подписки отправляет значение по умолчанию ).filter( lambda text: len(text) == 0 or len(text) > 2 ).distinct_until_changed() # Только если значение изменилось
Также для периодического обновления новостей предусмотрен Observable, который раз в 60с отдает значение.
interval_obs = Observable.interval( 60000 # Отдает значение раз в 60с (для периодического обновления) ).start_with(0)
Два этих потока соединяются оператором combine_latest, в цепочку встраивается Observable для получения списка новостей. После чего на этот Observable создается подписка, вся цепочка начинает работать только в этот момент.
# combine_latest собирает 2 потока из запросов пользователя и временных # интервалов, срабатывает на любое сообщение из каждого потока self.combine_latest_sbs = user_input.combine_latest( interval_obs, lambda input_val, i: input_val ).do_action( # Срабатывает на каждый выпущенный элемент # Отправляет сообщение для очистки списка на фронтенд lambda x: send_response('clear') ).flat_map( # В цепочку встраивается Observable для получения списка self.get_data ).subscribe(send_response, on_error) # Создается подписка; вся цепочка начинает работать только в этот момент
Следует подробнее остановиться на том, что такое “Observable для получения списка новостей”. Из списка url для получения новостей мы создаем поток данных, элементы которого приходят в функцию, где при помощи HTTP-клиента Tornado AsyncHTTPClient происходит асинхронная загрузка данных для каждого элемента списка urls. Из них также создается поток данных, который фильтруется по запросу, введенному пользователем. Из каждого потока мы берем по 5 новостей, которые приводим к нужному формату для отправки на фронтенд.
Код
def get_rss(self, rss_url): http_client = AsyncHTTPClient() return http_client.fetch(rss_url, method='GET') def get_data(self, query): # Observable создается из списка url return Observable.from_list( self.urls ).flat_map( # Для каждого url создается Observable, который загружает данные lambda url: Observable.from_future(self.get_rss(url)) ).flat_map( # Полученные данные парсятся, из них создается Observable lambda x: Observable.from_list( feedparser.parse(x.body)['entries'] ).filter( # Фильтрует по вхождению запроса в заголовок или текст новости lambda val, i: query in val.title or query in val.summary ).take(5) # Берем только по 5 новостей по каждому url ).map(lambda x: {'title': x.title, 'link': x.link, 'published': x.published, 'summary': x.summary}) # Преобразует данные для отправки на фронтенд
После того, как поток выходных данных сформирован, его подписчик начинает поэлементно получать данные. Функция send_response отправляет полученные значения во фронтенд, который добавляет новость в список.
def send_response(x): self.write_message(json.dumps(x)) def on_error(ex): print(ex)
В файле feeder.js
Код
ws.onmessage = function(msg) { var value = JSON.parse(msg.data); if (value === "clear") {$results.empty(); return;} // Append the results $('<li><a tabindex="-1" href="' + value.link + '">' + value.title +'</a> <p>' + value.published + '</p><p>' + value.summary + '</p></li>' ).appendTo($results); $results.show(); }
Таким образом, реализуется push-технология, в которой данные поступают от сервера к фронтенду, который лишь отправляет введенный пользователем запрос для поиска по новостям.
В качестве заключения предлагаю задуматься о том, какая реализация получилась бы при привычном подходе с использованием колбэков вместо Observable, без возможности легко объединить потоки данных, без возможности мгновенной отправки данных потребителю-фронтенду и с необходимостью отслеживать изменения в строке запроса. Среди Python-разработчиков технология пока что практически не распространена, однако я вижу уже несколько возможностей её применить на текущих проектах.
Пример использования ReactiveX для Python вы можете найти в github репозитории с демо-проектом RSS-агрегатора.