Инструменты Node.js разработчика. Протокол mqtt для работы с веб-сокетами

    Технология веб-сокет позволяет в веб-приложении или в мобильном приложении реализовать отправку сообщений с сервера на клиент, что невозможно сделать средствами REST-API. Для работы с веб-сокетами часто используют библиотеку socket.io, или же разработчики работают с нативными объектами веб-сокет браузеров. В этом сообщении я попытаюсь показать, что оба пути не решают всех проблем, и гораздо лучше использовать для работы с веб-сокетами специализированные серверы, например mqtt-сервер (раньше его назвали mqtt-брокер).

    Справедливости ради, и чтобы избежать ненужных споров, замечу, что кроме mqtt-сервера может быть использован еще целый ряд серверов, например rabbitmq.

    Разработка приложений с использованием веб-сокетов кажется очень простой, пока мы не сталкиваемся с реальностью, в которой часто происходят разрывы соединений. Первая задача, которую приходится решать — отслеживание разрывов соединений и их восстановление. Ситуация усложняется тем, что во время разрыва соединения и повторного соединения, клиент продолжает отправлять новые сообщения, равно как к клиенту могут поступать новые сообщения, которые имеют большую вероятность быть потерянными.

    Приходится на уровне приложения отслеживать получение сообщений, и реализовывать повторную их доставку. Я хотел бы особо обратить внимание на фразу «на уровне приложения» (а хотелось бы чтобы это было на уровне протокола).

    Как только мы добавили логику для отслеживания доставки сообщений — все сообщения стали приходить, но тут же выясняется, что появились дубликаты сообщений, так как сообщение могло быть получено, а подтверждение этого факта потерялось из-за разрыва соединения. И нужно еще удваивать количество программного кода для исключения дублей сообщений.

    Вместе с усложнением программного кода, снижается и его эффективность, за что часто ругают библиотеку socket.io. Она, конечно, менее эффективна, чем работа с нативными веб-сокетами, в частности, из-за наличия логики восстановления соединения и подтверждения доставки сообщения (сразу замечу что логики повторной доставки в socket.io не реализовано).

    Более надежным и эффективным был бы путь вынесения этой логики на уровень протокола. И такой протокол существует — это mqtt. Первая версия протокола mqtt была разработана Энди Стэнфорд-Кларком (IBM) и Арленом Ниппером (Arcom) в 1999. Спецификация MQTT 3.1.1 была стандартизирована консорциумом OASIS в 2014 году.

    В протоколе mqtt есть параметр «качество сервиса» (qos), которое может принимать значения:
    0 — сообщение доставляется если есть возможность;
    1 — сообщение доставляется гарантированно, но могут быть дубликаты;
    2 — сообщение доставляется гарантированно и гарантированно однократно.

    То есть, проблему с гарантированной доставкой сообщений протокол mqtt решает, и этот вопрос снимается с повестки дня. Но не только этот вопрос.

    Производительность и масштабирование.

    При работе с веб-сокетами, все подключенные клиенты оставляют открытыми соединения с сервером, даже если нет реального обмена сообщениями. Эта нагрузка по своему характеру отличается от нагрузки в REST-API, которая определяется потоком запросов. Нагрузку открытыми соединениями по веб-сокетам сложно эмулировать на этапе тестирования. Поэтому, нередко делается ошибочное предположение о достаточной производительности работы приложения по количеству отправленных и полученных сообщений, без учета нагрузки на поддержание большого количества открытых соединений с клиентами.

    Если мы передаем всю работу с веб-сокетами на специализированный сервер mqtt, то наше приложение на nodejs открывает всего лишь одно соединение по веб-сокету (или по tcp, т.к. mqtt поддерживает оба протокола) с mqtt-сервером, и мы можем масштабировать наше приложение, подключая к серверу mqtt несколько экземпляров nodejs.

    Если исчерпаются ресурсы одного mqtt-сервера, можно организовать кластер серверов mqtt, при этом не затрагивая приложения на nodejs.

    Теперь перейдем к примеру.

    mqtt-сервер или брокер, как его именовали в прежних спецификациях, работает по модели отправки сообщений/подписки на сообщения. Каждое сообщение отправляется в топик. Получатель подписывается на топик сообщений. И отправитель и получатель имеют два идентификатора: clientId (идентификатор устройства) и userName (имя пользователя).

    Идентификатор устройства имеет важное значение, так как он связывается с подпиской и на него будут приходить сообщения. Имя пользователя, в отличие от идентификатора устройства, не играет решающей роли в доставке сообщений, и используется для разграничения доступа к топикам.

    Для работы с протоколом mqtt на стороне клиента используется библиотека github.com/eclipse/paho.mqtt.javascript. Реализаций серверов несколько, в том числе есть бесплатные. Мы в этом примере будем использовать сервер emqx, который запускается через docker-compose (см. github.com/apapacy/tut-mqtt).

    Для тестирования создадим документ, в котором будем задавать clientId, userName и текст сообщения:

    <script src="/paho-mqtt.js"></script>
    <script src="/messages.js"></script>
    <form name="sender" onsubmit="return false">
      <input type="text" name="user">
      <input type="text" name="client">
      <input type="text" name="message">
      <input type="button" onclick="connect()" value="connect">
      <input type="button" onclick="send()" value="send">
    </form>
    


    Отправка сообщений реализована в файле message.js:

    var client;
    
    var connectOptions = {
      timeout: 30,
      reconnect: true,
      cleanSession: false,
      mqttVersion: 4,
      keepAliveInterval: 10,
      onSuccess: onConnect,
      onFailure: onFailure
    }
    
    function connect() {
      try {
        client = new Paho.Client('localhost', 8083, '/mqtt', document.forms.sender.client.value);
        connectOptions.userName = document.forms.sender.user.value;
        client.connect(connectOptions);
      } catch (ex) {
        console.log(ex);
      }
    }
    
    function onConnect() {
      console.log('on connect');
      client.onMessageArrived = function(message) {
        console.log("onMessageArrived: " + message.payloadString);
      }
      client.subscribe("test", { qos: 2 });
    }
    
    function onFailure(err) {
      console.log('on failure', JSON.stringify(err));
    }
    
    function send() {
       var message = new Paho.Message(document.forms.sender.message.value);
       message.destinationName = "test";
       message.qos = 2;
       client.send(message);
    }
    


    Для проверки, откройте в браузере файл index.html, задайте clientId, userName, текст сообщения, и отправьте несколько сообщений (их можно прочитать в консоли, так как клиент отправляет сообщения в топик test, и сам же подписан на этот топик).

    Теперь откройте другой браузер или другую вкладку браузера, и присоединитесь с другим (это важно) clientId. Отправьте еще несколько сообщений из первого браузера, и убедитесь что они приходят обоим клиентам, так как у них различаются clientId и они оба подписаны на топик test.

    Теперь закройте второй браузер (или вторую вкладку браузера), и отправьте еще несколько сообщений. После этого, вновь откройте второй браузер, и присоединитесь с тем же clientId. Убедитесь в логах консоли, что к Вам пришли все сообщения, которые отправлялись в период, когда второй браузер (вторая вкладка) была закрыта. Так произошло, потому что:

    • При отправке сообщения задан уровень качества qos=2;
    • Вы уже присоединялись раньше к этому же топику с этим же clientId, задав qos=2;
    • В опциях соединения задан параметр cleanSession: false.

    Код примера можно скачать в репозитории.

    apapacy@gmail.com
    29 сентября 2019 года

    Похожие публикации

    Средняя зарплата в IT

    113 000 ₽/мес.
    Средняя зарплата по всем IT-специализациям на основании 5 381 анкеты, за 2-ое пол. 2020 года Узнать свою зарплату
    AdBlock похитил этот баннер, но баннеры не зубы — отрастут

    Подробнее
    Реклама

    Комментарии 12

      +1
      Спасибо за статью!
      Стоило бы добавить, что описанный механизм по доставке недополученных сообщений после восстановления соединения обеспечивается именно параметром cleanSession: false. Кроме того, не каждая реализация MQTT-брокера поддерживает такие не-clean-сессии.
      А еще есть активно развивающаяся альтернатива библиотеке Paho mqtt клиента для браузера и nodejs — github.com/mqttjs/MQTT.js
        0

        Библиотеку mqtt.js я использую на стороне сервера т.к. она поддерживает протокол tcp. Однако на стороне клиента который я разрабатывал на react-native я столкнулся с такой проблемой. После перехода мобильного устройства в состояние ожидания происходит закрытие всех веб-сокетов. И если так устройство оставить не нескол ко часов а потом открыть то сообщение теряется. В чем причина я не стал разбираться. Возможно не происходит реконнект или реконнект происходит с очисткой или как-то некорректно возобновляется подписка после чего сообщения получаются до восстановления подписки.
        Но на сервере я и пользую именно ее чтобы использовать tcp


        По cleanSession все основные реализации поддерживают хранение сообщений. Потому что это требование протокола. И по протоколу такое хранение даже не имеет срока. Поэтому исходя из соображений эффективности все основные реализации брокера вводят срок хранения и количество хранимых сообщений. В некоторых серверах в дефолтных конфигах хранение выключено. В некоторых срок хранения задан в несколько секунд. В приложенном к тексту репозитории git есть конфиги для трёх наиболее продвинутых бесплатных серверов.


        Другое дело что не все сервера поддерживают состояние после перегрузки сервера. В частности тот сервер который я привел в примере поддерживает хранения после перегрузки только в платной версии.

          0
          После перехода мобильного устройства в состояние ожидания происходит закрытие всех веб-сокетов.

          Да, это как раз то, для чего придумывались не-clean-сессии в MQTT.

          А ещё, если брокер поддерживает retain сообщения — можно хранить в них состояние приложения, и оно будет автоматически синхронизироваться между всеми клиентами. Получится этакая облачная БД для приложения.

          И не смотрели ли Вы в сторону следующей версии стандарта, MQTT 5.0?
          Там как раз добавлено время хранения не-clean-сессии и сообщений, произвольные user property сообщений, shared subscriptions для балансировки нагрузки брокером, и много чего ещё.
          Жаль только пока далеко не все брокеры поддерживают этот стандарт, и в основном это облачные решения (SaaS)
            0

            По поводу не-clean-сессии в MQTT — почему-то именно в бибиотеке mqtt.js эта часть не работает после длительного отключения девайса, не выяснил пока, почему.


            По поводу 5-й версии наиболее полезным кажется свойство shared subscriptions, которого не хватало в первых версиях. И это свойство поддерживается основными бесплатными игроками, правда не всегда одинаковыми и стандартными средствами (например название очереди для таких сообщений должно иметь определенный префикс)


            Единственное, чего я не могу понять — это полнейшее отсутствие интереса у среднего разработчика к этой технологии. Ни информация о том что эту технологии использует телега и ФБ, ни перечень решаемых проблем не может остановить среднего разработчика в 1001 раз велосипедить свою обвязку вокруг socket.io для реализации чата, который в результате работает глючно, т.к. уровень этих средних разработчиков весьма далек от уровня разработчиков протокола и серверов mqtt.

              0

              Думаю, всё дело в том, что средний разработчик представляет себе включение в стек дополнительного сервера, как что-то в первую очередь отягощающее ситуацию. И уж точно это решение не для очередного чатика внутри того или иного проекта уровня среднего разработчика.

                0

                Дело в том что подключить mqtt сервер к проекту это дело нескольких часов. А запилить свою реализацию на советах это с учётом тестирования и голова багов типа пропавших или дублирующих сообщений это не один челлвеко мнсяц работы на базе и на фронтенде

        +2
          +1

          Интересное решение. Хотелось бы сравнить работу на реальных приложениях и нагрузках. По функционалу очень близко к возможностям mqtt. Правда по части восстановления пропущенных сообщений в документации есть фраза что реализация может меняться (наверное они еще в поиске).


          Также дополнительным плюсом mqtt является то, что два из трех популярных серверов mqtt используют erlang-машину, что позволяет надеятся на то что эти реализации будут держать миллионы коннектов без большитх накладных расходов. В то время как centrifugo заявляет, что " Generally we suggest to not put more than 50-100k clients on one node" что примерно соответвует возхможностям инстанса mqtt-сервера mosquito, рабраотанного на С.


          Однако чтобы убедиться в этом нужно иметь себе такую аудиторию приложений.

            +2
            Мне centrifugo нравится из-за удобства интеграции и использования + производительность.
            В паре с Redis скейлятся легко, без танцев с бубном.
            Ещё попробовал NATS и NATS Streaming, использовал как транспорт в go-micro.
            Чистый NATS — at most once гарантии, мега быстрый «message/event spitter»
            NATS Streaming — тут уже at least once (настраиваемо, можно нескольким рассылать, можно слать ACK в ответ), больше похож на сервер очередей, но не такой громозкий как RabbitMQ.

            Что натс, что центрифуга — относительно новые, но стабильные решения. Автор центрифуги есть на хабре — FZambia, есть телеграм канал, прекрасная поддержка.
            Оба решения — Golang, если не ошибаюсь центрифуга была на питоне изначально, NATS вообще проект CNCF — создан для микросервисов в облаках под нагрузками.

            Про MQTT ничего не могу сказать, не было опыта в использовании. Но попробовав NATS и Centrifugo — для меня нет смысла использовать другие транспорты сообщений.

            А Erlang лично для меня — скорее боль, чем плюс, это после RabbitMQ =), возможно я предвзят.

            Upd:
            Прекрасный Realtime API для NATS: Resgate
              0

              Конечно erlang по производительности не самый быстрый. Но тут весь вопрос что понимать под производительностью. Для серверов которые должны держать миллионы коннектов с мобильными девайсами органичение не в скорости, а в колчиестве одновременных коннектов. И erlang возможно с этим справится лучше, хотя для того чтобы говорить уверенно нужно иметь такую миллионную аудиторию, которой у меня просто нет.


              Если исходить из того, что один сервер на erlang по количеству коннектов равен 10 серверам не на erlang, то при масштабировании через redis 10 серверов будет проседать и их производительность.


              Хотя опять же это чисто все гипотетическая проблема, если нет такой аудитории.


              Так что я голосую за любое специализированное решение и против любого велосипедостроения в данной области.

          +1

          Вроде и красиво написано, хорошая идея, но выглядит, как попытка натянуть сову на глобус.


          Во-первых, недовереннве клиенты подключаются к MQTT брокеру, верно?
          Что случится, если кто-то левый подключится туда же?


          Во-вторых, как правильно подметили — брокер может и не сохранить очередь при рестарте.


          В-третьих, совсем не освящён вопрос "что будет с сервером, если к нему единожды подключился клиент и больше не подключался"? Через какое время сервер начнёт терять сообщения в очереди или навернётся от переполнения очереди — через час, неделю, месяц, год...? Как чистить такие очереди?


          Мне кажется, что для таких целей лучше использовать именно сервера очередей типа RabbitMQ и вручную (из приложения) управлять юрезами/сессиями.

            0

            Ответы на вышеизложенные запросы:
            Цель статьи показать что использование mqtt-сервера так же просто и естественно для обмена сообщениями как использование redis для хранение key/value, т.к. именно для этого оба и были разработаны.
            И по каждому замечанию:
            1) За аутентификацию клиентов отвечает параметр userName. В репозитарии есть конфиги по настройке авторизации при помощи сервера redis (она отключена но включается одним параметром).
            2) Сохранение и не сохранение очереди при рестарте задается также конфигами и как правило реализовано. Например основные бесплатные решения


            • https://mosquitto.org — реализовано в свободной версии
            • https://www.emqx.io — есть бесплатная версия, сохранение очередей при рестарте реализовано только в платной версии
            • https://vernemq.com — реализовано в свободной версии.
              3) Стандарт предусматривает бессрочное хранение сообщений. Очередь очищается по стандарту со стороны клиента при соединении с параметром clearSession=true. Однако из соображений эффективности реализации серверы-брокеры устанавливают нестандартные кофнигурационные параметры для ограничения срока харанения сообщений и количества сообщений. В репозитарии этой статьи есть конфиги для трех упомянутых в пункте 2 серверов с заданными ограничениями.

            По rabbitmq я сразу в начале своего сообщения сказал что можно использовать его или любоой другой сервер поддерживающий например amqp протокол. В частности в rabbitmq есть плагин который поддерживает и mqtt протокол с некоторыми ограничениями. основная идея статьи — это использовать для обмена отдельный специализированый сервер а не пытаться соорудить надстройку над веб-сокетами.

          Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

          Самое читаемое