Comments 35
Штука интересная, как-то все руки не дойдут пощупать.
А можно посмотреть что за проекты используют его в продакшне?
А можно посмотреть что за проекты используют его в продакшне?
Замечательно!
Раньше, помню, был чат… А сейчас, можно где-то посмотреть демку с участием phpDaemon?
Раньше, помню, был чат… А сейчас, можно где-то посмотреть демку с участием phpDaemon?
В данный момент нет, но работы ведутся. Появится тот самый чат и онлайн-игра.
Спасибо! Демки — это очень важная состовляющая проекта :)
Данко, не забудь про IRC бота Bounzaa :D
пару месяцев назад активно рылся по поводу примеров работы с buffer event-ами в pecl-libevent и обнаружил что там отсутствовала часть констант, довольно важных для работы с буффер эвентами собственно
пока искал кстати примеры кода обнаружил что гугль ничего кроме phpdaemon работающего с event_buffer_new не находит
поэтому небольшой совет — onFailureEvent задающийся в event_buffer_new срабатывает не только в случае ошибки но и например позволяет находить EOF сокета
вот эти константы появились в svn pecl-libevent-a
svn.php.net/viewvc/pecl/libevent/trunk/libevent.c?r1=290145&r2=296468
надеюсь поможет ;)
пока искал кстати примеры кода обнаружил что гугль ничего кроме phpdaemon работающего с event_buffer_new не находит
поэтому небольшой совет — onFailureEvent задающийся в event_buffer_new срабатывает не только в случае ошибки но и например позволяет находить EOF сокета
вот эти константы появились в svn pecl-libevent-a
svn.php.net/viewvc/pecl/libevent/trunk/libevent.c?r1=290145&r2=296468
надеюсь поможет ;)
В последней версии pecl-libevent исправлена (Тони поправил) работа с Signals и теперь в демонах можно ловить сигналы минуя pcntl_* функции. Что позволяет иметь 1 loop а не делать задержки в наносекундах на проверки пришедших сигналов, как, на сколько я видел, реализовано в текущей версии, что при больших нагрузках совсем не комильфо.
как пример:
как пример:
Оттестировано на боевых серверах и собственных демонах.
[цитата]
Просто для информации: я выпустил версию PECL/libevent 0.0.4.
Теперь банановый, с поддержкой сигналов в event_set(), event_buffer_set_cb() и без сегфолта в event_del().
Проапгрейдить можно с помощью `pecl install libevent` или взять с сайта: pecl.php.net/libevent
[/цитата]
как пример:
/** * запуск основного цикла обработки событий * @return void */ public function loop() { // пускаем цикл обработки событий $bRc = event_base_loop ( $this->rBaseEvent ); switch($bRc) { case -1: throw new CoreException ( 'Can\'t create base loop', '110060000' ); break; case 1: self::say ( "No events in base", 'LOOP' ); //throw new CoreException ( 'No events registered in base', '110060000' ); break; } self::say ( "Exit events...", 'LOOP' ); self::say ("Bye",'ASYNC'); }
как пример:
abstract class aAsynchronousServer { ... public function startSignal() { $aSignals = array(); $aSignals[SIGTERM] = 'stop'; $aSignals[SIGHUP] = 'restart'; foreach($aSignals as $iSignal => $sMethod ) { // новое событие для сокета $this->eventSignal[$iSignal] = event_new (); //self::say ( "Create new eventSignal [".$iSignal."]." ,'ASYNC'); // ловим сигналы $bRc = event_set ( $this->eventSignal[$iSignal], $iSignal, EV_SIGNAL, array ( $this, $sMethod ) ); $bRc = event_base_set ( $this->eventSignal[$iSignal], $this->rBaseEvent ); if ($bRc === false) { throw new CoreException ( "Не установить событие сигналов в базу", '110060006' ); } else { //self::say ( "Set signal [".$iSignal."] to event base." ,'ASYNC'); } $bRc = event_add ( $this->eventSignal[$iSignal] ); if ($bRc === false) { throw new CoreException ( "Не добавить событие сигнала", '110060007' ); } else { //self::say ( "Signal [".$iSignal."] add." ,'ASYNC'); } } } ... public function restart() { self::say ( 'Restart'); $this->stop(); $this->start(); $this->loop(); }
Оттестировано на боевых серверах и собственных демонах.
[цитата]
Просто для информации: я выпустил версию PECL/libevent 0.0.4.
Теперь банановый, с поддержкой сигналов в event_set(), event_buffer_set_cb() и без сегфолта в event_del().
Проапгрейдить можно с помощью `pecl install libevent` или взять с сайта: pecl.php.net/libevent
[/цитата]
да. вот это совсем не гуд.
while (TRUE) { pcntl_signal_dispatch(); $this->sigwait(1,0); ...
Во-первых не надо гадить в комментах, юзайте pastebin.
Во-вторых этот кусок кода вообще-то из мастер-процесса. Какие на него могут быть нагрузки? Он только спавнит воркеров. Там так специально сделано, интервал в секунду между проверками.
Во-вторых этот кусок кода вообще-то из мастер-процесса. Какие на него могут быть нагрузки? Он только спавнит воркеров. Там так специально сделано, интервал в секунду между проверками.
был не прав. но сути не меняет
Ничего себе не меняет :-) Сначала вкурите как всё устроено там, а потом советуйте:-)
я курил ваш демон. долго. трава была харошая. когда еще был фарш и не было каментов совсем. много было интересного. спасибо.
а про суть — так это я про сигналы и не более того.
а этот кусок?
все равно остановка? или я опять не прав?
pcntl_signal_dispatch(); event_add($this->timeoutEvent, $this->microsleep); event_base_loop($this->eventBase,EVLOOP_ONCE);
все равно остановка? или я опять не прав?
Не прав. pcntl_signal_dispatch() вызывает колбеки для тех сигналов которые в очереди. pcntl_signal_dispatch — Calls signal handlers for pending signals.
А уже timed event это для того чтоб установить максимальный таймаут между вызовами.
А уже timed event это для того чтоб установить максимальный таймаут между вызовами.
старик. я тут без критики. я все про то, что теперь можно делать без таймаутов, а чисто по событию.
=)
=)
Чего по событию? event_add($this->timeoutEvent, $this->microsleep) это чтоб не гонять очередь слишком часто и не нагружать проц слишком сильно.
К сигналам это не имеет никакого отношения, они и так «по событию.»
К сигналам это не имеет никакого отношения, они и так «по событию.»
меня тоже в свое время убило отсутствие событий на сигналы и необходимость опрашивать в циклах _dispatch() для отлова онных. я решил это в свое время таймерами имея единый base для событий и мгновенную реакцию на события.
Потом Тони пофиксил это. Мы потестили. Я переписал свои куски на обработку сигналов средствами либевента. Все работает.
p.s. ну признай, что останавливаешь цикл? =) на микросек, но стопорится?
Потом Тони пофиксил это. Мы потестили. Я переписал свои куски на обработку сигналов средствами либевента. Все работает.
p.s. ну признай, что останавливаешь цикл? =) на микросек, но стопорится?
Вашего кода я не видел, я лишь могу утверждать что в моем коде не происходит sleep() в обработке сигналов. Было бы глупо признать ошибочное утверждение.
я почитаю новые исходники. отпишусь.
=)
спасибо за ответы.
=)
спасибо за ответы.
я понимаю как обрабатывыается pcntl_dispatch(). не вопрос.
вопрос немного в другом. как часто вызыввется сие чудо? если оно не работает по событиям, то должен быть цикл с засыпанием что б не постоянно его дергать.
вот про эти засыпания я и говорю. если у вас есть такие циклы ( =) ), то вставив туда диспетчер можно ловить сигналы.
я лишь про это… и про microsleep
вопрос немного в другом. как часто вызыввется сие чудо? если оно не работает по событиям, то должен быть цикл с засыпанием что б не постоянно его дергать.
вот про эти засыпания я и говорю. если у вас есть такие циклы ( =) ), то вставив туда диспетчер можно ловить сигналы.
я лишь про это… и про microsleep
Кто хочет примеров работы. Этот пример был показан на devconf2010. тут асинхронная обработка данных + таймеры, коих совсем нет в документации, но они работают на ура.
<?php /** * Пример работы с сокетами с библиотекой libevent * @author 440hz@php.ru */ class MySocket { /** * Ошибка чтения буфера * @var int */ const EVBUFFER_READ = 0x01; /** * Ошибка буфера записи * @var int */ const EVBUFFER_WRITE = 0x02; /** * Ошибка буфера конца файла * @var int */ const EVBUFFER_EOF = 0x10; /** * Ошибка буфена * @var int */ const EVBUFFER_ERROR = 0x20; /** * Ошибка буфера таймаут * @var int */ const EVBUFFER_TIMEOUT = 0x40; /** * Пул в состоянии чтения данных * @var int */ const POOL_READ = 0x01; /** * Пул в состоянии записи данных * @var int */ const POOL_WRITE_CLOSE = 0x02; /** * Пул в состоянии записи данных и удержания коннекта * @var int */ const POOL_WRITE_KEEP_ALIVE = 0x04; /** * Expect continie * @var int */ const POOL_WRITE_EXPECT = 0x08; /** * Время в секундах ожидания ответа от клиента до закрытия коннекта * @var int */ var $iTimeOutRead = 30; /** * Время в секундах ожидания ответа клиенту до закрытия коннекта * @var int */ var $iTimeOutWrite = 30; /** * Общий счетчик кол-ва ошибок чтения * @var int */ var $iErrorRead = 0; /** * Счетчик кол-ва ошибок чтения конца файла * @var int */ var $iErrorReadEOF = 0; /** * Счетчик кол-ва ошибок чтения * @var int */ var $iErrorReadError = 0; /** * Счетчик кол-ва ошибок таймаута * @var int */ var $iErrorReadTimeOut = 0; /** * общий счетчик кол-ва ошибок записи * @var int */ var $iErrorWrite = 0; /** * Счетчик кол-ва ошибок записи конца файла * @var int */ var $iErrorWriteEOF = 0; /** * Счетчик кол-ва ошибок записи * @var int */ var $iErrorWriteError = 0; /** * Счетчик кол-ва ошибок записи таймаута * @var int */ var $iErrorWriteTimeOut = 0; /** * Длина буфера чтения * @var int */ var $iBufferReadLenght = 4; /** * Счетчик коннектов * @var int */ private $iConnection = 0; /** * массив коннектов * @var array */ private $aConnections = array (); var $rBaseEvent = null; /** * ЛОГ * @param string $msg */ private function log($msg="\n`") { print("{$msg}\n"); } public function Start ( $sAddr = 'tcp://127.0.0.1:666' ) { /** * Открываем слушающий сокет * @var $rSocket resource */ $this->rSocket = @stream_socket_server ( $sAddr, $errno, $errstr, STREAM_SERVER_BIND | STREAM_SERVER_LISTEN ); if ( $this->rSocket === false ) { die ( "Не могу открыть сокет по адресу [$sAddr].\n{$errno}:{$errstr}" ); } else { $this->log("Открыт сокет по адресу: {$sAddr}"); } /** * сдалем его не блокирующим, что б позволить еще принимать коннекты * @var $bRC bool */ $bRC = stream_set_blocking ( $this->rSocket, 0 ); if ( $bRC === false ) { die ( "Не сделать сокет неблокирующим" ); } else { $this->log('Сокет разблокирован'); } /* * Создаем базу событий */ $this->rBaseEvent = event_base_new ( ); if ( $this->rBaseEvent === false ) { die ( 'Не создать базу событий' ); } else { $this->log('Создана база событий'); } /** * новое событие для сокета * @var $event resource */ $this->rSocketEvent = event_new ( ); /** * ловим на чтение и после операции чтения возвращаем событие в базу * EV_READ - чтение * EV_PERSIST - вернуть событие в базу после выполнения * * @var $bRC bool */ $bRc = event_set ( $this->rSocketEvent, $this->rSocket, EV_READ | EV_PERSIST, array ( $this, 'onAcceptEvent' ) ); if ( $bRC === false ) { die ( "Не установить событие сокета" ); } else { $this->log('Создано событие сокета'); } /* * Кладем в базу событий */ $bRc = event_base_set ( $this->rSocketEvent, $this->rBaseEvent ); if ($bRc === false) { die ( "Не установить событие сокета в базу"); } else { $this->log('Событие сокета установлено в базу событий'); } /* * пускаем... */ $bRc = event_add ( $this->rSocketEvent ); if ($bRc === false) { die ( "Не добавить событие сокета в базу"); } else { $this->log('Событие сокета добавлено в базу событий'); } } /** * Прием коннекта на сокете * @param resource $rSocket * @param resource $rEvent * @param array $args */ function onAcceptEvent ( $rSocket, $rEvent, $args ) { /** * Номер нового коннекта * @var $iConnect int */ $iConnect = $this->iConnection ++; /** * Примем коннект * @var resource */ $rConnection = @stream_socket_accept ( $this->rSocket ); if ( $rConnection === false ) { die ( "Ошибка соединения сокета" ); } else { $this->log('Соединились ['.$iConnect.']'); } /** * сдалем его не блокирующим, что б позволить еще принимать коннекты * @var bool */ $bRc = stream_set_blocking ( $rConnection, 0 ); if ( $bRc === false ) { die ( "Не сделать коннект [{$iConnect}] на сокете не блокирующим" ); } else { $this->log('Соединение ['.$iConnect.'] разблокировано'); } /** * запомним коннект * @var array */ $this->aConnections [ $iConnect ] = $rConnection; /* * сформируем пулы обмена */ $this->aState [ $iConnect ] = self::POOL_READ; $this->aReadPool [ $iConnect ] = ''; $this->aWritePool [ $iConnect ] = ''; /* * создадим буфер обмена данными */ $buf = event_buffer_new ( $this->aConnections [ $iConnect ], array ( $this, 'onReadEvent' ), array ( $this, 'onWriteEvent' ), array ( $this, 'onFailureEvent' ), array ( $iConnect ) ); $this->log('Буфер для соединения ['.$iConnect.'] создан'); /* буфер в базовую наблюдалку */ event_buffer_base_set ( $buf, $this->rBaseEvent ); $this->log('Буфер для соединения ['.$iConnect.'] помещен в базу событий'); /* таймауты что б рубить задержки */ self::setTimeoutBuffer ( $buf, $this->iTimeOutRead, $this->iTimeOutWrite ); $this->log('Буферу для соединения ['.$iConnect.'] назначены таймеры ожидания'); /* начальные и концевые терминаторы */ event_buffer_watermark_set ( $buf, EV_READ | EV_WRITE , 0 , 0xffffff ); $this->log('Буферу для соединения ['.$iConnect.'] назначены концевые терминаторы'); /* приоритет буфера */ event_buffer_priority_set( $buf, 10 ); $this->log('Буферу для соединения ['.$iConnect.'] установлен приоритет'); /* включаем буфер на события и возвращаем события назад после выполнения */ event_buffer_enable ( $buf, EV_READ | EV_WRITE | EV_PERSIST ); $this->log('Буфер для соединения ['.$iConnect.'] включен на обработку чтения и записи'); /* сохраним буффер */ $this->aBuffers [$iConnect] = $buf; } /** * Обработка ошибок буферов ввода-вывода * Например по таймауту отвалить надо или сам отвалился кто * * @param $rStream Поток ввода-вывода * @param $iError Ошибка * @param $args Дополнительные аргументы * @return void */ function onFailureEvent($rStream, $iError, $args) { $iConnect = $args [0]; if ($iError & self::EVBUFFER_READ) { $this->iErrorRead++; if ($iError & self::EVBUFFER_EOF) { $this->iErrorReadEOF++; } if ($iError & self::EVBUFFER_ERROR) { $this->iErrorReadError++; } if ($iError & self::EVBUFFER_TIMEOUT) { $this->iErrorReadTimeOut++; } } if ($iError & self::EVBUFFER_WRITE) { $this->iErrorWrite++; if ($iError & self::EVBUFFER_EOF) { $this->iErrorWriteEOF++; } if ($iError & self::EVBUFFER_ERROR) { $this->iErrorWriteError++; } if ($iError & self::EVBUFFER_TIMEOUT) { $this->iErrorWriteTimeOut++; } } /* закроем коннект */ $this->CloseConnection ( $iConnect ); } /** * Закрыть коннект * * @param $c номер коннекта * @return void */ function CloseConnection($iConnect) { $this->log('Соединение ['.$iConnect.'] закрыто'); /* отрубим буфер */ event_buffer_disable ( $this->aBuffers [$iConnect], EV_READ | EV_WRITE ); /* освободим. */ event_buffer_free ( $this->aBuffers [$iConnect] ); unset ( $this->aBuffers [$iConnect] ); /* закроем коннект */ fclose ( $this->aConnections [$iConnect] ); /* освободим */ unset ( $this->aConnections [$iConnect] ); unset ( $this->aState [$iConnect] ); unset ( $this->aReadPool [$iConnect] ); } /** * Обнулить коннект и ждать опять запрос * * @param $c номер коннекта * @return void */ function ZeroConnection($iConnect) { $this->aState [$iConnect] = self::POOL_READ; $this->aReadPool [$iConnect] = ''; } /** * Выдать данные и продолжить чтение. Для HTTP 101 кода * @param $iConnect */ function ExpectConnection($iConnect) { $this->aState [$iConnect] = self::POOL_READ; } /** * Событие чтения данных * * @param $rStream * @param $args * @return void */ function onReadEvent($rStream, $args) { $iConnect = $args [0]; if ($this->aState [$iConnect] !== self::POOL_READ) { die ( "Состояние буфера[{$iConnect}] не ЧТЕНИЕ!"); $this->CloseConnection ( $iConnect ); return; } do { do { $tmp = self::readBuffer ( $this->aBuffers [$iConnect] ); /* не все клиент передал. обычно по telnet так. кусками плюет. */ if ($tmp === false) { break; } if (0 == strlen($tmp)) { return; } $this->log('На соединении ['.$iConnect.'] прочитано ['.strlen($tmp).'] байт'); $this->aReadPool [$iConnect] .= $tmp; if( $this->iBufferReadLenght > strlen($tmp) ) { break; } } while ( true ); } while (!$this->CheckRequest ( $iConnect ) ); } /** * Событие записи в буффер * * @param $rStream * @param $args * @return void */ function onWriteEvent($rStream, $args) { $iConnect = $args [0]; if ($this->aState [$iConnect] === self::POOL_WRITE_KEEP_ALIVE) { $this->log('На соединении ['.$iConnect.'] была запись с сохранением соединение'); $this->ZeroConnection ( $iConnect ); return; } if ($this->aState [$iConnect] === self::POOL_WRITE_CLOSE) { $this->log('На соединении ['.$iConnect.'] была запись с закрытием соединение'); $this->CloseConnection ( $iConnect ); return; } if ($this->aState [$iConnect] === self::POOL_WRITE_EXPECT) { $this->log('На соединении ['.$iConnect.'] была запись с сохранением соединение'); $this->ExpectConnection ( $iConnect ); return; } } private function CheckRequest($iConnect) { $buf = $this->aReadPool [$iConnect]; $this->log('На соединении ['.$iConnect.'] анализ полученных данных'); if( 0 < ($pos = strpos($buf,"\r\n")) ) { $buf = substr($buf,0,$pos); $this->aState [$iConnect] = self::POOL_WRITE_CLOSE; self::writeBuffer ( $this->aBuffers [$iConnect], 'Hello, '.$buf."\n" ); return true; } else { return false; } } /** * Обвязка над функцией event_buffer_write * * @param $hBuffer * @param $sData * @param $iDataSize * @return boolean */ protected static function writeBuffer ($hBuffer, $sData, $iDataSize=-1) { if ($iDataSize > 0) { $return = event_buffer_write($hBuffer, $sData, $iDataSize); } else { $return = event_buffer_write($hBuffer, $sData); } return $return; } /** * Обвязка над функцией event_buffer_read * * @param $hBuffer Буфер * @return string Вернет string или false */ protected function readBuffer($hBuffer) { return event_buffer_read ( $hBuffer, $this->iBufferReadLenght ); } /** * Обвязка над функуцией event_buffer_timeout_set * * @param $hBuffer Буфер * @param $iReadTimeout Таймаут в секундах на чтение * @param $iWriteTimeout таймаут в секундах на запись * @return boolean */ protected static function setTimeoutBuffer($hBuffer, $iReadTimeout, $iWriteTimeout) { return event_buffer_timeout_set($hBuffer, $iReadTimeout, $iWriteTimeout); } /** * запуск основного цикла обработки событий * @return void */ function loop() { $bRc = event_base_loop ( $this->rBaseEvent ); switch($bRc) { case -1: die ( 'Не могу создать очередь'); break; case 1: die ( "нет событий в базе"); break; } } /* ТАЙМЕРЫ */ function addTimer($iTimer,$sTimerName,$aCallBack,$iInterval) { $this->log ( "Создание таймера [{$sTimerName}]"); if( !is_callable($aCallBack,true,$sCallBack) OR !method_exists($aCallBack[0],$aCallBack[1])) { die ( "Не определен CallBack [{$sCallBack}] для таймера [{$sTimerName}]"); } else { $this->log ( "Установлен таймер [{$iTimer}/{$sTimerName}] [{$sCallBack}] с интервалом [".$iInterval."] сек." ); } $this->aTimers[$iTimer] = array(); $this->aTimers[$iTimer]['name'] = $sTimerName; $this->aTimers[$iTimer]['interval'] = $iInterval; $this->aTimers[$iTimer]['callback'] = $aCallBack; $hTmpFile = tmpfile(); if($hTmpFile === false) { die ( "Не возможно создать временный файл для таймера"); } else { $this->log('Создан временный файл для таймера'); } $this->aTimers[$iTimer]['tmpfile'] = $hTmpFile; $event = event_new(); $this->aTimers[$iTimer]['timer'] = $event; $bRc = event_set( $this->aTimers[$iTimer]['timer'], $this->aTimers[$iTimer]['tmpfile'] , 0, array( $this , 'onTimer'), array ( $iTimer )); if ($bRc === false) { die ( 'Не установить событие таймера ['.$iTimer.']' ); } else { $this->log('Событие таймера ['.$iTimer.'] установлено'); } $bRc = event_base_set( $this->aTimers[$iTimer]['timer'], $this->rBaseEvent ); if ($bRc === false) { die ( "Не установить таймер в базу"); } else { $this->log('Событие таймера ['.$iTimer.'] установлено в базу'); } $this->setTimer($iTimer); } function setTimer($iTimer) { if(!isset($this->aTimers[$iTimer])) return; $bRc = event_add( $this->aTimers[$iTimer]['timer'], $this->aTimers[$iTimer]['interval'] * 1000000 ); if ($bRc === false) { die ( "Не добавить таймер [{$iTimer}] в базу"); } else { $this->log('Таймер добавлен ['.$iTimer.'] в базу'); } } function delTimer($iTimer,$aTimer){ $this->log ( "Таймер [".$iTimer."] [".$aTimer['name']."] удален"); event_free($aTimer['timer']); } /** * Обработчик таймеров * @param $rSocket * @param $rEvent * @param $a */ function onTimer( $rSocket, $rEvent, $a ) { $iTimer = $a[0]; call_user_func( $this->aTimers[$iTimer]['callback'], $this, $iTimer ); $this->setTimer($iTimer); } static $iCount; public static function onMyTimer($handler,$iTimer) { print("Демон работает: ".(time() - MySocket::$iCount)." сек\n"); } } $MySocket = new MySocket(); $MySocket->Start(); MySocket::$iCount = time(); $MySocket->addTimer(0,'MYTIMER',array($MySocket,'onMyTimer'),10); $MySocket->loop();
Это вы хотели проверить максимальный размер коммента на хабре?
Sign up to leave a comment.
phpDaemon: хорошие новости