PubSub в браузере с помощью вебсокетов и протокола WAMP

Изучая методы реализации real-time обновления данных в браузере, я обнаружил "WAMP" — протокол прикладного уровня для обмена сообщениями, основанный на вебсокетах.
Протокол реализует два распространенных высокоуровневых шаблона для обмена данными: PubSub и RPC (Remote Procedure Call).

Эти шаблоны многим известны и широко применяются в различных областях программирования и межпроцессного взаимодействия:

  • RPC — удаленный вызов процедур. В процессе принимают участие клиент и сервер. Первый отправляет запросы на вызов процедуры на сервере, а второй их выполняет и отправляет результат клиенту. В типичном веб-приложении это может быть, например, запрос на создание комментария или на добавление поста в избранное
  • Publish/Subscribe (PubSub) — метод обмена сообщениями, в котором клиенты «подписываются» на интересующие их события и могут сами генерировать подобные события. Рассылкой информации подписчикам занимается третья сторона — «брокер». В WAMP шаблон PubSub реализован на основе «топиков», или каналов. Например, на сайте такими каналами могут быть «комментарии», «новости», «личные сообщения».

В контексте веб-разработки наиболее интересным вариантом применения протокола WAMP является использование шаблона PubSub. С его помощью можно легко решить задачу обновления информации на открытой у пользователя странице сайта: например, чтобы отобразить только что добавленный комментарий или показать уведомление о получении нового сообщения.
Реализация WAMP существует в виде библиотек под множество языков и платформ, включая, конечно, javascript в виде проекта autobahn.

В качестве примера использования протокола попробуем разработать абстрактное веб-приложение, в котором браузер будет подписываться на канал с новыми комментариями, а сервер — их рассылать. На сервере будет работать PHP с замечательной библиотекой Ratchet, которая помимо реализации собственно вебсокетов умеет работать с протоколом WAMP.

Планируя методы взаимодействия клиента и сервера на таком сайте следует помнить, что еще существуют браузеры, не поддерживающие вебсокеты. И хотя часть проблем с ними могут решить полифиллы, 100% работы в любой среде (например, на андроиде) с их помощью добиться не удастся. Поэтому разумно, на мой взгляд, ограничить использование шаблона PubSub на клиенте лишь подпиской на события. Генерироваться же события будут сервером, получающим «олдскульные» ajax-запросы на создание нового комментария, от имени его автора. Таким образом все клиенты смогут добавлять комментарии (или, в общем случае, генерировать события), а вот получать обновления в реальном времени — только те, кто поддерживает вебсокеты.

Клиентская часть сайта.

Библиотека autobahn экспортирует в глобальную область видимости объект ab, полный список методов которого можно прочитать в документации. Нас же интересует метод connect:

ab.connect(
    //куда подключаемся
    'ws://site.com:8080',
    //коллбэк будет вызван после успешного подключения.
    //внутрь будет передан объект session,
    //содержащий информацию о соединении и методы для взаимодействия с сервером
    function (session) {
        //подпишемся на новые комментарии. вторым параметром передаем функцию-обраточик события,
        //которая будет вызвана после получения комментария.
        session.subscribe('comments', onNewComment);
    },
    //коллбэк будет вызван после потери соединения.
    //библиотека сама попытается переподключиться, если указаны соответствующие опции,
    //поэтому в обработчике события реализовывать эту логику не нужно.
    function onClose() {
        alert('Пропало соединение с сервером');
    },
    {
        //опции для переподключения к серверу
        'maxRetries': 100,
        'retryDelay': 5000
    }
);

//обработчик новых сообщений на канале comments
function onNewComment(topic, data) {
    //topic - название канала, с которого пришло сообщение
    //в data находятся данные, переданные сервером.
    //в случае с комментариями это могут быть content и author.
    console.log('новый комментарий', data.author, data.content);
}


Для простоты в качестве названия канала была выбрана строка 'comments', однако согласно спецификации протокола такое именование не является правильным. Каналы должны быть представлены в формате URI, то есть в нашем случае канал может называться site.com/comments. В свою очередь, URI каналов можно сокращать до «компактных URI» — CURIE. Более подробно эти детали описаны на странице спецификации.

Логично, что на реальном сайте пользователю не нужны сразу все новые комментарии, а нужны только те, которые появляются на текущей странице. В таком случае можно создать к примеру такой канал: site.com/comments/page/1. Разумеется, никаких ограничений на формирование URI нет: можно динамически создавать каналы с любыми параметрами, в зависимости от поставленных задач.

Серверная часть сайта.

В примере с PHP, за доставку сообщений от http-сервера до сервера, отвечающего за рассылку сообщений вебсокетам, отвечает ZMQ. При получении нового комментария сервер сохраняет его в базу данных и отправляет сообщение в очередь ZMQ, из которого он в свою очередь будет получен демоном при помощи упомянутой выше библиотеки Ratchet.
Вот как примерно выглядит реализация такой функции:
//комментарий от пользователя, полученный через ajax или обычной отправкой формы
$comment=array('author'=>'Ваня', 'content'=>'Привет, хабрахабр!');
//обрабатываем и сохраняем его...
$commentModel->save($comment);

//передаем комментарий для последующей рассылки подписанным пользователям
$loop = React\EventLoop\Factory::create();
$context = new React\ZMQ\Context($loop);
$push = $context->getSocket(\ZMQ::SOCKET_PUSH);
//для передачи сообщений ZMQ используется другой порт, отличный от того, который используют клиенты
$push->connect('tcp://127.0.0.1:8081');
//сообщение передается в виде строки json
$push->send(json_encode($comment));
//tick выполняет первую операцию в очереди.
//операция у нас только одна - отправка сообщения.
$loop->tick();


Для обработки подключений клиентов и событий от сервера ZMQ нам понадобится процесс, который будет принимать сообщения и обрабатывать их. В документации к библиотеке Ratchet уже содержатся подробные примеры. В частности, нужно обратить внимание на класс Pusher (в нашем примере я назвал его WampProcessor, что кажется более релевантным) — именно он содержит бизнес-логику приложения и отправляет сообщения подписанным на соответствующие каналы клиентам.

Код для запуска такого процесса будет примерно таким:
//websocket-сервер из библиотеки React
$loop   = React\EventLoop\Factory::create();


//processor - пользовательский обработчик подключений по WAMP, который должны написать мы сами.
//он должен реализовывать интерфейс WampServerInterface.
//в нашем примере помимо обработки клиентских подключений
//этот класс будет также принимать сообщения от нашего http-сервера.
$processor = new WampProcessor();

//будем слушать сообщения от http-сервера через ZMQ на порту 8081...
$context = new React\ZMQ\Context($loop);
$pull = $context->getSocket(ZMQ::SOCKET_PULL);
$pull->bind('tcp://127.0.0.1:8081');
//при получении собщений они будут переданы в метод 'onComment' объекта 'processor'
$pull->on('message', array($processor, 'onComment'));

//будем принимать подключения от клиентов - браузеров на порт 8080 с любого IP
$app = new \components\SocketServer\App('site.com', 8080, '0.0.0.0', $loop);
//при желании одним демоном можно обслуживать несколько приложений или сайтов сразу,
//для этого запросы можно маршрутизировать (Ratchet использует роутер из Symfony).
//в нашем примере будет один сайт, поэтому маршрутизация не понадобится.
//в качестве "контроллера" будет выступать наш класс WampProcessor.
$app->route('/', $processor, array('*'));

//...
$app->run();


Все методы класса WampProcessor будут практически идентичны тем, что можно увидеть в документации Ratchet; из них стоит выделить только обработчик события — метод "onComment":
/**
 * @param string строка в JSON, полученная от ZeroMQ
 */
public function onComment($json) {
    $comment = json_decode($json, true);

    //ничего не делаем, если нет ни одного подписчика на новые комментарии
    if (!array_key_exists('comments', $this->subscribedTopics)) {
        return;
    }

    $topic = $this->subscribedTopics['comments'];

    //иначе отправляем комментарий всем подписанным клиентам.
    $topic->broadcast($comment);
}


Таким образом при создании нового комментария все подключенные браузеры будут получать объект с полями author и content, который и ожидает получить javascript-обработчик.

За процессом обмена сообщениями можно наблюдать в консоли chrome (фильтр «websocket» во вкладке «network») или другого браузера. Видно, что при подключении к серверу браузер отправляет приветственное сообщение, а потом — список каналов для подписки.

Заключение.

Вот так, применив технологию WebSockets и протокол WAMP, можно реализовать обновление информации на веб-странице в реальном времени методом PubSub.
Можно возразить, что используя nodejs и библиотеку socket.io сделать это было бы проще, но в нашей реальности, где PHP является доминирующей серверной платформой, описанный вариант вполне жизнеспособен и даже более удобен чем другие, более «костыльные» методы (как, например, периодический опрос сервера с помощью ajax). Также его относительно легко можно внедрить на существующий сайт: изменения потребуется внести только в те части, где происходит генерация каких-либо событий, а сам демон-обработчик от сайта может быть совершенно независим.

Ключевые ссылки:

  • Wamp — websocket application messaging protocol.
  • Authobahn — реализация WAMP на javascript.
  • Ratchet — реализация вебсокетов и WAMP на PHP.
AdBlock has stolen the banner, but banners are not teeth — they will be back

More
Ads

Comments 7

    0
    А есть примеры использования на реальных проектах на PHP? Как с аппетитами до ресурсов обстоят дела?
      0
      Я на сколько-нибудь серьезной нагрузке пока не использовал (и скорее всего не придется, масштабы проектов не те), поэтому судить сложно.
      Вот тут разработчики Ratchet приводят советы по повышению производительности: socketo.me/docs/deploy
      А тут www.cargomedia.ch/websocket/benchmarking-websocket-servers/ сравнивают производительность Ratchet и аналогичного решения на nodejs; первый оказывается даже немного быстрее, правда расход памяти там почему-то не сравнивается.
      0
      stomp.github.io/ и amp-protocol.net/ походу подобны
        0
        Пока не дочитал до конца, в голове была только одна мысль – этого же можно добиться и с помощью socket.io на обеих сторонах. Но все равно, спасибо за статью!
          0
          Что-то непонятно, применим ли к урлам топиков концепт «коллекций».

          «В таком случае можно создать к примеру такой канал: site.com/comments/page/1»

          Где в спецификации обозначено что подписка по такому урлу получит публикацию на урл «site.com/comments/»?
            0
            Если я правильно понял, вы хотите, подписавшись на /comments/page/1, получать также обновления с канала /comments?
            Не совсем очевидно, почему так должны себя вести каналы. Этот вопрос в спецификации не рассмотрен, но при желании можно реализовать такое поведение самому.
              0
              Потомучто URLы в качестве идентификаторов предполагают иерархичность.
              И именно иерархичность указана в качестве причины выбора урлов.
              Если же сами каналы не иерархичные, то нет никакого смысла использовать urlов вместо рандомных строк.

              Доделать такое поведение, конечно можно.
              Для этого нужно переписать и пропатчить код клиента и сервера, тоесть изобрести свой протокол. При этом теряется смысл использования WAMP вместо самодельного велосипеда.

          Only users with full accounts can post comments. Log in, please.