Давайте представим ситуацию. Есть много сервисов (чаще всего это паттерн pub/sub), и обращаться к каждому приходится по порту. Возьмём для примера один сервис: порт приёма сообщений у него один, порт отдачи — другой. Умножаем два порта на количество сервисов — и получаем головную боль. Именно она и стала отправной точкой. Mail Pigeon (почтовый голубь) - это еще один воодушевляющий вызов для меня, для моего роста. Зачем концентрироваться на угрюмых облаках, если можно улыбнуться восходящему солнцу. К тому же ZeroMQ мне понравился с первого взгляда. Желание сделать что-то в этом направлении вызвано ещё и тем, что в своей практике я постоянно видел противоположный подход. Так что, это просто желание выговориться, что можно сделать по-другому. Желание сбросить мысленные оковы идеи, которая может днями сверлить мозг. Поэтому, когда я увидел проблему, я в тот же момент увидел решение.
Так появился Mail Pigeon — асинхронная клиент-серверная библиотека на ZeroMQ, которая позволяет сервисам адресовать сообщения друг другу по имени, а не по порту. Почтовый голубь, который доставит письмо даже в бурю. С гарантией доставки, автоматическим восстановлением после сбоев и end-to-end шифрованием.
Проблема: почему REST не всегда подходит для микросервисов
Когда несколько сервисов общаются друг с другом, чаще всего используют REST API. Это просто, понятно, а главное — привычно. Но у REST есть врождённые ограничения, которые становятся проблемой при росте системы:
Синхронность. Сервис А отправляет запрос сервису Б и ждёт ответа. Если сервис Б упал или тормозит, сервис А тоже встаёт. Каскадные отказы.
Нет гарантии доставки. Если между сервисами пропала сеть, HTTP-запрос просто упадёт с ошибкой. Данные потеряны. Повторять запрос нужно вручную.
Протокольные накладные расходы. HTTP хорош для браузера, но для машинного взаимодействия он избыточен: заголовки, статусы, keep-alive.
Мне хотелось получить нечто иное: лёгкий протокол, асинхронность, гарантию, что сообщение дойдёт, даже если получатель временно недоступен. И чтобы это работало как библиотека, а не как отдельный брокер вроде RabbitMQ, который нужно разворачивать и администрировать.
Почему ZeroMQ, а не RabbitMQ или Kafka
ZeroMQ — это не брокер сообщений в классическом смысле. Это библиотека, которая даёт вам низкоуровневые сокеты с высокоуровневыми паттернами: REQ/REP, PUB/SUB, ROUTER/DEALER и другие. Вы не поднимаете отдельный сервер — вы просто используете библиотеку в своём коде.
Для моей задачи я выбрал паттерн ROUTER/DEALER:
ROUTER — это серверная сторона. Он принимает сообщения от клиентов, знает их идентификаторы и может маршрутизировать сообщения между ними.
DEALER — это клиентская сторона. Он подключается к ROUTER и может отправлять и принимать сообщения асинхронно.
Эта связка позволяет построить брокер переадресации (резервный сервер в клиенте) прямо внутри вашего приложения. Один из клиентов становится сервером-маршрутизатором, остальные подключаются к нему. Никакого внешнего брокера не нужно.
Архитектурное решение №1: Гарантированная доставка
В ZeroMQ есть одна особенность: если клиент отправил сообщение, а получатель в этот момент отключился, сообщение просто теряется. Мне это не подходило.
Я реализовал At-Least-Once семантику — сообщение должно быть доставлено как минимум один раз. Дубли лучше, чем потеря. Как это работает:
Клиент
app1отправляет сообщение дляapp2. Сообщение уходит серверу, но параллельно сохраняется в исходящую очередь клиентаapp1.Клиент
app2получает сообщение и автоматически отправляет подтверждение (ACK) обратно.Клиент
app1получает подтверждение и только тогда удаляет сообщение из своей очереди.
Если на любом этапе что-то пошло не так (пропала сеть, упал сервер или получатель), сообщение остаётся в очереди отправителя. Когда связь восстанавливается, отправитель заново пытается его доставить.
Для тестирования такого функционала мне пришлось продумать три типа теста.
Первый тип — обычный unit тест, ничего особенного, просто тест класса. Но даже в них пришлось использовать таймоут, чтобы добиться нужного эффекта.
Второй тип — это тестирования отправки сообщений между сервисами внутри отдельных подпроцессов. Здесь я хотел увидеть, как быстро могут данные отправляться между разными сервисами — как в зашифрованном виде, так и в расшифрованном. Также в этом тесте я смотрел как могут обогащаться данные, проходя по разным сервисам, как это бывает в реальной обыденности. Причём тот подпроцесс, с которого началась отправка сообщений, должен в конце получить полную цепочку обогащённых сообщений.
Третий тип — ручное интерактивное тестирование в консольных приложениях. Здесь я проверял, что будет, если одно из консольных приложений упадёт при отправке сообщений. Также в этом тесте я смотрел, как перемещается брокер переадресации между консольными приложениями. Если в одном консольном приложении брокер переадресации падал (я просто закрывал консольное приложение), то в другом консольном приложении он должен был запускаться и продолжать переадресацию сообщений. Так доставка была гарантированной между разными консольными приложениями.
Очереди сообщений — это сложнее, чем кажется
Но гарантированная доставка — это только полдела. Вторая половина — правильно организовать хранение сообщений, пока они ждут отправки или подтверждения. И вот тут начинается самое интересное. Каждое сообщение в библиотеке проходит через четыре состояния:
queue— сообщение ждёт отправки.send_queue— сообщение отправлено, ждёт подтверждения.wait_queue— входящее сообщение, адресованное конкретному ключу. Это нужно, чтобы получать нужные ответы из разных потоков тому, кто отправил сообщение.send_waiting_queue— сообщение, которое не удалось отправить (получатель не в сети).
Управление этими состояниями вынесено в абстрактные классы BaseQueue и BaseAsyncQueue. А конкретные реализации — SimpleBox (in-memory очередь) и FilesBox (файловая очередь на диске) — лишь определяют, где физически хранятся данные.
Такой подход позволяет легко добавить очередь на Redis. Достаточно унаследоваться от BaseQueue и реализовать три метода. Вот как выглядит in-memory реализация:
from mail_pigeon.queue import BaseQueue class SimpleBox(BaseQueue): def __init__(self): super().__init__() self._simple_box = {} def _save_data(self, key: str, value: str): self._simple_box[key] = value def _read_data(self, key: str) -> str: return self._simple_box[key] def _remove_data(self, key: str): if key in self._simple_box: del self._simple_box[key]
Вся логика состояний уже есть в базовом классе.
Архитектурное решение №2: Автоматический failover
Вторая проблема, которую я хотел решить, — отказоустойчивость. Что будет, если клиент-сервер (тот самый, который ROUTER) упадёт? Вся система встанет.
Я сделал так, чтобы клиенты могли автоматически поднимать резервный сервер у себя. За это отвечает параметр is_master:
is_master=True— клиент при запуске пытается стать сервером переадресации.is_master=False— клиент никогда не становится сервером, только подключается к существующему.is_master=None— клиент проверяет, есть ли сервер. Если да — подключается. Если нет — сам становится сервером.
В коде это выглядит так:
# app1 — запускается первым, становится сервером автоматически client1 = MailClient('app1', is_master=None) client1.wait_server() # app2 — запускается вторым, подключается к серверу в app1 client2 = MailClient('app2', is_master=None) client2.wait_server() # Если app2 упадёт, app1 не заметит разрыва. Но при этом client1 заметит отключение client2. # Если упадёт app1, app2 автоматически поднимет сервер у себя.
Каждый клиент мониторит состояние сервера. Если сервер перестаёт отвечать на heartbeat, клиент переходит в режим переподключения и при необходимости запускает свой сервер. Это позволяет системе самоисцеляться без внешнего оркестратора.
Сердцебиение системы: как не потерять друг друга
Отказоустойчивость требует быстрого обнаружения проблем. Если сервер упал, клиенты должны узнать об этом как можно скорее, чтобы запустить резервный сервер и переподключиться. Если клиент отключился, сервер должен уведомить остальных о его уходе.
Первый вариант, который я реализовал, был симметричным: и клиент, и сервер отправляли друг другу ping, и каждый ждал pong от другого. Логично, правда? Двухсторонняя проверка — надёжнее.
Но на практике этот подход дал сбой. В момент переподключения или при кратковременных задержках сети возникала гонка: клиент отправлял ping серверу, сервер не успевал ответить, клиент считал сервер мёртвым и начинал переподключение. Одновременно сервер слал свой ping клиенту, не получал ответа и удалял его из списка активных. В итоге сообщения, которые ещё можно было бы доставить, терялись. Система была слишком нервной.
Пришлось переосмыслить подход. Я сделал асимметричную модель:
Сервер регулярно отправляет echo-запросы всем клиентам. Это его единственная обязанность — проверять, живы ли они. Если клиент не отвечает в течение заданного интервала, сервер считает его отключившимся и уведомляет остальных.
Клиент ничего не отправляет серверу. Он только слушает. Если заданное время от сервера ничего не приходит — значит, сервер упал или канал оборвался. Клиент переходит в режим переподключения и, если нужно, запускает резервный сервер у себя.
Это решило проблему гонки. Теперь только одна сторона инициирует проверку, а вторая только реагирует на её отсутствие. Никаких лишних переподключений, никаких потерянных сообщений из-за ложного срабатывания.
Дополнительно сервер может отправить явный сигнал NOTIFY_STOP_SERVER, когда он завершает работу штатно (а не падает). Это позволяет клиентам мгновенно понять, что это не обрыв связи, и сразу запустить резервный сервер, не дожидаясь таймаута heartbeat.
Безопасность: два уровня защиты
В распределённой системе сообщения проходят через сервер-посредник. Если не подумать о безопасности, сервер может прочитать всё, что передают клиенты. Мне это не нравилось, поэтому я реализовал два уровня защиты:
Уровень 1: CurveZMQ (шифрование канала)
Это аналог TLS, встроенный в ZeroMQ. Каждый клиент имеет пару ключей: публичный и приватный. Сервер тоже имеет свою пару. При подключении происходит аутентификация и все данные шифруются на транспортном уровне. Это защищает от прослушивания трафика между клиентом и сервером, но не защищает от чтения сообщений на самом сервере.
# app1 — становится сервером, генерирует ключи client1 = MailClient('app1', is_master=True, cert_dir='/certificate') client1.wait_server() # app2 — подключается с публичным ключом сервера client2 = MailClient('app2', is_master=False, cert_dir='/certificate2') client2.wait_server()
Уровень 2: End-to-End шифрование (HMAC)
Чтобы даже сервер не мог прочитать сообщения, предназначенные другому клиенту, я добавил сквозное шифрование на уровне клиента. Сообщение шифруется до отправки на сервер и расшифровывается после получения. Сервер видит только зашифрованный набор байтов.
from mail_pigeon.security import TypesEncryptors encript = TypesEncryptors.HMAC('общий_пароль_группы') # app1 client1 = MailClient('app1', is_master=True, encryptor=encript) # app2 client2 = MailClient('app2', is_master=False, encryptor=encript) # app1 и app2 могут общаться, их сообщения недоступны серверу.
Само шифрование реализовано на основе PBKDF2HMAC и Fernet из библиотеки cryptography. Пароль группы преобразуется в ключ, которым шифруется каждое сообщение.
class HMACEncryptor(IEncryptor): def __init__(self, secret_word: str): self.salt = b'pigeon' self.kdf = PBKDF2HMAC( algorithm=SHA256(), length=32, salt=self.salt, iterations=1000 ) self.key = base64.urlsafe_b64encode(self.kdf.derive(secret_word.encode())) self.cipher = Fernet(self.key) def encrypt(self, message: bytes) -> bytes: return self.cipher.encrypt(message) def decrypt(self, encrypted: bytes) -> bytes: return self.cipher.decrypt(encrypted)
Эти два уровня можно комбинировать: CurveZMQ защищает канал, а HMAC — содержимое. Получается двойная защита.
Я сторонник того, чтобы никто не смог прочитать личные сообщения сервиса. Я представлял ситуацию, в которой у Сервиса А потребуют данные других сервисов, а они окажутся настолько неразборчивой кашей, что станут бесполезны для дальнейшего анализа, у кого нет общего ключа группы. Сервис может нести ценную информацию, и её нужно защищать даже от самого сервера.
Отправить и забыть… или дождаться ответа?
Библиотека поддерживает два режима отправки сообщений:
Режим 1: «Отправить и забыть»
client1.send(recipient='app2', content='данные') # Всё, пошли дальше. Сообщение ушло в очередь и будет доставлено, когда получится.
Это асинхронный режим. Вы не ждёте ответа, не блокируете поток. Сообщение гарантированно дойдёт (благодаря файловой очереди и ACK), но вы об этом не узнаете.
Режим 2: «Отправить и ждать ответа»
msg = client1.send(recipient='app2', content='запрос', wait=True) # Блокируется до получения ответа от app2 print(msg.content) # ответ
Здесь клиент ждёт, пока получатель не пришлёт ответное сообщение с тем же ключом. Если получатель отключился или сервер упал, ожидание прервётся и метод вернёт None. Можно задать таймаут.
Эта гибкость позволяет использовать библиотеку и для фоновых задач (уведомления, логирование), и для request-response сценариев (запрос данных от другого сервиса).
Заключение
Mail Pigeon — это не замена RabbitMQ или Kafka. Это нишевый инструмент для случаев, когда:
Вам нужна лёгкая шина данных без разворачивания отдельного брокера.
Важна гарантированная доставка даже при обрывах связи.
Нужна автоматическая отказоустойчивость без оркестратора.
Нужно сквозное шифрование сообщений.
Этот проект дал мне:
Понимание ZeroMQ и его паттернов.
Опыт реализации семантики доставки сообщений (At-Least-Once).
Навыки проектирования отказоустойчивых систем с автоматическим восстановлением.
Практику применения криптографии в реальном проекте.
У меня немало идей по развитию этой библиотеки, и я, возможно, вернусь к ним позже. Но пока проект выполнил свою главную задачу: показал, что распределённая шина данных не обязана быть тяжёлой, сложной в развёртывании и дырявой в плане безопасности. Лёгкий почтовый голубь долетел.
А ещё — после реализации этой идеи мне действительно полегчало. Надеюсь, и вам, прочитавшим эту статью, стало немного яснее.
Ссылки
Исходный код на GitHub: github.com/AntonGlyzin/mail_pigeon
Полная документация: mail-pigeon.readthedocs.io
Пакет на PyPI:
pip install mail-pigeon
