
Одним прекрасным утром к нам в офис забежал молодой парень, с амбициозной идеей и “средствами для реализации” в кармане. “Заходишь на сайт, а там — телевизор. К нему можно подключиться через свою web-камеру. Одновременно может вещать только один человек, остальные — ждут своей очереди (но можно посмотреть скриншоты с их вебкамер). Задача каждого — удержаться в эфире, как можно дольше. Если выступающий нравится публике — все жмут “Cool!”, если подкачал — “Go away!”. И человек заменяется на следующего в очереди. Ну и можно в чат писать”.
Хорошая идея — драйвовый проект. Рисуем прототип, решаем реализовать обновление чата, списка пользователей, рейтинга и т.д. с помощью push-технологии. Это когда после загрузки страницы соединение между клиентом и сервером не закрывается, а продолжает использоваться для отправки сервером каких-либо событий на клиента.
Осторожно! Эта шняга может убить ваш сервер! Кстати, если вы вдруг решите написать высконагруженный скандинавский аукцион – истина и веселые картинки где-то рядом, под катом.
Пара слов про PUSH
Использование технологии PUSH оправдано там, где нужно очень быстро обновлять множество клиентов при определенных изменениях на стороне сервера. В общем случае общение клиента и сервера выглядит так

Очевидно, что на каждого клиента нам нужен один постоянно работающий серверный push-процесс, который называется Comet (Комета). Это накладно с точки зрения использования памяти сервера, но альтернатива этому: постоянные запросы к серверу — на порядок хуже, медленнее и тяжелее. Правда есть еще один хороший вариант — работать напрямую с сокетами, но это отдельная тема.
Процесс работы
Когда клиент совершает какое-то действие (например, пишет в чат или голосует), он отправляет на сервер ajax-запрос с соответствующей командой. Результат выполнения этой команды нужно разослать всем подключенным клиентам. Для этого, скрипт, занимающийся обработкой ajax-запроса, должен оповестить все Кометы, которые уже передадут команду на обновление своим клиентам, а те в свою очередь — аккуратно отрисуют изменение.
Не стоит говорить, что одновременно может поступать довольно много запросов на обновление. Для этого должна быть организована очередь таких событий, из которой Кометы смогут брать события и рассылать их дальше.
Примерно так выглядит происходящее, когда мы чего-то написали в чат.

Ключевых моментов в этой схеме два:
- Реализация кометы (хороший обзор предоставил пользователь balepc)
- Реализация очереди. Давайте остановимся на них более подробно.
Реализация очереди
Для нашего проекта было необходимо, чтобы:
- Кометы мгновенно «просыпались» при поступлении новых событий в очередь, а не опрашивали ее постоянно. Это позволяет значительно снизить нагрузку на сервер, а так же реализовать мгновенную реакцию системы на действия пользователя.
- Если Комета по какой-то причине умирает (например, пользователь обновил страницу), пользователю все равно должны дойти все события, которые произошли между моментом отмирания Кометы и моментом восстановления соединения. Паузы в отправке сообщений так же возникают, если Push-канал работает через технологии “long polling” (характерно, например, для браузера Opera). В этом случае push-соединение создается заново каждый раз, когда клиент получает сообщение от сервера.

- Когда у нас на сайте сидит 5 человек, то голос каждого из них должен оказывать большее влияние, чем когда у нас на сайте сидит 100 человек.
- Пользователи, отключившие камеры, должны уходить из блока скриншотов.

В этом случае нам нужно достоверно знать, сколько и каких пользователей сейчас on-line. Дело в том, что после закрытия пользователем окна браузера на сервере не происходит никаких событий, поэтому отследить этот момент невозможно. И мы пришли к выводу о необходимости того, чтобы комета просыпалась и отчитывалась с определенной периодичностью (скажем, раз в пять секунд) о том, что она жива.
- Каждое посылаемое в очередь событие доходило до всех пользователей, независимо от качества их канала.
В простых случаях взаимодействия между двумя процессами можно воспользоваться функциями работы с очередями, встроенными в PHP, либо обертку Zend_Queue. Однако они не обеспечивают рассылку события сразу всем слушающим процессам (кометам).
Экспериментируем с очередями
Один из возможных вариантов решения — менеджер очередей.
Способ реализации:
- Каждая Комета поднимает свою очередь.
- Идентификатор новой очереди заносится в общую для всех процессов память (shared memory).
- Для предотвращения одновременной работы нескольких процессов с очередями используется семафор.
- При отправке сообщения в очередь событие рассылается в очередь каждого процесса.
Примерно так:

// когда мы нарисовали эту картинку на клочке бумажки — нам стало уже страшно, но отважный велосипедостроитель может попытаться довести реализацию до конца.
Даже если мы сделаем такой менеджер, то кометы будут просыпаться только при поступлении новых событий, а нам этого явно недостаточно (помните про необходимость знать, кто сейчас живой?).
К сожалению, стандартными функциями php сделать очередь с таймаутами невозможно. В голову пришла мысль реализовать какой-то сторонний серверный процесс, который бы периодически «пробуждал» кометы, подбрасывая события в очередь. Кстати, этот же процесс может так же удалять умершие идентификаторы очередей из shared memory. Вот что получаем в итоге:

При взгляде на эту картинку невольно вспоминается высказывание Луговского
«Человеческая фантазия безгранична на идиотские и нелепые архитектуры...»
Начинаем анализировать существующие решения для организации очередей с точки зрения пригодности использования в нашем приложении.

Наш выбор пал на Apache ActiveMQ (реально выбирали между ним и RabbitMQ)
Плюсы:
- Очень быстрый
- Очень стабильный
- Поддержка нескольких протоколов, в том числе — stomp
- Есть реализация взаимодействия в ZendFramework
- Есть способ рассылки сообщений всем слушателям (так называемые Topics)
- Умеет пробуждать ожидающий процесс по таймауту
- Хорошо масштабируется
Минус: Не умеет возвращать количество сообщений в очереди
Более подробный обзор менеджеров очередей от хабраюзера aleks_raiden можно найти здесь
http://habrahabr.ru/blogs/hi/44907/
http://habrahabr.ru/blogs/hi/45891/
Единственное, что нельзя было реализовать напрямую, используя механизм Topics — это восстановление событий, которые могли произойти при обрыве соединения кометы, до того, как она поднялась (это можно было сделать, отказавшись от механизма Topics, и используя индивидуальные очереди на каждый процесс, но...)
Мы решили складывать свежие N штук событий в shared memory, чтобы комета смогла мгновенно восстановить свой контекст. В итоге архитектура получилась следующая:

Бренчмарки
Для реализации приложения мы использовали ZendFramework. Ajax-запросы поступали в соответствующие контроллеры, которые обрабатывали их, и складывали сообщения в очередь, откуда кометы забирали сообщения и рассылали их на клиентов. Сами кометы так же реализованы как Action внутри контроллера ZendFramework.
Запустив Apache Branchmark, мы с ужасом обнаружили, что система активно кушает процессор уже при 25 одновременных обращениях.

300 запросов на 25 одновременных потоков
Как выяснилось, 99% времени занимала инициализация ZendFramework и соединение с базой данных. О-опс. Ну, это старая новость, и лекарство от этого известно — нужно исключить подключение классов ZendFramework, кроме реально необходимых. А все необходимые классы объединить в один файл.
Мы пришли к выводу, что придется отказаться от использования каркаса MVC при обработке ajax-запросов на голосование и чат, а так же в реализации кометы (поскольку скорость ее старта очень критична для браузеров, поддерживающих только механизм Long poll). Так же мы избавились от каких-либо запросов к базе данных в процессах обработки ajax-запросов. Для чего мы вынесли все необходимые данные в shared memory. При необходимости же синхронизации shared memory и базы данных — выполняли синхронизацию сторонним процессом.
Итог

300 запросов на 100 одновременных потоков
Реакция на события от клиента стала практически мгновенной. Ура, победа! Легковесные кометы используют память и процессор очень экономно. Узкое горлышко приложения — раздача видео-потока (но если вы будете делать скандинавский аукцион, где эффективно применять схожую архитектуру очередей — это не ваша проблема ;-), возможно напишем про это в следующих статьях, если выдержим хабраэффект.
Ниже опубликован наш менеджер очередей.
<?php require_once 'Zend/Queue.php'; require_once 'Sibirix/Queue/Smemory/Package.php'; /** * Менеджер очередей * */ class Sibirix_Queue_Smemory_Manager { const ERROR_BROKEN_OPTIONS_FOR_INIT_MESSAGE = "BROKEN_OPTIONS_FOR_INIT"; const ERROR_UNKNOWN_IDENTIFIER_STORAGE_MESSAGE = "UNKNOWN_IDENTIFIER_STORAGE"; const ERROR_CAN_NOT_ACTIVATE_THE_SEMAPHORE_MESSAGE = "CAN_NOT_ACTIVATE_THE_SEMAPHORE"; const SHM_VAR_LAST_CHAT_POSTS = 1; const SHM_VAR_LAST_SYSTEM_COMMAND = 2; const SHM_VAR_QUEUE_LAST_ID = 3; private $_count_chat_package_storage; private $_count_system_package_storage; private $_activeMQoptions; private $_queue; /** * semaphore key */ private $_sem_key; /** * Shared memory APP_KEY = one symbol */ private $_app_key; private $_shared_memory_id = false; private $_shared_memory_handler = false; private $_shared_memory_size; private $_sem_id = false; private $_last_queue_message_id = false; public function __construct($options = array()) { if (!is_array($options)) { throw new Exception(self::ERROR_BROKEN_OPTIONS_FOR_INIT_MESSAGE); } if (!isset($options['sem_key'])) { throw new Exception(self::ERROR_BROKEN_OPTIONS_FOR_INIT_MESSAGE); } if (!isset($options['app_key'])) { throw new Exception(self::ERROR_BROKEN_OPTIONS_FOR_INIT_MESSAGE); } else { $options['app_key'] = (string)$options['app_key']; if (strlen($options['app_key']) != 1) { throw new Exception(self::ERROR_BROKEN_OPTIONS_FOR_INIT_MESSAGE); } } if (!isset($options['activeMQ'])) { throw new Exception(self::ERROR_BROKEN_OPTIONS_FOR_INIT_MESSAGE); } if (!isset($options['shared_memory_size'])) { throw new Exception(self::ERROR_BROKEN_OPTIONS_FOR_INIT_MESSAGE); } if (!isset($options['count_chat_package_storage'])) { throw new Exception(self::ERROR_BROKEN_OPTIONS_FOR_INIT_MESSAGE); } if (!isset($options['count_system_package_storage'])) { throw new Exception(self::ERROR_BROKEN_OPTIONS_FOR_INIT_MESSAGE); } $this->_sem_key = $options['sem_key']; $this->_app_key = $options['app_key']; $this->_activeMQoptions = $options['activeMQ']; $this->_shared_memory_size = $options['shared_memory_size']; $this->_count_chat_package_storage = $options['count_chat_package_storage']; $this->_count_system_package_storage = $options['count_system_package_storage']; $this->_sem_id = sem_get($this->_sem_key, 1); $this->_shared_memory_id = ftok(".", $this->_app_key); $this->_shared_memory_handler = shm_attach($this->_shared_memory_id, $this->_shared_memory_size); } public function connectTopic() { if (!$this->_queue) { $this->_queue = new Zend_Queue('Activemq', $this->_activeMQoptions); } return $this->_queue; } public function receive() { $this->connectTopic(); $message = false; $msg_receive = $this->_queue->receive(1); foreach ( $msg_receive as $msg ) { $pkg = new Sibirix_Queue_Smemory_Package($msg->body); $this->_last_queue_message_id = $pkg->id; $message = $pkg; } return $message; } public function receiveAll() { $this->connectTopic(); $last_queue_message_id_mem = $this->_sem_getLastQueueMessageId(); $messages = array(); while(true) { $msg_receive = $this->_queue->receive(1); if (!$msg_receive->count()) {break;} foreach ( $msg_receive as $message ) { $pkg = new Sibirix_Queue_Smemory_Package($message->body); $this->_last_queue_message_id = $pkg->id; $messages[] = $pkg; } if ($this->_last_queue_message_id >= $last_queue_message_id_mem ) { $last_queue_message_id_mem = $this->_sem_getLastQueueMessageId(); if ($this->_last_queue_message_id >= $last_queue_message_id_mem ) { break; } } } return $messages; } public function send($action, $data, $recipient = false) { $this->connectTopic(); $package = new Sibirix_Queue_Smemory_Package($action, $data, $recipient); $this->_lock(); $last_queue_message_id_mem = $this->_getLastQueueMessageId(); $last_queue_message_id_mem++; $this->_setLastQueueMessageId($last_queue_message_id_mem); $this->_queue->send( $package->normalize( $last_queue_message_id_mem ) ); if ( $package->isPost() ) { $this->_savePackageToChatStorage( $package ); } else { $this->_savePackageToSystemStorage( $package ); } $this->_unlock(); return $last_queue_message_id_mem; } private function _getLastQueueMessageId() { $last_queue_message_id_mem = shm_get_var($this->_shared_memory_handler, self::SHM_VAR_QUEUE_LAST_ID); if (!$last_queue_message_id_mem) { shm_put_var($this->_shared_memory_handler, self::SHM_VAR_QUEUE_LAST_ID, 0); $last_queue_message_id_mem = 0; } return $last_queue_message_id_mem; } private function _setLastQueueMessageId($id) { shm_put_var($this->_shared_memory_handler, self::SHM_VAR_QUEUE_LAST_ID, $id); return true; } private function _sem_getLastQueueMessageId() { $this->_lock(); $last_queue_message_id_mem = $this->_getLastQueueMessageId(); $this->_unlock(); return $last_queue_message_id_mem; } private function _savePackageToChatStorage( $package ) { $last_chat_posts = shm_get_var($this->_shared_memory_handler, self::SHM_VAR_LAST_CHAT_POSTS); if (!$last_chat_posts) {$last_chat_posts=array();} if (count($last_chat_posts) >= $this->_count_chat_package_storage) { while(count($last_chat_posts) >= $this->_count_chat_package_storage) { array_shift($last_chat_posts); } } array_push($last_chat_posts, $package->normalize()); shm_put_var($this->_shared_memory_handler, self::SHM_VAR_LAST_CHAT_POSTS, $last_chat_posts); } private function _savePackageToSystemStorage($package) { $last_sys_cmd = shm_get_var($this->_shared_memory_handler, self::SHM_VAR_LAST_SYSTEM_COMMAND); if (!$last_sys_cmd) {$last_sys_cmd=array();} if (count($last_sys_cmd) >= $this->_count_system_package_storage) { while(count($last_sys_cmd) >= $this->_count_system_package_storage) { array_shift($last_sys_cmd); } } array_push($last_sys_cmd, $package->normalize()); shm_put_var($this->_shared_memory_handler, self::SHM_VAR_LAST_SYSTEM_COMMAND, $last_sys_cmd); } public function getLastPackage($storage) { if ( $storage != self::SHM_VAR_LAST_SYSTEM_COMMAND && $storage != self::SHM_VAR_LAST_CHAT_POSTS ) { throw new Exception(self::ERROR_UNKNOWN_IDENTIFIER_STORAGE_MESSAGE); } $this->_lock(); $last_packages = shm_get_var($this->_shared_memory_handler, $storage); if (!$last_packages) {$last_packages=array();} $this->_unlock(); $return_last_packages = array(); foreach($last_packages as $normalize_package) { $package = new Sibirix_Queue_Smemory_Package($normalize_package); $return_last_packages[] = $package; } return $return_last_packages; } private function _lock() { if (!sem_acquire($this->_sem_id)) { throw new Exception(self::ERROR_CAN_NOT_ACTIVATE_THE_SEMAPHORE_MESSAGE); } } private function _unlock() { sem_release($this->_sem_id); } public function resetMemory() { $this->_lock(); shm_put_var($this->_shared_memory_handler, self::SHM_VAR_LAST_CHAT_POSTS, array()); shm_put_var($this->_shared_memory_handler, self::SHM_VAR_LAST_SYSTEM_COMMAND, array()); $this->_setLastQueueMessageId(0); $this->_lock(); } public function getLastQueueMessageId() { $this->_lock(); $last_queue_message_id_mem = $this->_getLastQueueMessageId(); $this->_unlock(); return $last_queue_message_id_mem; } } ?>
<?php /** * Пакет в очереди * */ class Sibirix_Queue_Smemory_Package { const ERROR_NOT_SUPPORTED_TYPE_ACTION_MESSAGE = 'ERROR_NOT_SUPPORTED_TYPE_ACTION'; const ERROR_FOR_NORMALIZE_NEED_ID_MESSAGE = 'FOR_NORMALIZE_NEED_ID'; const CHAT_ACTION = 'pasteChatMessages'; private $_supported_actions = array( 'updateRating', 'setChannel', 'pasteChatMessages', 'updateTimer', 'connectToServer', 'statistics' ); private $_read_private_property = array( 'id','action','time','data','recipient' ); private $_action; private $_data; private $_recipient; private $_time; private $_id; public function __construct() { $fg_args = func_get_args(); if (count($fg_args) == 1) { list( $this->_id, $this->_action, $this->_time, $this->_data, $this->_recipient ) = unserialize($fg_args[0]); } else { list($action, $data, $recipient) = $fg_args; $this->_validation($action, $data, $recipient); $this->_action = $action; $this->_data = $data; $this->_time = $this->_getmicrotime(); $this->_recipient = $recipient; } } public function normalize( $id = false ) { if ($id) { $this->_id = $id; } else { if ($this->_id === null) { throw new Exception(self::ERROR_FOR_NORMALIZE_NEED_ID_MESSAGE); } } return serialize(array( 0 => $this->_id, 1 => $this->_action, 2 => $this->_time, 3 => $this->_data, 4 => $this->_recipient )); } private function _validation($action, $data, $recipient) { if (!in_array($action, $this->_supported_actions)) { throw new Exception(self::ERROR_NOT_SUPPORTED_TYPE_ACTION_MESSAGE); } return true; } private function _getmicrotime() { list($usec, $sec) = explode(' ', microtime()); return number_format(((float)$usec + (float)$sec), 2, '.', ''); } public function __get($key) { if (in_array($key, $this->_read_private_property)) { return $this->{'_'.$key}; } return null; } public function isPost() { if ( $this->_action == self::CHAT_ACTION ) { return true; } return false; } } ?>
Выводы
- Для очередей необходимо использовать сторонние менеджеры. В нашем случае хорошо себя проявил Active MQ.
- Работа с кометами и отправляемыми на сервер запросами должна осуществляться легковесными скриптами, без полной инициализации фреймворка, коннектов к базе и других излишеств. Спасибо, кэп.
Сейчас, смотря на то какой был проделан путь, становится просто страшно. Надеюсь, эта статья поможет хабравчанам сделать правильные выводы и не повторить наших ошибок в будущем.
Спасибо за внимание, буду рад услышать вопросы и комментарии и с удовольствием на них отвечу.
UPD. По просьбам читателей — публикую ссылку на проект http://feelyoustar.com/
