Как правило, когда нужно что-то сделать быстро и дёшево, мы не задумываемся над отказоустойчивостью и масштабируемостью нашего приложения, что через некоторое время обязательно приводит к боли. Современные решения позволяют быстро и просто решить эту проблему.
На примере перехода от монолитного приложения к микросервисам, я попробую показать все плюсы и минусы каждого подхода. Статья разделена на три части:
- В первой части будет рассмотрено монолитное приложение на веб-фреймворке Dash, т.е. генерация данных и их отображение будут находиться в одном месте.
- Вторая часть посвящена разложению монолитного приложения на микросервисы, т.е. генерацией данных будет заниматься один сервис, отображением другой, а связь между ними будет налажена через брокер сообщений Kafka.
- В третьей части микросервисы будут "упакованы" в Docker контейнеры.
Конечное приложение будет выглядеть, как показано на диаграмме снизу.

Введение
Для того чтобы лучше понять пример, желательно иметь хотя бы базовые знания в Kafka и Docker, приведу несколько на мой взгляд дельных курсов и статей:
- По Kafka, очень подробно разобрано на youtube канале Stephane Maarek, канал на английском языке.
- Про Docker мне понравился плейлист с youtube канала letsCode на русском языке.
- Статья на хабре Просто о микросервисах от YuryKa.
Полный код проекта залит на github, можно скачать отсюда.
Часть 1. Монолитное приложение
Для примера монолитного приложения я взял код из официальной документации по Dash (Plotly), посмотреть его можно здесь, в исходном коде проекта находится в папке local_app. Это идеальное решение для быстрого прототипирования.
import datetime import dash import dash_core_components as dcc import dash_html_components as html import plotly from dash.dependencies import Input, Output # pip install pyorbital from pyorbital.orbital import Orbital satellite = Orbital('TERRA') external_stylesheets = ['https://codepen.io/chriddyp/pen/bWLwgP.css'] app = dash.Dash(__name__, external_stylesheets=external_stylesheets) app.layout = html.Div( html.Div([ html.H4('TERRA Satellite Live Feed'), html.Div(id='live-update-text'), dcc.Graph(id='live-update-graph'), dcc.Interval( id='interval-component', interval=1*1000, # in milliseconds n_intervals=0 ) ]) ) @app.callback(Output('live-update-text', 'children'), [Input('interval-component', 'n_intervals')]) def update_metrics(n): lon, lat, alt = satellite.get_lonlatalt(datetime.datetime.now()) style = {'padding': '5px', 'fontSize': '16px'} return [ html.Span('Longitude: {0:.2f}'.format(lon), style=style), html.Span('Latitude: {0:.2f}'.format(lat), style=style), html.Span('Altitude: {0:0.2f}'.format(alt), style=style) ] # Multiple components can update everytime interval gets fired. @app.callback(Output('live-update-graph', 'figure'), [Input('interval-component', 'n_intervals')]) def update_graph_live(n): satellite = Orbital('TERRA') data = { 'time': [], 'Latitude': [], 'Longitude': [], 'Altitude': [] } # Collect some data for i in range(180): time = datetime.datetime.now() - datetime.timedelta(seconds=i*20) lon, lat, alt = satellite.get_lonlatalt( time ) data['Longitude'].append(lon) data['Latitude'].append(lat) data['Altitude'].append(alt) data['time'].append(time) # Create the graph with subplots fig = plotly.tools.make_subplots(rows=2, cols=1, vertical_spacing=0.2) fig['layout']['margin'] = { 'l': 30, 'r': 10, 'b': 30, 't': 10 } fig['layout']['legend'] = {'x': 0, 'y': 1, 'xanchor': 'left'} fig.append_trace({ 'x': data['time'], 'y': data['Altitude'], 'name': 'Altitude', 'mode': 'lines+markers', 'type': 'scatter' }, 1, 1) fig.append_trace({ 'x': data['Longitude'], 'y': data['Latitude'], 'text': data['time'], 'name': 'Longitude vs Latitude', 'mode': 'lines+markers', 'type': 'scatter' }, 2, 1) return fig if __name__ == '__main__': app.run_server(debug=True)
Приложение каждую секунду обновляет графики и таким образом эмулируется реал-тайм поступление данных. В качестве источника данных используется python пакет pyorbital, с помощью которого можно производить различные астрономические вычисления (в этом примере рассчитывается положение научно-исследовательского спутника Terra (EOS AM-1)). Полученные данные и графики отображаются в браузере через Dash (Plotly) на локальном хосте: 127.0.0.1:8050.
Данные, которые получаются на выходе — это altitude, longitude и latitude (высота, долгота и широта), т.е. координаты спутника в текущий момент времени, первый график показывает изменение высоты, а второй — долготы и широты соответственно.

(На рисунке показана работа монолитного (оригинального) приложения)
К плюсам можно отнести:
- Высокая скорость работы, данные вычисляются и сразу же обновляются.
К минусам:
- Высокая стоимость ошибки, например, если ошибка возникнет при вычислении данных, то и их отображение тоже пострадает, т.к. это повлечет падение всего приложения (хотя конечно можно написать обработчик ошибок, но это усложнит код).
- Невозможность обновить/исправить приложение на лету. Если, например, необходимо изменить алгоритм вычисления данных, необходимо будет перезапустить всё, в том числе и отображение.
Часть 2. Приложение на основе микросервисов
В исходном коде проекта находится в папке local_microservices_app, для удобства туда же я положил сервис Kafka упакованный в Docker, исходный код которого можно посмотреть здесь (взято с github репозитория Stephane Maarek)
Монолитное приложение я разделил на две части, первая часть — backend (producer.py), генерирует данные и отправляет их в Kafka, вторая часть — frontend (consumer.py, graph_display.py) читает сообщения из Kafka и отображает графики в браузере.
backend:
Producer (производитель данных) вычисляет данные и отправляет их в Kafka раз в одну секунду (в оригинальном приложении данные рассчитывались за каждые 20 секунд)
from time import sleep import datetime from confluent_kafka import Producer import json from pyorbital.orbital import Orbital satellite = Orbital('TERRA') topic = 'test_topic' producer = Producer({'bootstrap.servers': 'localhost:9092'}) def acked(err, msg): if err is not None: print("Failed to deliver message: {}".format(err)) else: print("Produced record to topic {} partition [{}] @ offset {}" .format(msg.topic(), msg.partition(), msg.offset())) # send data every one second while True: time = datetime.datetime.now() lon, lat, alt = satellite.get_lonlatalt(time) record_value = json.dumps({'lon':lon, 'lat': lat, 'alt': alt, 'time': str(time)}) producer.produce(topic, key=None, value=record_value, on_delivery=acked) producer.poll() sleep(1)
frontend:
Consumer (потребитель данных) написан в виде класса MyKafkaConnect и находится в файле consumer.py, при инициализации загружает из Kafka последние 180 (или меньше, если их не достаточно) сообщений. При последующих обращениях докачивает все новые сообщения из Kafka.
Изначальное монолитное приложение (monolith.py) не сильно изменилось, ключевое изменение состоит в том, что данные рассчитываются не на месте, а загружаются через класс MyKafkaConnect, все прежние методы работают фактически также.
import datetime from confluent_kafka import Consumer, TopicPartition import json from collections import deque from time import sleep class MyKafkaConnect: def __init__(self, topic, group, que_len=180): self.topic = topic self.conf = { 'bootstrap.servers': 'localhost:9092', 'group.id': group, 'enable.auto.commit': True, } # the application needs a maximum of 180 data units self.data = { 'time': deque(maxlen=que_len), 'Latitude': deque(maxlen=que_len), 'Longitude': deque(maxlen=que_len), 'Altitude': deque(maxlen=que_len) } consumer = Consumer(self.conf) consumer.subscribe([self.topic]) # download first 180 messges self.partition = TopicPartition(topic=self.topic, partition=0) low_offset, high_offset = consumer.get_watermark_offsets(self.partition) # move offset back on 180 messages if high_offset > que_len: self.partition.offset = high_offset - que_len else: self.partition.offset = low_offset # set the moved offset to consumer consumer.assign([self.partition]) self.__update_que(consumer) # https://docs.confluent.io/current/clients/python.html#delivery-guarantees def __update_que(self, consumer): try: while True: msg = consumer.poll(timeout=0.1) if msg is None: break elif msg.error(): print('error: {}'.format(msg.error())) break else: record_value = msg.value() json_data = json.loads(record_value.decode('utf-8')) self.data['Longitude'].append(json_data['lon']) self.data['Latitude'].append(json_data['lat']) self.data['Altitude'].append(json_data['alt']) self.data['time'].append(datetime.datetime.strptime(json_data['time'], '%Y-%m-%d %H:%M:%S.%f')) # save local offset self.partition.offset += 1 finally: # Close down consumer to commit final offsets. # It may take some time, that why I save offset locally consumer.close() def get_graph_data(self): consumer = Consumer(self.conf) consumer.subscribe([self.topic]) # update low and high offsets (don't work without it) consumer.get_watermark_offsets(self.partition) # set local offset consumer.assign([self.partition]) self.__update_que(consumer) # convert data to compatible format o = {key: list(value) for key, value in self.data.items()} return o def get_last(self): lon = self.data['Longitude'][-1] lat = self.data['Latitude'][-1] alt = self.data['Altitude'][-1] return lon, lat, alt # for test if __name__ == '__main__': connect = MyKafkaConnect(topic='test_topic', group='test_group') while True: test = connect.get_graph_data() print('number of messages:', len(test['time']), 'unique:', len(set(test['time'])), 'time:', test['time'][-1].second) sleep(0.1)
import datetime import dash import dash_core_components as dcc import dash_html_components as html import plotly from dash.dependencies import Input, Output from consumer import MyKafkaConnect connect = MyKafkaConnect(topic='test_topic', group='test_group') external_stylesheets = ['https://codepen.io/chriddyp/pen/bWLwgP.css'] app = dash.Dash(__name__, external_stylesheets=external_stylesheets) app.layout = html.Div( html.Div([ html.H4('TERRA Satellite Live Feed'), html.Div(id='live-update-text'), dcc.Graph(id='live-update-graph'), dcc.Interval( id='interval-component', interval=1*1000, # in milliseconds n_intervals=0 ) ]) ) @app.callback(Output('live-update-text', 'children'), [Input('interval-component', 'n_intervals')]) def update_metrics(n): lon, lat, alt = connect.get_last() print('update metrics') style = {'padding': '5px', 'fontSize': '16px'} return [ html.Span('Longitude: {0:.2f}'.format(lon), style=style), html.Span('Latitude: {0:.2f}'.format(lat), style=style), html.Span('Altitude: {0:0.2f}'.format(alt), style=style) ] # Multiple components can update everytime interval gets fired. @app.callback(Output('live-update-graph', 'figure'), [Input('interval-component', 'n_intervals')]) def update_graph_live(n): # Collect some data data = connect.get_graph_data() print('Update graph, data units:', len(data['time'])) # Create the graph with subplots fig = plotly.tools.make_subplots(rows=2, cols=1, vertical_spacing=0.2) fig['layout']['margin'] = { 'l': 30, 'r': 10, 'b': 30, 't': 10 } fig['layout']['legend'] = {'x': 0, 'y': 1, 'xanchor': 'left'} fig.append_trace({ 'x': data['time'], 'y': data['Altitude'], 'name': 'Altitude', 'mode': 'lines+markers', 'type': 'scatter' }, 1, 1) fig.append_trace({ 'x': data['Longitude'], 'y': data['Latitude'], 'text': data['time'], 'name': 'Longitude vs Latitude', 'mode': 'lines+markers', 'type': 'scatter' }, 2, 1) return fig if __name__ == '__main__': app.run_server(debug=True)
К плюсам можно отнести:
- Меньше негативных последствий от ошибок, если ошибка возникнет при вычислении данных в backend микросервисе, то в таком случае отображение графиков продолжится, хотя конечно обновляться они не будут (обратите внимание, графики могут быть лишь не большим модулем большого приложения, которое продолжит работу).
- Возможность обновления/исправления ошибок на лету, т.к. модули работают независимо, то кратковременная остановка backend микросервиса не вызовет падения отображения графиков, хотя и будет небольшой лаг.
- Микросервис может быть написан на любом языке программирования.
К минусам:
- Меньшая скорость работы по сравнению с монолитным приложением, между вычислением данных и их отображением появляется посредник в виде Kafka, на работу которого требуется хоть и не большое, но время.

(На рисунке сверху backend после обновления отправляет вдвое больше сообщений, что видно на графике)
Часть 3. Приложение на основе микросервисов упакованных в Docker
В исходном коде проекта находится в папке docker_microservices_app. От второй части отличается тем, что backend и frontend упакованы в Docker. Также я добавил ещё два микросервиса в backend (ещё два научно-исследовательских спутника Aura (EOS CH-1) и Aqua (EOS PM-1)).
FROM python:3.7 RUN python -m pip install confluent-kafka RUN python -m pip install pyorbital WORKDIR /app COPY producer.py ./ CMD ["python", "producer.py"]
FROM python:3.7 RUN python -m pip install confluent-kafka RUN python -m pip install dash plotly WORKDIR /app COPY consumer.py graph_display.py ./ CMD ["python", "graph_display.py"]
version: '2.1' # Stephane Maarek's kafka-docker # https://github.com/simplesteph/kafka-stack-docker-compose/blob/master/zk-single-kafka-single.yml services: zoo1: image: zookeeper:3.4.9 hostname: zoo1 ports: - "2181:2181" restart: unless-stopped environment: ZOO_MY_ID: 1 ZOO_PORT: 2181 ZOO_SERVERS: server.1=zoo1:2888:3888 volumes: - ./zk-single-kafka-single/zoo1/data:/data - ./zk-single-kafka-single/zoo1/datalog:/datalog kafka1: image: confluentinc/cp-kafka:5.5.0 hostname: kafka1 ports: - "9092:9092" restart: unless-stopped environment: KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181" KAFKA_BROKER_ID: 1 KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO" KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 volumes: - ./zk-single-kafka-single/kafka1/data:/var/lib/kafka/data depends_on: - zoo1 backend_terra: build: context: ./backend restart: unless-stopped environment: BOOTSTRAP_SERVERS: "kafka1:19092" TOPIC: "terra_topic" SATELLITE: "TERRA" depends_on: - kafka1 backend_aqua: build: context: ./backend restart: unless-stopped environment: BOOTSTRAP_SERVERS: "kafka1:19092" TOPIC: "aqua_topic" SATELLITE: "AQUA" depends_on: - kafka1 backend_aura: build: context: ./backend restart: unless-stopped environment: BOOTSTRAP_SERVERS: "kafka1:19092" TOPIC: "aura_topic" SATELLITE: "AURA" depends_on: - kafka1 frontend: build: context: ./frontend ports: - "8050:8050" restart: unless-stopped environment: BOOTSTRAP_SERVERS: "kafka1:19092" depends_on: - backend_terra - backend_aqua - backend_aura
from time import sleep import datetime from confluent_kafka import Producer import json from pyorbital.orbital import Orbital import os topic = os.environ['TOPIC'] bootstrap_servers = os.environ['BOOTSTRAP_SERVERS'] s_name = os.environ['SATELLITE'] satellite = Orbital(s_name) producer = Producer({'bootstrap.servers': bootstrap_servers}) def acked(err, msg): if err is not None: print("Failed to deliver message: {}".format(err)) else: print("Produced record to topic {} partition [{}] @ offset {}" .format(msg.topic(), msg.partition(), msg.offset())) # send data every one second while True: time = datetime.datetime.now() lon, lat, alt = satellite.get_lonlatalt(time) record_value = json.dumps({'lon':lon, 'lat': lat, 'alt': alt, 'time': str(time)}) producer.produce(topic, key=None, value=record_value, on_delivery=acked) producer.poll() sleep(1)
import datetime from confluent_kafka import Consumer, TopicPartition import json from collections import deque from time import sleep class MyKafkaConnect: def __init__(self, topic, group, que_len=180): self.topic = topic self.conf = { 'bootstrap.servers': 'localhost:9092', 'group.id': group, 'enable.auto.commit': True, } # the application needs a maximum of 180 data units self.data = { 'time': deque(maxlen=que_len), 'Latitude': deque(maxlen=que_len), 'Longitude': deque(maxlen=que_len), 'Altitude': deque(maxlen=que_len) } consumer = Consumer(self.conf) consumer.subscribe([self.topic]) # download first 180 messges self.partition = TopicPartition(topic=self.topic, partition=0) low_offset, high_offset = consumer.get_watermark_offsets(self.partition) # move offset back on 180 messages if high_offset > que_len: self.partition.offset = high_offset - que_len else: self.partition.offset = low_offset # set the moved offset to consumer consumer.assign([self.partition]) self.__update_que(consumer) # https://docs.confluent.io/current/clients/python.html#delivery-guarantees def __update_que(self, consumer): try: while True: msg = consumer.poll(timeout=0.1) if msg is None: break elif msg.error(): print('error: {}'.format(msg.error())) break else: record_value = msg.value() json_data = json.loads(record_value.decode('utf-8')) self.data['Longitude'].append(json_data['lon']) self.data['Latitude'].append(json_data['lat']) self.data['Altitude'].append(json_data['alt']) self.data['time'].append(datetime.datetime.strptime(json_data['time'], '%Y-%m-%d %H:%M:%S.%f')) # save local offset self.partition.offset += 1 finally: # Close down consumer to commit final offsets. # It may take some time, that why I save offset locally consumer.close() def get_graph_data(self): consumer = Consumer(self.conf) consumer.subscribe([self.topic]) # update low and high offsets (don't work without it) consumer.get_watermark_offsets(self.partition) # set local offset consumer.assign([self.partition]) self.__update_que(consumer) # convert data to compatible format o = {key: list(value) for key, value in self.data.items()} return o def get_last(self): lon = self.data['Longitude'][-1] lat = self.data['Latitude'][-1] alt = self.data['Altitude'][-1] return lon, lat, alt # for test if __name__ == '__main__': connect = MyKafkaConnect(topic='test_topic', group='test_group') while True: test = connect.get_graph_data() print('number of messages:', len(test['time']), 'unique:', len(set(test['time'])), 'time:', test['time'][-1].second) sleep(0.1)
import datetime import dash import dash_core_components as dcc import dash_html_components as html import plotly from dash.dependencies import Input, Output from consumer import MyKafkaConnect external_stylesheets = ['https://codepen.io/chriddyp/pen/bWLwgP.css'] app = dash.Dash(__name__, external_stylesheets=external_stylesheets) app.layout = html.Div( html.Div([ html.Div([ html.H4('TERRA Satellite Live Feed'), html.Div(id='terra-text'), dcc.Graph(id='terra-graph') ], className="four columns"), html.Div([ html.H4('AQUA Satellite Live Feed'), html.Div(id='aqua-text'), dcc.Graph(id='aqua-graph') ], className="four columns"), html.Div([ html.H4('AURA Satellite Live Feed'), html.Div(id='aura-text'), dcc.Graph(id='aura-graph') ], className="four columns"), dcc.Interval( id='interval-component', interval=1*1000, # in milliseconds n_intervals=0 ) ], className="row") ) def create_graphs(topic, live_update_text, live_update_graph): connect = MyKafkaConnect(topic=topic, group='test_group') @app.callback(Output(live_update_text, 'children'), [Input('interval-component', 'n_intervals')]) def update_metrics_terra(n): lon, lat, alt = connect.get_last() print('update metrics') style = {'padding': '5px', 'fontSize': '15px'} return [ html.Span('Longitude: {0:.2f}'.format(lon), style=style), html.Span('Latitude: {0:.2f}'.format(lat), style=style), html.Span('Altitude: {0:0.2f}'.format(alt), style=style) ] # Multiple components can update everytime interval gets fired. @app.callback(Output(live_update_graph, 'figure'), [Input('interval-component', 'n_intervals')]) def update_graph_live_terra(n): # Collect some data data = connect.get_graph_data() print('Update graph, data units:', len(data['time'])) # Create the graph with subplots fig = plotly.tools.make_subplots(rows=2, cols=1, vertical_spacing=0.2) fig['layout']['margin'] = { 'l': 30, 'r': 10, 'b': 30, 't': 10 } fig['layout']['legend'] = {'x': 0, 'y': 1, 'xanchor': 'left'} fig.append_trace({ 'x': data['time'], 'y': data['Altitude'], 'name': 'Altitude', 'mode': 'lines+markers', 'type': 'scatter' }, 1, 1) fig.append_trace({ 'x': data['Longitude'], 'y': data['Latitude'], 'text': data['time'], 'name': 'Longitude vs Latitude', 'mode': 'lines+markers', 'type': 'scatter' }, 2, 1) return fig create_graphs('terra_topic', 'terra-text', 'terra-graph') create_graphs('aqua_topic', 'aqua-text', 'aqua-graph') create_graphs('aura_topic', 'aura-text', 'aura-graph') if __name__ == '__main__': app.run_server( host='0.0.0.0', port=8050, debug=True)
К плюсам можно отнести (кроме тех, что указаны во второй части):
- Изоляция ресурсов, отсутствие конфликтов библиотек различных версий и т.д. (каждый модуль приложения можно упаковать в отдельный Docker-контейнер со всем своим окружением и зависимостями).
- Быстрый и удобный запуск приложения на любом хосте.
- Лёгкая масштабируемость приложения, легко можно добавить новые контейнеры
К минусам:
- Т.к. Docker является дополнительным посредником между ОС и приложением, это приводит к увеличению нагрузки и расходу большего количества ресурсов.
На рисунке ниже показана ситуация, при которой один backend микросервис перестал отправлять данные и так как всё приложение состоит из микросервисов, то оно продолжает работать (в отличие от монолитной конструкции), более того, когда этот микросервис возобновил работу, он подключился на лету, и не потребовалось всё перезапускать.

(Не смотря на то, что один микросервис завис, остальные продолжают работать)
Выводы
В данной статье я рассмотрел, на мой взгляд наиболее очевидные (далеко не все) достоинства и недостатки каждого подхода. Кроме того, хоть внедрение Kafka и кажется дополнительным усложнением проекта, но на самом деле задачи, которые необходимо решить обычно являются типовыми, такими как непрерывное чтение данных из бд или их запись, слив данных из нескольких бд в одну и т.д. Для подобных целей нет необходимости изобретать велосипед, можно использовать готовые решения из Kafka connectors, там есть поддержка для фактически всех хоть сколько-нибудь известных бд.
Дополнительные ссылки по теме:
Python + Kafka =? / Николай Сасковец/ bitnet [Python Meetup 14.09.2019]
Николай Сасковец, Построение микросервисных систем с использованием Kafka
