Обновить

Инструменты Node.js разработчика. Протокол mqtt для работы с веб-сокетами

JavaScript *Node.JS *
Технология веб-сокет позволяет в веб-приложении или в мобильном приложении реализовать отправку сообщений с сервера на клиент, что невозможно сделать средствами REST-API. Для работы с веб-сокетами часто используют библиотеку socket.io, или же разработчики работают с нативными объектами веб-сокет браузеров. В этом сообщении я попытаюсь показать, что оба пути не решают всех проблем, и гораздо лучше использовать для работы с веб-сокетами специализированные серверы, например mqtt-сервер (раньше его назвали mqtt-брокер).

Справедливости ради, и чтобы избежать ненужных споров, замечу, что кроме mqtt-сервера может быть использован еще целый ряд серверов, например rabbitmq.

Разработка приложений с использованием веб-сокетов кажется очень простой, пока мы не сталкиваемся с реальностью, в которой часто происходят разрывы соединений. Первая задача, которую приходится решать — отслеживание разрывов соединений и их восстановление. Ситуация усложняется тем, что во время разрыва соединения и повторного соединения, клиент продолжает отправлять новые сообщения, равно как к клиенту могут поступать новые сообщения, которые имеют большую вероятность быть потерянными.

Приходится на уровне приложения отслеживать получение сообщений, и реализовывать повторную их доставку. Я хотел бы особо обратить внимание на фразу «на уровне приложения» (а хотелось бы чтобы это было на уровне протокола).

Как только мы добавили логику для отслеживания доставки сообщений — все сообщения стали приходить, но тут же выясняется, что появились дубликаты сообщений, так как сообщение могло быть получено, а подтверждение этого факта потерялось из-за разрыва соединения. И нужно еще удваивать количество программного кода для исключения дублей сообщений.

Вместе с усложнением программного кода, снижается и его эффективность, за что часто ругают библиотеку socket.io. Она, конечно, менее эффективна, чем работа с нативными веб-сокетами, в частности, из-за наличия логики восстановления соединения и подтверждения доставки сообщения (сразу замечу что логики повторной доставки в socket.io не реализовано).

Более надежным и эффективным был бы путь вынесения этой логики на уровень протокола. И такой протокол существует — это mqtt. Первая версия протокола mqtt была разработана Энди Стэнфорд-Кларком (IBM) и Арленом Ниппером (Arcom) в 1999. Спецификация MQTT 3.1.1 была стандартизирована консорциумом OASIS в 2014 году.

В протоколе mqtt есть параметр «качество сервиса» (qos), которое может принимать значения:
0 — сообщение доставляется если есть возможность;
1 — сообщение доставляется гарантированно, но могут быть дубликаты;
2 — сообщение доставляется гарантированно и гарантированно однократно.

То есть, проблему с гарантированной доставкой сообщений протокол mqtt решает, и этот вопрос снимается с повестки дня. Но не только этот вопрос.

Производительность и масштабирование.

При работе с веб-сокетами, все подключенные клиенты оставляют открытыми соединения с сервером, даже если нет реального обмена сообщениями. Эта нагрузка по своему характеру отличается от нагрузки в REST-API, которая определяется потоком запросов. Нагрузку открытыми соединениями по веб-сокетам сложно эмулировать на этапе тестирования. Поэтому, нередко делается ошибочное предположение о достаточной производительности работы приложения по количеству отправленных и полученных сообщений, без учета нагрузки на поддержание большого количества открытых соединений с клиентами.

Если мы передаем всю работу с веб-сокетами на специализированный сервер mqtt, то наше приложение на nodejs открывает всего лишь одно соединение по веб-сокету (или по tcp, т.к. mqtt поддерживает оба протокола) с mqtt-сервером, и мы можем масштабировать наше приложение, подключая к серверу mqtt несколько экземпляров nodejs.

Если исчерпаются ресурсы одного mqtt-сервера, можно организовать кластер серверов mqtt, при этом не затрагивая приложения на nodejs.

Теперь перейдем к примеру.

mqtt-сервер или брокер, как его именовали в прежних спецификациях, работает по модели отправки сообщений/подписки на сообщения. Каждое сообщение отправляется в топик. Получатель подписывается на топик сообщений. И отправитель и получатель имеют два идентификатора: clientId (идентификатор устройства) и userName (имя пользователя).

Идентификатор устройства имеет важное значение, так как он связывается с подпиской и на него будут приходить сообщения. Имя пользователя, в отличие от идентификатора устройства, не играет решающей роли в доставке сообщений, и используется для разграничения доступа к топикам.

Для работы с протоколом mqtt на стороне клиента используется библиотека github.com/eclipse/paho.mqtt.javascript. Реализаций серверов несколько, в том числе есть бесплатные. Мы в этом примере будем использовать сервер emqx, который запускается через docker-compose (см. github.com/apapacy/tut-mqtt).

Для тестирования создадим документ, в котором будем задавать clientId, userName и текст сообщения:

<script src="/paho-mqtt.js"></script>
<script src="/messages.js"></script>
<form name="sender" onsubmit="return false">
  <input type="text" name="user">
  <input type="text" name="client">
  <input type="text" name="message">
  <input type="button" onclick="connect()" value="connect">
  <input type="button" onclick="send()" value="send">
</form>


Отправка сообщений реализована в файле message.js:

var client;

var connectOptions = {
  timeout: 30,
  reconnect: true,
  cleanSession: false,
  mqttVersion: 4,
  keepAliveInterval: 10,
  onSuccess: onConnect,
  onFailure: onFailure
}

function connect() {
  try {
    client = new Paho.Client('localhost', 8083, '/mqtt', document.forms.sender.client.value);
    connectOptions.userName = document.forms.sender.user.value;
    client.connect(connectOptions);
  } catch (ex) {
    console.log(ex);
  }
}

function onConnect() {
  console.log('on connect');
  client.onMessageArrived = function(message) {
    console.log("onMessageArrived: " + message.payloadString);
  }
  client.subscribe("test", { qos: 2 });
}

function onFailure(err) {
  console.log('on failure', JSON.stringify(err));
}

function send() {
   var message = new Paho.Message(document.forms.sender.message.value);
   message.destinationName = "test";
   message.qos = 2;
   client.send(message);
}


Для проверки, откройте в браузере файл index.html, задайте clientId, userName, текст сообщения, и отправьте несколько сообщений (их можно прочитать в консоли, так как клиент отправляет сообщения в топик test, и сам же подписан на этот топик).

Теперь откройте другой браузер или другую вкладку браузера, и присоединитесь с другим (это важно) clientId. Отправьте еще несколько сообщений из первого браузера, и убедитесь что они приходят обоим клиентам, так как у них различаются clientId и они оба подписаны на топик test.

Теперь закройте второй браузер (или вторую вкладку браузера), и отправьте еще несколько сообщений. После этого, вновь откройте второй браузер, и присоединитесь с тем же clientId. Убедитесь в логах консоли, что к Вам пришли все сообщения, которые отправлялись в период, когда второй браузер (вторая вкладка) была закрыта. Так произошло, потому что:

  • При отправке сообщения задан уровень качества qos=2;
  • Вы уже присоединялись раньше к этому же топику с этим же clientId, задав qos=2;
  • В опциях соединения задан параметр cleanSession: false.

Код примера можно скачать в репозитории.

apapacy@gmail.com
29 сентября 2019 года
Теги:
Хабы:
Всего голосов 12: ↑8 и ↓4 +4
Просмотры 14K
Комментарии Комментарии 12

Минуточку внимания