Одним прекрасным утром к нам в офис забежал молодой парень, с амбициозной идеей и “средствами для реализации” в кармане. “Заходишь на сайт, а там — телевизор. К нему можно подключиться через свою web-камеру. Одновременно может вещать только один человек, остальные — ждут своей очереди (но можно посмотреть скриншоты с их вебкамер). Задача каждого — удержаться в эфире, как можно дольше. Если выступающий нравится публике — все жмут “Cool!”, если подкачал — “Go away!”. И человек заменяется на следующего в очереди. Ну и можно в чат писать”.
Хорошая идея — драйвовый проект. Рисуем прототип, решаем реализовать обновление чата, списка пользователей, рейтинга и т.д. с помощью push-технологии. Это когда после загрузки страницы соединение между клиентом и сервером не закрывается, а продолжает использоваться для отправки сервером каких-либо событий на клиента.
Осторожно! Эта шняга может убить ваш сервер! Кстати, если вы вдруг решите написать высконагруженный скандинавский аукцион – истина и веселые картинки где-то рядом, под катом.
Пара слов про PUSH
Использование технологии PUSH оправдано там, где нужно очень быстро обновлять множество клиентов при определенных изменениях на стороне сервера. В общем случае общение клиента и сервера выглядит так
Очевидно, что на каждого клиента нам нужен один постоянно работающий серверный push-процесс, который называется Comet (Комета). Это накладно с точки зрения использования памяти сервера, но альтернатива этому: постоянные запросы к серверу — на порядок хуже, медленнее и тяжелее. Правда есть еще один хороший вариант — работать напрямую с сокетами, но это отдельная тема.
Процесс работы
Когда клиент совершает какое-то действие (например, пишет в чат или голосует), он отправляет на сервер ajax-запрос с соответствующей командой. Результат выполнения этой команды нужно разослать всем подключенным клиентам. Для этого, скрипт, занимающийся обработкой ajax-запроса, должен оповестить все Кометы, которые уже передадут команду на обновление своим клиентам, а те в свою очередь — аккуратно отрисуют изменение.
Не стоит говорить, что одновременно может поступать довольно много запросов на обновление. Для этого должна быть организована очередь таких событий, из которой Кометы смогут брать события и рассылать их дальше.
Примерно так выглядит происходящее, когда мы чего-то написали в чат.
Ключевых моментов в этой схеме два:
- Реализация кометы (хороший обзор предоставил пользователь balepc)
- Реализация очереди. Давайте остановимся на них более подробно.
Реализация очереди
Для нашего проекта было необходимо, чтобы:
- Кометы мгновенно «просыпались» при поступлении новых событий в очередь, а не опрашивали ее постоянно. Это позволяет значительно снизить нагрузку на сервер, а так же реализовать мгновенную реакцию системы на действия пользователя.
- Если Комета по какой-то причине умирает (например, пользователь обновил страницу), пользователю все равно должны дойти все события, которые произошли между моментом отмирания Кометы и моментом восстановления соединения. Паузы в отправке сообщений так же возникают, если Push-канал работает через технологии “long polling” (характерно, например, для браузера Opera). В этом случае push-соединение создается заново каждый раз, когда клиент получает сообщение от сервера.
- Когда у нас на сайте сидит 5 человек, то голос каждого из них должен оказывать большее влияние, чем когда у нас на сайте сидит 100 человек.
- Пользователи, отключившие камеры, должны уходить из блока скриншотов.
В этом случае нам нужно достоверно знать, сколько и каких пользователей сейчас on-line. Дело в том, что после закрытия пользователем окна браузера на сервере не происходит никаких событий, поэтому отследить этот момент невозможно. И мы пришли к выводу о необходимости того, чтобы комета просыпалась и отчитывалась с определенной периодичностью (скажем, раз в пять секунд) о том, что она жива.
- Каждое посылаемое в очередь событие доходило до всех пользователей, независимо от качества их канала.
В простых случаях взаимодействия между двумя процессами можно воспользоваться функциями работы с очередями, встроенными в PHP, либо обертку Zend_Queue. Однако они не обеспечивают рассылку события сразу всем слушающим процессам (кометам).
Экспериментируем с очередями
Один из возможных вариантов решения — менеджер очередей.
Способ реализации:
- Каждая Комета поднимает свою очередь.
- Идентификатор новой очереди заносится в общую для всех процессов память (shared memory).
- Для предотвращения одновременной работы нескольких процессов с очередями используется семафор.
- При отправке сообщения в очередь событие рассылается в очередь каждого процесса.
Примерно так:
// когда мы нарисовали эту картинку на клочке бумажки — нам стало уже страшно, но отважный велосипедостроитель может попытаться довести реализацию до конца.
Даже если мы сделаем такой менеджер, то кометы будут просыпаться только при поступлении новых событий, а нам этого явно недостаточно (помните про необходимость знать, кто сейчас живой?).
К сожалению, стандартными функциями php сделать очередь с таймаутами невозможно. В голову пришла мысль реализовать какой-то сторонний серверный процесс, который бы периодически «пробуждал» кометы, подбрасывая события в очередь. Кстати, этот же процесс может так же удалять умершие идентификаторы очередей из shared memory. Вот что получаем в итоге:
При взгляде на эту картинку невольно вспоминается высказывание Луговского
«Человеческая фантазия безгранична на идиотские и нелепые архитектуры...»
Начинаем анализировать существующие решения для организации очередей с точки зрения пригодности использования в нашем приложении.
Наш выбор пал на Apache ActiveMQ (реально выбирали между ним и RabbitMQ)
Плюсы:
- Очень быстрый
- Очень стабильный
- Поддержка нескольких протоколов, в том числе — stomp
- Есть реализация взаимодействия в ZendFramework
- Есть способ рассылки сообщений всем слушателям (так называемые Topics)
- Умеет пробуждать ожидающий процесс по таймауту
- Хорошо масштабируется
Минус: Не умеет возвращать количество сообщений в очереди
Более подробный обзор менеджеров очередей от хабраюзера aleks_raiden можно найти здесь
http://habrahabr.ru/blogs/hi/44907/
http://habrahabr.ru/blogs/hi/45891/
Единственное, что нельзя было реализовать напрямую, используя механизм Topics — это восстановление событий, которые могли произойти при обрыве соединения кометы, до того, как она поднялась (это можно было сделать, отказавшись от механизма Topics, и используя индивидуальные очереди на каждый процесс, но...)
Мы решили складывать свежие N штук событий в shared memory, чтобы комета смогла мгновенно восстановить свой контекст. В итоге архитектура получилась следующая:
Бренчмарки
Для реализации приложения мы использовали ZendFramework. Ajax-запросы поступали в соответствующие контроллеры, которые обрабатывали их, и складывали сообщения в очередь, откуда кометы забирали сообщения и рассылали их на клиентов. Сами кометы так же реализованы как Action внутри контроллера ZendFramework.
Запустив Apache Branchmark, мы с ужасом обнаружили, что система активно кушает процессор уже при 25 одновременных обращениях.
300 запросов на 25 одновременных потоков
Как выяснилось, 99% времени занимала инициализация ZendFramework и соединение с базой данных. О-опс. Ну, это старая новость, и лекарство от этого известно — нужно исключить подключение классов ZendFramework, кроме реально необходимых. А все необходимые классы объединить в один файл.
Мы пришли к выводу, что придется отказаться от использования каркаса MVC при обработке ajax-запросов на голосование и чат, а так же в реализации кометы (поскольку скорость ее старта очень критична для браузеров, поддерживающих только механизм Long poll). Так же мы избавились от каких-либо запросов к базе данных в процессах обработки ajax-запросов. Для чего мы вынесли все необходимые данные в shared memory. При необходимости же синхронизации shared memory и базы данных — выполняли синхронизацию сторонним процессом.
Итог
300 запросов на 100 одновременных потоков
Реакция на события от клиента стала практически мгновенной. Ура, победа! Легковесные кометы используют память и процессор очень экономно. Узкое горлышко приложения — раздача видео-потока (но если вы будете делать скандинавский аукцион, где эффективно применять схожую архитектуру очередей — это не ваша проблема ;-), возможно напишем про это в следующих статьях, если выдержим хабраэффект.
Ниже опубликован наш менеджер очередей.
<?php
require_once 'Zend/Queue.php';
require_once 'Sibirix/Queue/Smemory/Package.php';
/**
* Менеджер очередей
*
*/
class Sibirix_Queue_Smemory_Manager {
const ERROR_BROKEN_OPTIONS_FOR_INIT_MESSAGE = "BROKEN_OPTIONS_FOR_INIT";
const ERROR_UNKNOWN_IDENTIFIER_STORAGE_MESSAGE = "UNKNOWN_IDENTIFIER_STORAGE";
const ERROR_CAN_NOT_ACTIVATE_THE_SEMAPHORE_MESSAGE = "CAN_NOT_ACTIVATE_THE_SEMAPHORE";
const SHM_VAR_LAST_CHAT_POSTS = 1;
const SHM_VAR_LAST_SYSTEM_COMMAND = 2;
const SHM_VAR_QUEUE_LAST_ID = 3;
private $_count_chat_package_storage;
private $_count_system_package_storage;
private $_activeMQoptions;
private $_queue;
/**
* semaphore key
*/
private $_sem_key;
/**
* Shared memory APP_KEY = one symbol
*/
private $_app_key;
private $_shared_memory_id = false;
private $_shared_memory_handler = false;
private $_shared_memory_size;
private $_sem_id = false;
private $_last_queue_message_id = false;
public function __construct($options = array()) {
if (!is_array($options)) {
throw new Exception(self::ERROR_BROKEN_OPTIONS_FOR_INIT_MESSAGE);
}
if (!isset($options['sem_key'])) {
throw new Exception(self::ERROR_BROKEN_OPTIONS_FOR_INIT_MESSAGE);
}
if (!isset($options['app_key'])) {
throw new Exception(self::ERROR_BROKEN_OPTIONS_FOR_INIT_MESSAGE);
} else {
$options['app_key'] = (string)$options['app_key'];
if (strlen($options['app_key']) != 1) {
throw new Exception(self::ERROR_BROKEN_OPTIONS_FOR_INIT_MESSAGE);
}
}
if (!isset($options['activeMQ'])) {
throw new Exception(self::ERROR_BROKEN_OPTIONS_FOR_INIT_MESSAGE);
}
if (!isset($options['shared_memory_size'])) {
throw new Exception(self::ERROR_BROKEN_OPTIONS_FOR_INIT_MESSAGE);
}
if (!isset($options['count_chat_package_storage'])) {
throw new Exception(self::ERROR_BROKEN_OPTIONS_FOR_INIT_MESSAGE);
}
if (!isset($options['count_system_package_storage'])) {
throw new Exception(self::ERROR_BROKEN_OPTIONS_FOR_INIT_MESSAGE);
}
$this->_sem_key = $options['sem_key'];
$this->_app_key = $options['app_key'];
$this->_activeMQoptions = $options['activeMQ'];
$this->_shared_memory_size = $options['shared_memory_size'];
$this->_count_chat_package_storage = $options['count_chat_package_storage'];
$this->_count_system_package_storage = $options['count_system_package_storage'];
$this->_sem_id = sem_get($this->_sem_key, 1);
$this->_shared_memory_id = ftok(".", $this->_app_key);
$this->_shared_memory_handler = shm_attach($this->_shared_memory_id, $this->_shared_memory_size);
}
public function connectTopic() {
if (!$this->_queue) {
$this->_queue = new Zend_Queue('Activemq', $this->_activeMQoptions);
}
return $this->_queue;
}
public function receive() {
$this->connectTopic();
$message = false;
$msg_receive = $this->_queue->receive(1);
foreach ( $msg_receive as $msg ) {
$pkg = new Sibirix_Queue_Smemory_Package($msg->body);
$this->_last_queue_message_id = $pkg->id;
$message = $pkg;
}
return $message;
}
public function receiveAll() {
$this->connectTopic();
$last_queue_message_id_mem = $this->_sem_getLastQueueMessageId();
$messages = array();
while(true) {
$msg_receive = $this->_queue->receive(1);
if (!$msg_receive->count()) {break;}
foreach ( $msg_receive as $message ) {
$pkg = new Sibirix_Queue_Smemory_Package($message->body);
$this->_last_queue_message_id = $pkg->id;
$messages[] = $pkg;
}
if ($this->_last_queue_message_id >= $last_queue_message_id_mem ) {
$last_queue_message_id_mem = $this->_sem_getLastQueueMessageId();
if ($this->_last_queue_message_id >= $last_queue_message_id_mem ) {
break;
}
}
}
return $messages;
}
public function send($action, $data, $recipient = false) {
$this->connectTopic();
$package = new Sibirix_Queue_Smemory_Package($action, $data, $recipient);
$this->_lock();
$last_queue_message_id_mem = $this->_getLastQueueMessageId();
$last_queue_message_id_mem++;
$this->_setLastQueueMessageId($last_queue_message_id_mem);
$this->_queue->send( $package->normalize( $last_queue_message_id_mem ) );
if ( $package->isPost() ) {
$this->_savePackageToChatStorage( $package );
} else {
$this->_savePackageToSystemStorage( $package );
}
$this->_unlock();
return $last_queue_message_id_mem;
}
private function _getLastQueueMessageId() {
$last_queue_message_id_mem = shm_get_var($this->_shared_memory_handler, self::SHM_VAR_QUEUE_LAST_ID);
if (!$last_queue_message_id_mem) {
shm_put_var($this->_shared_memory_handler, self::SHM_VAR_QUEUE_LAST_ID, 0);
$last_queue_message_id_mem = 0;
}
return $last_queue_message_id_mem;
}
private function _setLastQueueMessageId($id) {
shm_put_var($this->_shared_memory_handler, self::SHM_VAR_QUEUE_LAST_ID, $id);
return true;
}
private function _sem_getLastQueueMessageId() {
$this->_lock();
$last_queue_message_id_mem = $this->_getLastQueueMessageId();
$this->_unlock();
return $last_queue_message_id_mem;
}
private function _savePackageToChatStorage( $package ) {
$last_chat_posts = shm_get_var($this->_shared_memory_handler, self::SHM_VAR_LAST_CHAT_POSTS);
if (!$last_chat_posts) {$last_chat_posts=array();}
if (count($last_chat_posts) >= $this->_count_chat_package_storage) {
while(count($last_chat_posts) >= $this->_count_chat_package_storage) {
array_shift($last_chat_posts);
}
}
array_push($last_chat_posts, $package->normalize());
shm_put_var($this->_shared_memory_handler, self::SHM_VAR_LAST_CHAT_POSTS, $last_chat_posts);
}
private function _savePackageToSystemStorage($package) {
$last_sys_cmd = shm_get_var($this->_shared_memory_handler, self::SHM_VAR_LAST_SYSTEM_COMMAND);
if (!$last_sys_cmd) {$last_sys_cmd=array();}
if (count($last_sys_cmd) >= $this->_count_system_package_storage) {
while(count($last_sys_cmd) >= $this->_count_system_package_storage) {
array_shift($last_sys_cmd);
}
}
array_push($last_sys_cmd, $package->normalize());
shm_put_var($this->_shared_memory_handler, self::SHM_VAR_LAST_SYSTEM_COMMAND, $last_sys_cmd);
}
public function getLastPackage($storage) {
if ( $storage != self::SHM_VAR_LAST_SYSTEM_COMMAND && $storage != self::SHM_VAR_LAST_CHAT_POSTS ) {
throw new Exception(self::ERROR_UNKNOWN_IDENTIFIER_STORAGE_MESSAGE);
}
$this->_lock();
$last_packages = shm_get_var($this->_shared_memory_handler, $storage);
if (!$last_packages) {$last_packages=array();}
$this->_unlock();
$return_last_packages = array();
foreach($last_packages as $normalize_package) {
$package = new Sibirix_Queue_Smemory_Package($normalize_package);
$return_last_packages[] = $package;
}
return $return_last_packages;
}
private function _lock() {
if (!sem_acquire($this->_sem_id)) {
throw new Exception(self::ERROR_CAN_NOT_ACTIVATE_THE_SEMAPHORE_MESSAGE);
}
}
private function _unlock() {
sem_release($this->_sem_id);
}
public function resetMemory() {
$this->_lock();
shm_put_var($this->_shared_memory_handler, self::SHM_VAR_LAST_CHAT_POSTS, array());
shm_put_var($this->_shared_memory_handler, self::SHM_VAR_LAST_SYSTEM_COMMAND, array());
$this->_setLastQueueMessageId(0);
$this->_lock();
}
public function getLastQueueMessageId() {
$this->_lock();
$last_queue_message_id_mem = $this->_getLastQueueMessageId();
$this->_unlock();
return $last_queue_message_id_mem;
}
}
?>
<?php
/**
* Пакет в очереди
*
*/
class Sibirix_Queue_Smemory_Package {
const ERROR_NOT_SUPPORTED_TYPE_ACTION_MESSAGE = 'ERROR_NOT_SUPPORTED_TYPE_ACTION';
const ERROR_FOR_NORMALIZE_NEED_ID_MESSAGE = 'FOR_NORMALIZE_NEED_ID';
const CHAT_ACTION = 'pasteChatMessages';
private $_supported_actions = array(
'updateRating',
'setChannel',
'pasteChatMessages',
'updateTimer',
'connectToServer',
'statistics'
);
private $_read_private_property = array(
'id','action','time','data','recipient'
);
private $_action;
private $_data;
private $_recipient;
private $_time;
private $_id;
public function __construct() {
$fg_args = func_get_args();
if (count($fg_args) == 1) {
list(
$this->_id,
$this->_action,
$this->_time,
$this->_data,
$this->_recipient
) = unserialize($fg_args[0]);
} else {
list($action, $data, $recipient) = $fg_args;
$this->_validation($action, $data, $recipient);
$this->_action = $action;
$this->_data = $data;
$this->_time = $this->_getmicrotime();
$this->_recipient = $recipient;
}
}
public function normalize( $id = false ) {
if ($id) {
$this->_id = $id;
} else {
if ($this->_id === null) {
throw new Exception(self::ERROR_FOR_NORMALIZE_NEED_ID_MESSAGE);
}
}
return serialize(array(
0 => $this->_id,
1 => $this->_action,
2 => $this->_time,
3 => $this->_data,
4 => $this->_recipient
));
}
private function _validation($action, $data, $recipient) {
if (!in_array($action, $this->_supported_actions)) {
throw new Exception(self::ERROR_NOT_SUPPORTED_TYPE_ACTION_MESSAGE);
}
return true;
}
private function _getmicrotime() {
list($usec, $sec) = explode(' ', microtime());
return number_format(((float)$usec + (float)$sec), 2, '.', '');
}
public function __get($key) {
if (in_array($key, $this->_read_private_property)) {
return $this->{'_'.$key};
}
return null;
}
public function isPost() {
if ( $this->_action == self::CHAT_ACTION ) {
return true;
}
return false;
}
}
?>
Выводы
- Для очередей необходимо использовать сторонние менеджеры. В нашем случае хорошо себя проявил Active MQ.
- Работа с кометами и отправляемыми на сервер запросами должна осуществляться легковесными скриптами, без полной инициализации фреймворка, коннектов к базе и других излишеств. Спасибо, кэп.
Сейчас, смотря на то какой был проделан путь, становится просто страшно. Надеюсь, эта статья поможет хабравчанам сделать правильные выводы и не повторить наших ошибок в будущем.
Спасибо за внимание, буду рад услышать вопросы и комментарии и с удовольствием на них отвечу.
UPD. По просьбам читателей — публикую ссылку на проект http://feelyoustar.com/