Как стать автором
Обновить
Selectel
IT-инфраструктура для бизнеса

Решаем задачу по взаимодействию микросервисов на Python тремя способами

Уровень сложностиПростой
Время на прочтение6 мин
Количество просмотров13K

Когда речь заходит про взаимодействие микросервисов, все сразу вспоминают о сложных архитектурных паттернах, вроде Event Bus и CQRS. В этой статье я расскажу, как выполнить простенькую задачку для двух микросервисов без навороченной архитектуры. В моем случае это создание сервиса, который агрегирует события компании в единую ленту событий.

Задача


Дано два сервиса:

  • регистратор сотрудников,
  • лента событий компании.

Цель: сделать так, чтобы в ленте создавалось событие при регистрации нового сотрудника.

С подобной задачей я столкнулся при разработке корпоративного портала Selectel. Мне нужно было организовать отображение в ленте целого ряда новых событий — в их числе изменение должности или структуры команд, переименование отдела, реакция на событие (лайк или огонек).

В моем случае я сразу пришел к одному из озвученных в тексте решений. Но, поскольку мы инсценируем эту задачу на Хабре, рассмотрим возможные реализации с описание плюсов и минусов.

30 марта мы проведем митап «Типичный Python». Обсудим типизацию, новинки в SQLAlchemy и релиз Mypy 1.0. Расскажем, как разрабатываем свои продукты и ответим на вопросы.

Регистрируйтесь на живую встречу или онлайн-трансляцию.

Решение 1. Синхронное взаимодействие


Чтобы решить задачу, будем действовать постепенно. Для начала рассмотрим самый простой для нас вариант, в котором регистратор посылает POST-запрос на создание события в ленте.


Подойдет ли нам этот подход? Зависит от того, что происходит в регистраторе. Тут нужно иметь в виду, что каждое новое действие, добавленное в метод регистрации, замедляет его выполнение. Не хочется заставлять сотрудников ждать на старте их карьеры в компании. К тому же, метод будет замедляться при добавлении новых действий. Например, регистрации в разных социальных аккаунтах.

Такое взаимодействие называется синхронным. Его лучше избегать, и сделать это довольно просто. Нужно внедрить асинхронный подход!


Решение 2. Асинхронный подход


Если попытаться нагуглить «асинхронные фоновые задачи в Python», скорее всего, наткнешься на такие решения, как Dramatiq или Celery. Они довольно популярны, и не просто так. С их помощью можно быстро накодить асинхронное взаимодействие для наших сервисов, нужен только брокер сообщений.

Брокер сообщений — это отдельный сервис, который передает сообщения в один или несколько пунктов назначения. В нашем случае он нужен для того, чтобы передать команду «Отправь задачу на создание события с параметрами a и b» в Worker. Обычно брокером выступает Redis, RabbitMQ или Kafka. Хотя бывают и случаи, когда после долгих и упорных исследований используют YMQ.

Worker — это отдельная программка, которая выполняет полученные из брокера задачи. В нашем случае именно Worker будет отправлять запрос на создание события.

После внедрения изменений наша архитектура станет похожей на диаграмму ниже.


Регистратор посылает задачу на публикацию события в брокер RabbitMQ. В то же время на этот брокер подписан Worker, который ожидает задач от брокера. После получения задачи на публикацию события Worker посылает HTTP-запрос в сервис ленты.

Такое взаимодействие выглядит и работает намного лучше, чем первое, потому что регистратор не зависит от количества действий при регистрации, а сотрудник не ожидает их выполнения.

Можно остановиться и на этой реализации. Но в реальности, где нежданно-негаданно меняются требования, наше решение по-прежнему не доведено до ума.

А что если сервисов станет больше? Отправление событий по HTTP


Представим, что мы выкатили решение в прод. События при регистрации появляются в ленте, все довольны, всем все понравилось. Настолько понравилось, что сотрудники подумали и попросили: «Слушайте, а давайте добавим еще событий в ленту? Например, будем отправлять сотрудникам благодарности или информацию об изменениях в структуре компании».

Что случится с нашей архитектурой, если отправлять сообщения в ленту будет не только регистратор, но и еще несколько сервисов?


В этом случае каждый сервис будет посылать в ленту события по HTTP, а это будет необоснованно нагружать сервис. К тому же, держим в голове, что, помимо получения событий, сервису ленты нужно их еще и показывать по запросу на главной странице портала.

В принципе можно оставить все как было. Но такой подход будет хорошо работать до тех пор, пока:

  • мало событий,
  • мало сервисов, которые шлют события,
  • мало сотрудников.

Решение 3. Очередь RabbitMQ


Появляется вопрос: «Можно ли как-то избежать лишней загрузки сервиса?» Ответ: «Можно, если убрать HTTP из нашей схемы».

Действительно, зачем нам нужно отправлять запросы из Worker-ов по HTTP, когда мы можем сразу из сервисов отправлять задачи через брокер в Worker, который имеет доступ к БД ленты. А он будет эти события напрямую записывать.

После преобразований наша схема станет такой:


Как это реализовать? Создаем очередь (RabbitMQ) специально для нашего сервиса событий. Из каждого сервиса мы публикуем события в эту очередь. Отдельный Worker, в свою очередь, выполняет задачи из брокера. Если говорить терминами, то наши сервисы — это publisher-ы, а Worker ленты — это consumer.

На этом месте уже можно остановиться. Подход хороший, все работает классно, и теперь можно не беспокоиться за новые требования наших сотрудников. Осталось только понять, как все это реализовать на Python.

Реализация на Python


Раз сервисов много, будет логично сделать Worker ленты асинхронным, так как у него будет много простоев из-за ожидания ввода/вывода.

Для его реализации будем использовать библиотеку aio-pika.

async def main():
    try:
        connection = await aio_pika.connect_robust(settings.rabbitmq_dsn)  # соединение с RabbitMQ
    except exceptions.CONNECTION_EXCEPTIONS as e:
        logger.error(e.args[0])
        await asyncio.sleep(3)
        return await main()  # запускаем бесконечный цикл, пока не подключится
    async with connection:
        channel: aio_pika.abc.AbstractChannel = await connection.channel()
        queue: aio_pika.abc.AbstractQueue = await channel.declare_queue(
            settings.queue_name, durable=True
        )
        logger.info("Starting consuming")
        while True:
            try:
                await consume(queue)  # начинаем слушать очередь
            except exceptions.CONNECTION_EXCEPTIONS as e:
                logger.error(e.args[0])
                return await main()
            except Exception as e:
                logger.error(e.args[0])




if __name__ == "__main__":
    logger.info("Starting queue worker")
    asyncio.run(main(), debug=settings.app_env == AppEnvEnum.local.value)

Функция consume тоже очень простая.

async def consume(queue):
    message: aio_pika.IncomingMessage
    async for message in queue:
        async with message.process():
            context = {
                "service_name": message.app_id,
                "task_id": message.message_id,
            }
            with logger.contextualize(**context):
                logger.info("message is being processing")
                data = json.loads(message.body.decode())
                await create_new_events(data) # тут ваша бизнес логика
                logger.info("message successfully processed!")

С consumer-ом разобрались, а что с publisher-ами? Сейчас в большинстве компаний существуют как синхронные, так и асинхронные сервисы, поэтому приведем пример и того, и другого.

Для синхронных сервисов будем использовать модуль pika.

def _get_message_properties(message_id: Optional[str] = None):
    return pika.BasicProperties(
        delivery_mode=DeliveryMode.Persistent.value,
        content_type="application/json",
        content_encoding="utf-8",
        message_id=message_id or uuid4().hex,
        app_id=project.config.application_name,
    )




def _create_connection():
    parsed = urlparse(_events_feed_config["queue"]["dsn"])
    credentials = pika.PlainCredentials(
        username=parsed.username, password=parsed.password
    )
    param = pika.ConnectionParameters(
        host=parsed.hostname,
        port=parsed.port,
        virtual_host=parsed.path[1:],
        credentials=credentials,
    )
    return pika.BlockingConnection(param)




def publish_to_events_feed(data, message_id):
    with _create_connection() as connection:
        channel = connection.channel()
        properties = _get_message_properties(message_id)
        logger.info("message is publishing")
        channel.basic_publish(
            exchange="",
            routing_key=_events_feed_config["queue"]["name"],
            body=json.dumps(data).encode(),
            properties=properties,
        )
        logger.info("message successfully published")

А для асинхронных по-прежнему aio-pika.

def _create_message(data: bytes, message_id: Optional[str] = None):
    return aio_pika.Message(
        body=data,
        content_type="application/json",
        content_encoding="utf-8",
        message_id=message_id or uuid4().hex,
        delivery_mode=aio_pika.abc.DeliveryMode.PERSISTENT,
        app_id=config.app_name,
    )




async def _publish_events(data: EventListModel, message_id=None):
    connection = await aio_pika.connect_robust(config.events_feed_queue_dsn)
    async with connection:
        routing_key = config.events_feed_queue_name
        channel: aio_pika.abc.AbstractChannel = await connection.channel()
        message = _create_message(data.json().encode(), message_id)
        logger.info("message is publishing")
        await channel.default_exchange.publish(
            message,
            routing_key=routing_key,
        )
        logger.info("message successfully published")

Заключение


Когда вы решаете задачу, исходите из требований. Зачастую не нужно строить сложную архитектуру — достаточно просто сделать POST-запрос. Когда же ваши сервисы начинают глючить, подвисать либо вы заранее знаете, что нагрузка будет высокой, имеет смысл выстроить хорошее решение для пользователей.

А если у вас есть решения 4 и 5 для задачи, приходите в комментарии — обсудим!

Возможно, эти тексты тоже вас заинтересуют:

Удар, еще удар: производство ОЗУ переживает не лучшие времена. Цены падают, производство сокращается
Полезные материалы по Data Science и машинному обучению, которые помогут пройти сквозь джунгли из терминов
КПК HP iPaq, Дюма 1870 года и PCMCIA факс-модем: новые находки на испанской барахолке
Теги:
Хабы:
Всего голосов 53: ↑52 и ↓1+51
Комментарии38

Публикации

Информация

Сайт
selectel.ru
Дата регистрации
Дата основания
Численность
501–1 000 человек
Местоположение
Россия
Представитель
Влад Ефименко