Pull to refresh

Websocket в продакшене

Reading time 30 min
Views 41K
10 месяцев назад я начал делать браузерную игрушку. Выбор пал на cocos js в качестве графики и websocket в качестве общения с сервером. Технология очень понравилась и я на ней организовал всё общение игры с сервером. Использовал для этого эту статью. Но, к сожалению, тот код, который приведен в той статье, нельзя использовать в продакшене. Как выяснилось, уровень проблемы даже не критический, а блокирующий. Всё настолько плохо, что мне пришлось переписывать всё общение с сервером с вебсокетов на longpooling. В итоге я оставил вариант «если у нас браузер не сафари, то использовать websocket, иначе longpolling» и ещё немного ветвления на эту тему.

Так что опыт использования вебсокет в продакшене накопился приличный. И вот недавно случилось событие, которое сподвигло меня написать первую статью на Хабре.

После того, как игрушка была опубликована в социальной сети, я поправил все найденные критические/блокирующие баги и начал приводить всё в порядок в спокойном режиме. Я хочу обратить внимание на то, что этот вот пример — это вообще единственный в интернете гайд, который содержит серверный код, который можно вставить себе в код и использовать его. Ну вот набрать в поисковике «php websocket server» — попробуйте что-то найти, что можно себе поставить.

Внезапно я перечитываю указанную выше статью и в самом начале обнаруживаю ссылки на «phpdaemon» и «ratchet». Ну думаю, давай в спокойном режиме посмотрю на код тамошний. В PhpDeamon в недрах обработки WebSocket соединения небольшое, но безумно важное ветвление на протоколы WebSocket. И там прямо написано для одного case «Safari5 and many non-browser clients». Сказать, что я офигел — это ничего не сказать. Перед глазами пронеслись несколько сотен часов, тонны нервотрёпки и страдания, которые поставили под вопрос вообще проект. Я не поверил, решил проверить.

В течении ~15 часов я вытянул из PhpDeamon минимальный код, связанный с WebSocket (который работает во всех браузерах последней версии, а сам серверный код может работать под высокой нагрузкой) и его постараюсь опубликовать с объяснениями. Чтобы другие люди не испытали те мучения, через которые мне пришлось пройти. Да, кусок кода получился не маленький, но извините: WebSocket он на клиентской части очень простой, а на стороне сервера всё довольно объёмно (скажем отдельное «спасибо» разработчикам Сафари). Также в связи с тем, что область применения WebSocket — это в первую очередь игры, важен вопрос неблокирующего использования серверного сокета — это бонусная сложность, которая никак здесь не рассматривается, хотя и очень важна.

Тестовое приложение я хотел написать без объектов, чтобы было понятнее. Но, к сожалению, такой подход в данном примере расплодит много повторяющегося кода, поэтому пришлось добавить 1 класс и 3 его наследника. Остальное всё без объектов.

Для начала клиентсная часть
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
<html xmlns="http://www.w3.org/1999/xhtml">
<head>
  <meta http-equiv="Content-Type" content="text/html; charset=utf-8"/>
  <title>WebSocket test page</title>
</head>
<body onload="create();">
<script type="text/javascript">
  function create() {
    // Example
    ws = new WebSocket('ws://'+document.domain+':8081/');
    ws.onopen = function () {document.getElementById('log').innerHTML += 'WebSocket opened <br/>';}
    ws.onmessage = function (e) {document.getElementById('log').innerHTML += 'WebSocket message: '+e.data+' <br/>';}
    ws.onclose = function () {document.getElementById('log').innerHTML += 'WebSocket closed <br/>';}
  }
</script>
<button onclick="create();">Create WebSocket</button>
<button onclick="ws.send('ping');">Send ping</button>
<button onclick="ws.close();">Close WebSocket</button>
<div id="log" style="width:300px; height: 300px; border: 1px solid #999999; overflow:auto;"></div>
</body>
</html>


В моей игре мне пришлось использовать 3 сокет сервера. Для websocket, для worker`ов и для longpooling. В игре очень много математики, поэтому надо было делать вёркеры и выдавать им задачи на вычисления. Так вот к чему это. Что stream_select для них всех должен быть общий, иначе будут лаги или безумное использование процессора. Это знание тоже было получено взамен кучи истраченных нервов.

Основной цикл сервиса
$master = stream_socket_server("tcp://127.0.0.1:8081", $errno, $errstr);
if (!$master) die("$errstr ($errno)\n");
$sockets = array($master);
stream_set_blocking($master, false); // Относительно этой команды я не уверен, потому что мастер из сокетов читает только новые соединения, и для чтения используется "stream_socket_accept". Вариант, что весь сервис будет подвешен на несколько секунд из-за того, что клиент не торопится соединятся - категорически неприемлемо.
while (true) {
  $read = $sockets;
  $write = $except = array();
  if (($num_changed_streams = stream_select($read, $write, $except, 0, 1000000)) === false) {
    var_dump('stream_select error');
    break;
    // Сделать выход из цикла, а не "die", потому что в продакшине скорей всего этот код будет выполняться как сервис и при команде "/etc/init.d/game restart" тут 100% будет этот case, так вот надо дать "pcntl" код нормально отработать и не мешать ему.
  }
  foreach ($read as $socket) {
    $index_socket = array_search($socket, $sockets);
    if ($index_socket == 0) {
      // Новое соединение
      continue;
    }
    // Тут будет обработка сообщений клиентов
  }
}


Соединение с новыми клиентами вполне себе стандартный код, но вот из-за того, что сокеты у нас в неблокирующем режиме, нужно написать кучу кода, который по кусочкам соберёт все входящие данные и, когда данных будет достаточно, обработает их, поймёт какой протокол надо использовать и переключится на использование этого протокола. Одна эта задача — уже гора кода, и в PhpDeamon нагородили много кода, который к WebSocket отношения не имеет (они же там 8 разных серверов умеют подымать). Удалось многое отрезать и упростить в этой теме. Оставил только то, что относится к WebSocket.

Файл урезанный <ws.php>
class ws {
  const MAX_BUFFER_SIZE = 1024 * 1024;

  protected $socket;

  /**
   * @var array _SERVER
   */
  public $server = [];

  protected $headers = [];

  protected $closed = false;
  protected $unparsed_data = '';
  private $current_header;
  private $unread_lines = array();

  protected $extensions = [];
  protected $extensionsCleanRegex = '/(?:^|\W)x-webkit-/iS';

  /**
   * @var integer Current state
   */
  protected $state = 0; // stream state of the connection (application protocol level)

  /**
   * Alias of STATE_STANDBY
   */
  const STATE_ROOT = 0;

  /**
   * Standby state (default state)
   */
  const STATE_STANDBY = 0;

  /**
   * State: first line
   */
  const STATE_FIRSTLINE  = 1;

  /**
   * State: headers
   */
  const STATE_HEADERS    = 2;

  /**
   * State: content
   */
  const STATE_CONTENT    = 3;

  /**
   * State: prehandshake
   */
  const STATE_PREHANDSHAKE = 5;

  /**
   * State: handshaked
   */
  const STATE_HANDSHAKED = 6;

  public function get_state() {
    return $this->state;
  }

  public function closed() {
    return $this->closed;
  }

  protected function close() {
    if ($this->closed) return;
    var_dump('self close');
    fclose($this->socket);
    $this->closed = true;
  }
  public function __construct($socket) {
    stream_set_blocking($socket, false);
    $this->socket = $socket;
  }

  private function read_line() {
    $lines = explode(PHP_EOL, $this->unparsed_data);
    $last_line = $lines[count($lines)-1];
    unset($lines[count($lines) - 1]);
    foreach ($lines as $line) {
      $this->unread_lines[] = $line;
    }
    $this->unparsed_data = $last_line;
    if (count($this->unread_lines) != 0) {
      return array_shift($this->unread_lines);
    } else {
      return null;
    }
  }
  public function on_receive_data() {
    if ($this->closed) return;
    $data = stream_socket_recvfrom($this->socket, MAX_BUFFER_SIZE);
    if (is_string($data)) {
      $this->unparsed_data .= $data;
    }
  }
  /**
   * Called when new data received.
   * @return void
   */
  public function on_read() {
    if ($this->closed) return;
    if ($this->state === self::STATE_STANDBY) {
      $this->state = self::STATE_FIRSTLINE;
    }
    if ($this->state === self::STATE_FIRSTLINE) {
      if (!$this->http_read_first_line()) {
        return;
      }
      $this->state = self::STATE_HEADERS;
    }

    if ($this->state === self::STATE_HEADERS) {
      if (!$this->http_read_headers()) {
        return;
      }
      if (!$this->http_process_headers()) {
        $this->close();
        return;
      }
      $this->state = self::STATE_CONTENT;
    }
    if ($this->state === self::STATE_CONTENT) {
      $this->state = self::STATE_PREHANDSHAKE;
    }
  }
  /**
   * Read first line of HTTP request
   * @return boolean|null Success
   */
  protected function http_read_first_line() {
    if (($l = $this->read_line()) === null) {
      return null;
    }
    $e = explode(' ', $l);
    $u = isset($e[1]) ? parse_url($e[1]) : false;
    if ($u === false) {
      $this->bad_request();
      return false;
    }
    if (!isset($u['path'])) {
      $u['path'] = null;
    }
    if (isset($u['host'])) {
      $this->server['HTTP_HOST'] = $u['host'];
    }
    $srv                       = & $this->server;
    $srv['REQUEST_METHOD']     = $e[0];
    $srv['REQUEST_TIME']       = time();
    $srv['REQUEST_TIME_FLOAT'] = microtime(true);
    $srv['REQUEST_URI']        = $u['path'] . (isset($u['query']) ? '?' . $u['query'] : '');
    $srv['DOCUMENT_URI']       = $u['path'];
    $srv['PHP_SELF']           = $u['path'];
    $srv['QUERY_STRING']       = isset($u['query']) ? $u['query'] : null;
    $srv['SCRIPT_NAME']        = $srv['DOCUMENT_URI'] = isset($u['path']) ? $u['path'] : '/';
    $srv['SERVER_PROTOCOL']    = isset($e[2]) ? $e[2] : 'HTTP/1.1';
    $srv['REMOTE_ADDR']        = null;
    $srv['REMOTE_PORT']        = null;
    return true;
  }
  /**
   * Read headers line-by-line
   * @return boolean|null Success
   */
  protected function http_read_headers() {
    while (($l = $this->read_line()) !== null) {
      if ($l === '') {
        return true;
      }
      $e = explode(': ', $l);
      if (isset($e[1])) {
        $this->current_header                = 'HTTP_' . strtoupper(strtr($e[0], ['-' => '_']));
        $this->server[$this->current_header] = $e[1];
      } elseif (($e[0][0] === "\t" || $e[0][0] === "\x20") && $this->current_header) {
        // multiline header continued
        $this->server[$this->current_header] .= $e[0];
      } else {
        // whatever client speaks is not HTTP anymore
        $this->bad_request();
        return false;
      }
    }
  }
  /**
   * Process headers
   * @return bool
   */
  protected function http_process_headers() {
    $this->state = self::STATE_PREHANDSHAKE;
    if (isset($this->server['HTTP_SEC_WEBSOCKET_EXTENSIONS'])) {
      $str              = strtolower($this->server['HTTP_SEC_WEBSOCKET_EXTENSIONS']);
      $str              = preg_replace($this->extensionsCleanRegex, '', $str);
      $this->extensions = explode(', ', $str);
    }
    if (!isset($this->server['HTTP_CONNECTION'])
      || (!preg_match('~(?:^|\W)Upgrade(?:\W|$)~i', $this->server['HTTP_CONNECTION'])) // "Upgrade" is not always alone (ie. "Connection: Keep-alive, Upgrade")
      || !isset($this->server['HTTP_UPGRADE'])
      || (strtolower($this->server['HTTP_UPGRADE']) !== 'websocket') // Lowercase comparison iss important
    ) {
      $this->close();
      return false;
    }
    if (isset($this->server['HTTP_COOKIE'])) {
      self::parse_str(strtr($this->server['HTTP_COOKIE'], self::$hvaltr), $this->cookie);
    }
    if (isset($this->server['QUERY_STRING'])) {
      self::parse_str($this->server['QUERY_STRING'], $this->get);
    }
    // ----------------------------------------------------------
    // Protocol discovery, based on HTTP headers...
    // ----------------------------------------------------------
    if (isset($this->server['HTTP_SEC_WEBSOCKET_VERSION'])) { // HYBI
      if ($this->server['HTTP_SEC_WEBSOCKET_VERSION'] === '8') { // Version 8 (FF7, Chrome14)
        $this->switch_to_protocol('v13');
      } elseif ($this->server['HTTP_SEC_WEBSOCKET_VERSION'] === '13') { // newest protocol
        $this->switch_to_protocol('v13');
      } else {
        error_log(get_class($this) . '::' . __METHOD__ . " : Websocket protocol version " . $this->server['HTTP_SEC_WEBSOCKET_VERSION'] . ' is not yet supported for client "addr"'); // $this->addr
        $this->close();
        return false;
      }
    } elseif (!isset($this->server['HTTP_SEC_WEBSOCKET_KEY1']) || !isset($this->server['HTTP_SEC_WEBSOCKET_KEY2'])) {
      $this->switch_to_protocol('ve');
    } else { // Defaulting to HIXIE (Safari5 and many non-browser clients...)
      $this->switch_to_protocol('v0');
    }
    // ----------------------------------------------------------
    // End of protocol discovery
    // ----------------------------------------------------------
    return true;
  }
  private function switch_to_protocol($protocol) {
    $class = 'ws_'.$protocol;
    $this->new_instance = new $class($this->socket);
    $this->new_instance->state = $this->state;
    $this->new_instance->unparsed_data = $this->unparsed_data;
    $this->new_instance->server = $this->server;
  }
  /**
   * Send Bad request
   * @return void
   */
  public function bad_request() {
    $this->write("400 Bad Request\r\n\r\n<html><head><title>400 Bad Request</title></head><body bgcolor=\"white\"><center><h1>400 Bad Request</h1></center></body></html>");
    $this->close();
  }
  /**
   * Replacement for default parse_str(), it supoorts UCS-2 like this: %uXXXX
   * @param  string  $s      String to parse
   * @param  array   &$var   Reference to the resulting array
   * @param  boolean $header Header-style string
   * @return void
   */
  public static function parse_str($s, &$var, $header = false)
  {
    static $cb;
    if ($cb === null) {
      $cb = function ($m) {
        return urlencode(html_entity_decode('&#' . hexdec($m[1]) . ';', ENT_NOQUOTES, 'utf-8'));
      };
    }
    if ($header) {
      $s = strtr($s, self::$hvaltr);
    }
    if (
      (stripos($s, '%u') !== false)
      && preg_match('~(%u[a-f\d]{4}|%[c-f][a-f\d](?!%[89a-f][a-f\d]))~is', $s, $m)
    ) {
      $s = preg_replace_callback('~%(u[a-f\d]{4}|[a-f\d]{2})~i', $cb, $s);
    }
    parse_str($s, $var);
  }
  /**
   * Send data to the connection. Note that it just writes to buffer that flushes at every baseloop
   * @param  string  $data Data to send
   * @return boolean       Success
   */
  public function write($data) {
    if ($this->closed) return false;
    return stream_socket_sendto($this->socket, $data) == 0;
  }
}


Смысл этого класса в таком урезанном виде — в конструкторе установить неблокирующий режим для соединения с клиентом. Далее в основном цикле, каждый раз, когда приходят данные — сразу их прочитать и положить (дополнить) в «unparsed_data» переменную (это метод on_receive_data). Важно понимать, что если мы выйдем за размеры MAX_BUFFER_SIZE, то вообще ничего страшного не случится. Можно в итоговом примере, что тут будет, поставить его значение, скажем, «5» и убедится, что всё по-прежнему работает. Просто данные из буфера на первом шаге будут проигнорированы — они ведь неполные будут, и со второго или пятого или сотого захода наберутся, наконец, все принятые данные и будут обработаны. При этом stream_select в основном цикле ждать не будет даже микросекунды, пока все данные не будут извлечены. Константу надо подобрать такую, чтобы 95% ожидаемых данных читались целиком.
Далее в основном цикле (после получения очередной порции данных) мы пробуем накопленные данные обработать (это метод on_read). В классе «ws» метод «on_read» состоит по сути из трёх шагов: «читаем первую строку и готовим переменные окружения», «читаем все заголовки», «обрабатываем все заголовки». Первые 2 пояснять не надо, но написаны они довольно объёмно потому, что мы в неблокирующем режиме и надо быть готовым к тому, что данные оборваны в любом месте. Обработка заголовков сначала проверяет формат запроса правильный или нет, а потом по заголовкам определяет протокол, по которому будет общаться с клиентом. В итоге должны дёрнуть метод switch_to_protocol. Этот метод внутри себя сформирует экземпляр класса «ws_<протокол>» и подготовит его для отдачи в основной цикл.

В основном цикле далее надо собственно проверить: а не надо ли подменить объект (если кто-то может предложить реализацию этого места лучше — всегда пожалуйста).

Далее в основном цикле надо поставить проверку: а не закрыт ли сокет. Если закрыт, то очистить память (об этом дальнее в следующем блоке).

Теперь полная версия файла <deamon.php>
require('ws.php');
require('ws_v0.php');
require('ws_v13.php');
require('ws_ve.php');
$master = stream_socket_server("tcp://127.0.0.1:8081", $errno, $errstr);
if (!$master) die("$errstr ($errno)\n");
$sockets = array($master);
/**
 * @var ws[] $connections
 */
$connections = array();
stream_set_blocking($master, false);
/**
 * @param ws $connection
 * @param $data
 * @param $type
 */
$my_callback = function($connection, $data, $type) {
  var_dump('my ws data: ['.$data.'/'.$type.']');
  $connection->send_frame('test '.time());
};
while (true) {
  $read = $sockets;
  $write = $except = array();
  if (($num_changed_streams = stream_select($read, $write, $except, 0, 1000000)) === false) {
    var_dump('stream_select error');
    break;
  }
  foreach ($read as $socket) {
    $index_socket = array_search($socket, $sockets);
    if ($index_socket == 0) {
      // Новое соединение
      if ($socket_new = stream_socket_accept($master, -1)) {
        $connection = new ws($socket_new, $my_callback);
        $sockets[] = $socket_new;
        $index_new_socket = array_search($socket_new, $sockets);
        $connections[$index_new_socket] = &$connection;
        $index_socket = $index_new_socket;
      } else {
        // Я так и не понял что в этом случае надо делать
        error_log('stream_socket_accept');
        var_dump('error stream_socket_accept');
        continue;
      }
    }
    $connection = &$connections[$index_socket];
    $connection->on_receive_data();
    $connection->on_read();
    if ($connection->get_state() == ws::STATE_PREHANDSHAKE) {
      $connection = $connection->get_new_instance();
      $connections[$index_socket] = &$connection;
      $connection->on_read();
    }
    if ($connection->closed()) {
      unset($sockets[$index_socket]);
      unset($connections[$index_socket]);
      unset($connection);
      var_dump('close '.$index_socket);
    }
  }
}


Тут добавлен "$my_callback" — это наш custom обработчик сообщений от клиента. Разумеется в продакшине можно завернуть это всё в объекты всякие, а тут чтобы было понятнее просто переменная-функция. О ней чуть позже подробнее.

Реализована обработка нового соединения и реализовано основное тело цикла, о котором я чуть выше писал.

Я хочу обратить внимание на код сервера тут. Что если прочтённые данные из сокета — это пустая строка (да, разумеется я видел там в update проверку на пустую строку), то сокет надо закрыть. Ох, я даже не знаю, сколько этот момет попил мне кровушки и скольких пользователей я потерял. Внезапнейшим образом Сафари отправляет пустую строку и считает это нормой, а этот код берёт и закрывает соединение пользователю. Яндекс-браузер иногда ведёт себя так же. Уж не знаю почему, но в этом случае для Сафари WebSocket остаётся зависшим, то есть он не закрывается, не открывается — просто висит и всё. Вы уже заметили, что я неравнодушен к этому волшебному браузеру? Мне вспоминается, как я верстал под IE6 — примерно такие же ощущения.

Теперь о том, зачем я использую array_search и синхронизирую массив $sockets и массив $connections. Дело в том, что stream_select жизненно необходим чистый массив $sockets и никак иначе. Но как-то надо же связать конкретный сокет из массива $sockets с объектом «ws». Перепробовал кучу вариантов — в итоге остановился на таком варианте, что есть 2 массива, которые постоянно синхронизированы по ключам. В одном массиве неоходимые чистые сокеты для stream_select, а во втором экземпляры класса «ws» или его наследники. Если кто-то может предложить это место лучше — предлагайте.

Ещё отдельно надо отметить случай, когда stream_socket_accept зафэйлился. Я так понимаю, теоретически это может быть только в том случае, если мастер сокет у нас в неблокирующем режиме, и приехало недостаточно данных для соединения клиента. Поэтому просто ничего не делаем.

Полная версия файла <ws.php>
class ws {
  private static $hvaltr = ['; ' => '&', ';' => '&', ' ' => '%20'];

  const maxAllowedPacket = 1024 * 1024 * 1024;
  const MAX_BUFFER_SIZE = 1024 * 1024;

  protected $socket;

  /**
   * @var array _SERVER
   */
  public $server = [];

  protected $on_frame_user = null;

  protected $handshaked = false;

  protected $headers = [];
  protected $headers_sent = false;

  protected $closed = false;
  protected $unparsed_data = '';
  private $current_header;
  private $unread_lines = array();
  /**
   * @var ws|null
   */
  private $new_instance = null;

  protected $extensions = [];
  protected $extensionsCleanRegex = '/(?:^|\W)x-webkit-/iS';

  /**
   * @var integer Current state
   */
  protected $state = 0; // stream state of the connection (application protocol level)

  /**
   * Alias of STATE_STANDBY
   */
  const STATE_ROOT = 0;

  /**
   * Standby state (default state)
   */
  const STATE_STANDBY = 0;

  /**
   * State: first line
   */
  const STATE_FIRSTLINE  = 1;

  /**
   * State: headers
   */
  const STATE_HEADERS    = 2;

  /**
   * State: content
   */
  const STATE_CONTENT    = 3;

  /**
   * State: prehandshake
   */
  const STATE_PREHANDSHAKE = 5;

  /**
   * State: handshaked
   */
  const STATE_HANDSHAKED = 6;

  public function get_state() {
    return $this->state;
  }

  public function get_new_instance() {
    return $this->new_instance;
  }

  public function closed() {
    return $this->closed;
  }

  protected function close() {
    if ($this->closed) return;
    var_dump('self close');
    fclose($this->socket);
    $this->closed = true;
  }
  public function __construct($socket, $on_frame_user = null) {
    stream_set_blocking($socket, false);
    $this->socket = $socket;
    $this->on_frame_user = $on_frame_user;
  }

  private function read_line() {
    $lines = explode(PHP_EOL, $this->unparsed_data);
    $last_line = $lines[count($lines)-1];
    unset($lines[count($lines) - 1]);
    foreach ($lines as $line) {
      $this->unread_lines[] = $line;
    }
    $this->unparsed_data = $last_line;
    if (count($this->unread_lines) != 0) {
      return array_shift($this->unread_lines);
    } else {
      return null;
    }
  }
  public function on_receive_data() {
    if ($this->closed) return;
    $data = stream_socket_recvfrom($this->socket, self::MAX_BUFFER_SIZE);
    if (is_string($data)) {
      $this->unparsed_data .= $data;
    }
  }
  /**
   * Called when new data received.
   * @return void
   */
  public function on_read() {
    if ($this->closed) return;
    if ($this->state === self::STATE_STANDBY) {
      $this->state = self::STATE_FIRSTLINE;
    }
    if ($this->state === self::STATE_FIRSTLINE) {
      if (!$this->http_read_first_line()) {
        return;
      }
      $this->state = self::STATE_HEADERS;
    }

    if ($this->state === self::STATE_HEADERS) {
      if (!$this->http_read_headers()) {
        return;
      }
      if (!$this->http_process_headers()) {
        $this->close();
        return;
      }
      $this->state = self::STATE_CONTENT;
    }
    if ($this->state === self::STATE_CONTENT) {
      $this->state = self::STATE_PREHANDSHAKE;
    }
  }
  /**
   * Read first line of HTTP request
   * @return boolean|null Success
   */
  protected function http_read_first_line() {
    if (($l = $this->read_line()) === null) {
      return null;
    }
    $e = explode(' ', $l);
    $u = isset($e[1]) ? parse_url($e[1]) : false;
    if ($u === false) {
      $this->bad_request();
      return false;
    }
    if (!isset($u['path'])) {
      $u['path'] = null;
    }
    if (isset($u['host'])) {
      $this->server['HTTP_HOST'] = $u['host'];
    }
    $address = explode(':', stream_socket_get_name($this->socket, true)); //получаем адрес клиента
    $srv                       = & $this->server;
    $srv['REQUEST_METHOD']     = $e[0];
    $srv['REQUEST_TIME']       = time();
    $srv['REQUEST_TIME_FLOAT'] = microtime(true);
    $srv['REQUEST_URI']        = $u['path'] . (isset($u['query']) ? '?' . $u['query'] : '');
    $srv['DOCUMENT_URI']       = $u['path'];
    $srv['PHP_SELF']           = $u['path'];
    $srv['QUERY_STRING']       = isset($u['query']) ? $u['query'] : null;
    $srv['SCRIPT_NAME']        = $srv['DOCUMENT_URI'] = isset($u['path']) ? $u['path'] : '/';
    $srv['SERVER_PROTOCOL']    = isset($e[2]) ? $e[2] : 'HTTP/1.1';
    $srv['REMOTE_ADDR']        = $address[0];
    $srv['REMOTE_PORT']        = $address[1];
    return true;
  }
  /**
   * Read headers line-by-line
   * @return boolean|null Success
   */
  protected function http_read_headers() {
    while (($l = $this->read_line()) !== null) {
      if ($l === '') {
        return true;
      }
      $e = explode(': ', $l);
      if (isset($e[1])) {
        $this->current_header                = 'HTTP_' . strtoupper(strtr($e[0], ['-' => '_']));
        $this->server[$this->current_header] = $e[1];
      } elseif (($e[0][0] === "\t" || $e[0][0] === "\x20") && $this->current_header) {
        // multiline header continued
        $this->server[$this->current_header] .= $e[0];
      } else {
        // whatever client speaks is not HTTP anymore
        $this->bad_request();
        return false;
      }
    }
  }
  /**
   * Process headers
   * @return bool
   */
  protected function http_process_headers() {
    $this->state = self::STATE_PREHANDSHAKE;
    if (isset($this->server['HTTP_SEC_WEBSOCKET_EXTENSIONS'])) {
      $str              = strtolower($this->server['HTTP_SEC_WEBSOCKET_EXTENSIONS']);
      $str              = preg_replace($this->extensionsCleanRegex, '', $str);
      $this->extensions = explode(', ', $str);
    }
    if (!isset($this->server['HTTP_CONNECTION'])
      || (!preg_match('~(?:^|\W)Upgrade(?:\W|$)~i', $this->server['HTTP_CONNECTION'])) // "Upgrade" is not always alone (ie. "Connection: Keep-alive, Upgrade")
      || !isset($this->server['HTTP_UPGRADE'])
      || (strtolower($this->server['HTTP_UPGRADE']) !== 'websocket') // Lowercase comparison iss important
    ) {
      $this->close();
      return false;
    }
    /*
    if (isset($this->server['HTTP_COOKIE'])) {
      self::parse_str(strtr($this->server['HTTP_COOKIE'], self::$hvaltr), $this->cookie);
    }
    if (isset($this->server['QUERY_STRING'])) {
      self::parse_str($this->server['QUERY_STRING'], $this->get);
    }
    */
    // ----------------------------------------------------------
    // Protocol discovery, based on HTTP headers...
    // ----------------------------------------------------------
    if (isset($this->server['HTTP_SEC_WEBSOCKET_VERSION'])) { // HYBI
      if ($this->server['HTTP_SEC_WEBSOCKET_VERSION'] === '8') { // Version 8 (FF7, Chrome14)
        $this->switch_to_protocol('v13');
      } elseif ($this->server['HTTP_SEC_WEBSOCKET_VERSION'] === '13') { // newest protocol
        $this->switch_to_protocol('v13');
      } else {
        error_log(get_class($this) . '::' . __METHOD__ . " : Websocket protocol version " . $this->server['HTTP_SEC_WEBSOCKET_VERSION'] . ' is not yet supported for client "addr"'); // $this->addr
        $this->close();
        return false;
      }
    } elseif (!isset($this->server['HTTP_SEC_WEBSOCKET_KEY1']) || !isset($this->server['HTTP_SEC_WEBSOCKET_KEY2'])) {
      $this->switch_to_protocol('ve');
    } else { // Defaulting to HIXIE (Safari5 and many non-browser clients...)
      $this->switch_to_protocol('v0');
    }
    // ----------------------------------------------------------
    // End of protocol discovery
    // ----------------------------------------------------------
    return true;
  }
  private function switch_to_protocol($protocol) {
    $class = 'ws_'.$protocol;
    $this->new_instance = new $class($this->socket);
    $this->new_instance->state = $this->state;
    $this->new_instance->unparsed_data = $this->unparsed_data;
    $this->new_instance->server = $this->server;
    $this->new_instance->on_frame_user = $this->on_frame_user;
  }
  /**
   * Send Bad request
   * @return void
   */
  public function bad_request() {
    $this->write("400 Bad Request\r\n\r\n<html><head><title>400 Bad Request</title></head><body bgcolor=\"white\"><center><h1>400 Bad Request</h1></center></body></html>");
    $this->close();
  }
  /**
   * Replacement for default parse_str(), it supoorts UCS-2 like this: %uXXXX
   * @param  string  $s      String to parse
   * @param  array   &$var   Reference to the resulting array
   * @param  boolean $header Header-style string
   * @return void
   */
  public static function parse_str($s, &$var, $header = false) {
    static $cb;
    if ($cb === null) {
      $cb = function ($m) {
        return urlencode(html_entity_decode('&#' . hexdec($m[1]) . ';', ENT_NOQUOTES, 'utf-8'));
      };
    }
    if ($header) {
      $s = strtr($s, self::$hvaltr);
    }
    if (
      (stripos($s, '%u') !== false)
      && preg_match('~(%u[a-f\d]{4}|%[c-f][a-f\d](?!%[89a-f][a-f\d]))~is', $s, $m)
    ) {
      $s = preg_replace_callback('~%(u[a-f\d]{4}|[a-f\d]{2})~i', $cb, $s);
    }
    parse_str($s, $var);
  }
  /**
   * Send data to the connection. Note that it just writes to buffer that flushes at every baseloop
   * @param  string  $data Data to send
   * @return boolean       Success
   */
  public function write($data) {
    if ($this->closed) return false;
    return stream_socket_sendto($this->socket, $data) == 0;
  }

  /**
   * Будте любезны в отнаследованном классе реализовать этот метод
   * @return bool
   */
  protected function send_handshake_reply() {
    return false;
  }
  /**
   * Called when we're going to handshake.
   * @return boolean               Handshake status
   */
  public function handshake() {
    $extra_headers = '';
    foreach ($this->headers as $k => $line) {
      if ($k !== 'STATUS') {
        $extra_headers .= $line . "\r\n";
      }
    }

    if (!$this->send_handshake_reply($extra_headers)) {
      error_log(get_class($this) . '::' . __METHOD__ . ' : Handshake protocol failure for client ""'); // $this->addr
      $this->close();
      return false;
    }

    $this->handshaked = true;
    $this->headers_sent = true;
    $this->state = static::STATE_HANDSHAKED;
    return true;
  }
  /**
   * Read from buffer without draining
   * @param integer $n Number of bytes to read
   * @param integer $o Offset
   * @return string|false
   */
  public function look($n, $o = 0) {
    if (strlen($this->unparsed_data) <= $o) {
      return '';
    }
    return substr($this->unparsed_data, $o, $n);
  }
  /**
   * Convert bytes into integer
   * @param  string  $str Bytes
   * @param  boolean $l   Little endian? Default is false
   * @return integer
   */
  public static function bytes2int($str, $l = false) {
    if ($l) {
      $str = strrev($str);
    }
    $dec = 0;
    $len = strlen($str);
    for ($i = 0; $i < $len; ++$i) {
      $dec += ord(substr($str, $i, 1)) * pow(0x100, $len - $i - 1);
    }
    return $dec;
  }
  /**
   * Drains buffer
   * @param  integer $n Numbers of bytes to drain
   * @return boolean    Success
   */
  public function drain($n) {
    $ret = substr($this->unparsed_data, 0, $n);
    $this->unparsed_data = substr($this->unparsed_data, $n);
    return $ret;
  }
  /**
   * Read data from the connection's buffer
   * @param  integer      $n Max. number of bytes to read
   * @return string|false    Readed data
   */
  public function read($n) {
    if ($n <= 0) {
      return '';
    }
    $read = $this->drain($n);
    if ($read === '') {
      return false;
    }
    return $read;
  }
  /**
   * Reads all data from the connection's buffer
   * @return string Readed data
   */
  public function read_unlimited() {
    $ret = $this->unparsed_data;
    $this->unparsed_data = '';
    return $ret;
  }
  /**
   * Searches first occurence of the string in input buffer
   * @param  string  $what  Needle
   * @param  integer $start Offset start
   * @param  integer $end   Offset end
   * @return integer        Position
   */
  public function search($what, $start = 0, $end = -1) {
    return strpos($this->unparsed_data, $what, $start);
  }
  /**
   * Called when new frame received.
   * @param  string $data Frame's data.
   * @param  string $type Frame's type ("STRING" OR "BINARY").
   * @return boolean      Success.
   */
  public function on_frame($data, $type) {
    if (is_callable($this->on_frame_user)) {
      call_user_func($this->on_frame_user, $this, $data, $type);
    }
    return true;
  }
  public function send_frame($data, $type = null, $cb = null) {
    return false;
  }
  /**
   * Get real frame type identificator
   * @param $type
   * @return integer
   */
  public function get_frame_type($type) {
    if (is_int($type)) {
      return $type;
    }
    if ($type === null) {
      $type = 'STRING';
    }
    $frametype = @constant(get_class($this) . '::' . $type);
    if ($frametype === null) {
      error_log(__METHOD__ . ' : Undefined frametype "' . $type . '"');
    }
    return $frametype;
  }
}


По сути тут добавлены 3 вещи: «соединение с клиентом на уровне веб сокета», «получение сообщения от клиента», «отправка сообщения клиенту».

Для начала немного теории и терминологии. «Handshake» — это с точки зрения веб сокетов процедура установления соединения поверх http. Надо ведь решить кучу вопросов: как пробиться сквозь гущу прокси и кэшэй, как защитится от злых хакеров. И термин «frame» — это кусок данных в расшифрованном виде, это сообщение от клиента или сообщение для клиента. Возможно, об этом стоило написать в начале статьи, но из-за этих вот «frame» делать сокет сервер в блокирующем режиме сокетов имхо бессмысленно. То, как сделан этот момент вот тут — это лишило меня сна не на одну ночь. В той статье не рассматривается вариант, что frame приехал не полностью или их приехало сразу два. И то и то, между прочим, вполне себе типичная ситуация, как показали логи игры.

Теперь к деталям.

Соединение с клиентом на уровне веб сокета — предполагается, что протокол (например, ws_v0) перекроет метод «on_read» и внутри себя дёрнет «handshake», когда данных будет достаточно. Далее кусок «handshake» в родителе. Далее дёргается метод «send_handshake_reply», который должен быть реализован в протоколе. Этот вот «send_handshake_reply» должен такое ответить клиенту, чтобы тот понял, что «соединение установлено», нормальным браузерам — нормальный ответ, а для Сафари — особый ответ.

Получение сообщения от клиента. Обращаю внимание, что глупые клиенты могут реализовать такой вариант, что соединение не установлено, а сообщение от пользователя уже пришло. Поэтому надо бережно относится к «unparsed_data» переменной. В каждом протоколе метод «on_read» должен понять размер передаваемого frame, убедиться, что frame целиком приехал, расшифровать приехавший frame в сообщение пользователя. В каждом протоколе это делается очень по-разному и очень кучеряво (мы ж не знаем, приехал frame полностью или нет, плюс нельзя откусить ни байта следующего frame). Далее внутри «on_read», когда данные клиента получены и расшифрованы и определён их тип (да-да и такое предусмотрено), дёргаем метод «on_frame», который внутри класса «ws», тот, в свою очередь, дёрнет custom callback (функция $my_callback, перед основным циклом которая). И в итоге $my_callback получает сообщение от клиента.

Отправка сообщения клиенту. Просто дёргается метод «send_frame», который должен быть реализован внутри протокола. Тут просто шифруем сообщение и отправляем пользователю. Разные протоколы шифруют по-разному.

Теперь прилагаю 3 протокола «v13», «v0», «ve»:

Файл <ws_v13.php>
class ws_v13 extends ws {
  const CONTINUATION = 0;
  const STRING       = 0x1;
  const BINARY       = 0x2;
  const CONNCLOSE    = 0x8;
  const PING         = 0x9;
  const PONG         = 0xA;
  protected static $opcodes = [
    0   => 'CONTINUATION',
    0x1 => 'STRING',
    0x2 => 'BINARY',
    0x8 => 'CONNCLOSE',
    0x9 => 'PING',
    0xA => 'PONG',
  ];
  protected $outgoingCompression = 0;

  protected $framebuf = '';

  /**
   * Apply mask
   * @param $data
   * @param string|false $mask
   * @return mixed
   */
  public function mask($data, $mask) {
    for ($i = 0, $l = strlen($data), $ml = strlen($mask); $i < $l; $i++) {
      $data[$i] = $data[$i] ^ $mask[$i % $ml];
    }
    return $data;
  }

  /**
   * Sends a frame.
   * @param  string   $data  Frame's data.
   * @param  string   $type  Frame's type. ("STRING" OR "BINARY")
   * @param  callable $cb    Optional. Callback called when the frame is received by client.
   * @callback $cb ( )
   * @return boolean         Success.
   */
  public function send_frame($data, $type = null, $cb = null) {
    if (!$this->handshaked) {
      return false;
    }

    if ($this->closed && $type !== 'CONNCLOSE') {
      return false;
    }

    /*if (in_array($type, ['STRING', 'BINARY']) && ($this->outgoingCompression > 0) && in_array('deflate-frame', $this->extensions)) {
        //$data = gzcompress($data, $this->outgoingCompression);
        //$rsv1 = 1;
    }*/

    $fin = 1;
    $rsv1 = 0;
    $rsv2 = 0;
    $rsv3 = 0;
    $this->write(chr(bindec($fin . $rsv1 . $rsv2 . $rsv3 . str_pad(decbin($this->get_frame_type($type)), 4, '0', STR_PAD_LEFT))));
    $dataLength  = strlen($data);
    $isMasked    = false;
    $isMaskedInt = $isMasked ? 128 : 0;
    if ($dataLength <= 125) {
      $this->write(chr($dataLength + $isMaskedInt));
    } elseif ($dataLength <= 65535) {
      $this->write(chr(126 + $isMaskedInt) . // 126 + 128
        chr($dataLength >> 8) .
        chr($dataLength & 0xFF));
    } else {
      $this->write(chr(127 + $isMaskedInt) . // 127 + 128
        chr($dataLength >> 56) .
        chr($dataLength >> 48) .
        chr($dataLength >> 40) .
        chr($dataLength >> 32) .
        chr($dataLength >> 24) .
        chr($dataLength >> 16) .
        chr($dataLength >> 8) .
        chr($dataLength & 0xFF));
    }
    if ($isMasked) {
      $mask    = chr(mt_rand(0, 0xFF)) .
        chr(mt_rand(0, 0xFF)) .
        chr(mt_rand(0, 0xFF)) .
        chr(mt_rand(0, 0xFF));
      $this->write($mask . $this->mask($data, $mask));
    } else {
      $this->write($data);
    }
    if ($cb !== null) {
      $cb();
    }
    return true;
  }

  /**
   * Sends a handshake message reply
   * @param string Received data (no use in this class)
   * @return boolean OK?
   */
  public function send_handshake_reply($extraHeaders = '') {
    if (!isset($this->server['HTTP_SEC_WEBSOCKET_KEY']) || !isset($this->server['HTTP_SEC_WEBSOCKET_VERSION'])) {
      return false;
    }
    if ($this->server['HTTP_SEC_WEBSOCKET_VERSION'] !== '13' && $this->server['HTTP_SEC_WEBSOCKET_VERSION'] !== '8') {
      return false;
    }

    if (isset($this->server['HTTP_ORIGIN'])) {
      $this->server['HTTP_SEC_WEBSOCKET_ORIGIN'] = $this->server['HTTP_ORIGIN'];
    }
    if (!isset($this->server['HTTP_SEC_WEBSOCKET_ORIGIN'])) {
      $this->server['HTTP_SEC_WEBSOCKET_ORIGIN'] = '';
    }
    $this->write("HTTP/1.1 101 Switching Protocols\r\n"
      . "Upgrade: WebSocket\r\n"
      . "Connection: Upgrade\r\n"
      . "Date: " . date('r') . "\r\n"
      . "Sec-WebSocket-Origin: " . $this->server['HTTP_SEC_WEBSOCKET_ORIGIN'] . "\r\n"
      . "Sec-WebSocket-Location: ws://" . $this->server['HTTP_HOST'] . $this->server['REQUEST_URI'] . "\r\n"
      . "Sec-WebSocket-Accept: " . base64_encode(sha1(trim($this->server['HTTP_SEC_WEBSOCKET_KEY']) . "258EAFA5-E914-47DA-95CA-C5AB0DC85B11", true)) . "\r\n"
    );
    if (isset($this->server['HTTP_SEC_WEBSOCKET_PROTOCOL'])) {
      $this->write("Sec-WebSocket-Protocol: " . $this->server['HTTP_SEC_WEBSOCKET_PROTOCOL']."\r\n");
    }

    $this->write($extraHeaders."\r\n");

    return true;
  }
  /**
   * Called when new data received
   * @see http://tools.ietf.org/html/draft-ietf-hybi-thewebsocketprotocol-10#page-16
   * @return void
   */
  public function on_read() {
    if ($this->closed) return;
    if ($this->state === self::STATE_PREHANDSHAKE) {
      if (!$this->handshake()) {
        return;
      }
    }
    if ($this->state === self::STATE_HANDSHAKED) {
      while (($buflen = strlen($this->unparsed_data)) >= 2) {
        $first = ord($this->look(1)); // first byte integer (fin, opcode)
        $firstBits = decbin($first);
        $opcode = (int)bindec(substr($firstBits, 4, 4));
        if ($opcode === 0x8) { // CLOSE
          $this->close();
          return;
        }
        $opcodeName = isset(static::$opcodes[$opcode]) ? static::$opcodes[$opcode] : false;
        if (!$opcodeName) {
          error_log(get_class($this) . ': Undefined opcode ' . $opcode);
          $this->close();
          return;
        }
        $second = ord($this->look(1, 1)); // second byte integer (masked, payload length)
        $fin = (bool)($first >> 7);
        $isMasked = (bool)($second >> 7);
        $dataLength = $second & 0x7f;
        $p = 2;
        if ($dataLength === 0x7e) { // 2 bytes-length
          if ($buflen < $p + 2) {
            return; // not enough data yet
          }
          $dataLength = self::bytes2int($this->look(2, $p), false);
          $p += 2;
        } elseif ($dataLength === 0x7f) { // 8 bytes-length
          if ($buflen < $p + 8) {
            return; // not enough data yet
          }
          $dataLength = self::bytes2int($this->look(8, $p));
          $p += 8;
        }
        if (self::maxAllowedPacket <= $dataLength) {
          // Too big packet
          $this->close();
          return;
        }
        if ($isMasked) {
          if ($buflen < $p + 4) {
            return; // not enough data yet
          }
          $mask = $this->look(4, $p);
          $p += 4;
        }
        if ($buflen < $p + $dataLength) {
          return; // not enough data yet
        }
        $this->drain($p);
        $data = $this->read($dataLength);
        if ($isMasked) {
          $data = $this->mask($data, $mask);
        }
        //Daemon::log(Debug::dump(array('ext' => $this->extensions, 'rsv1' => $firstBits[1], 'data' => Debug::exportBytes($data))));
        /*if ($firstBits[1] && in_array('deflate-frame', $this->extensions)) { // deflate frame
            $data = gzuncompress($data, $this->pool->maxAllowedPacket);
        }*/
        if (!$fin) {
          $this->framebuf .= $data;
        } else {
          $this->on_frame($this->framebuf . $data, $opcodeName);
          $this->framebuf = '';
        }
      }
    }
  }
}


Файл <ws_v0.php>
class ws_v0 extends ws {
  const STRING = 0x00;
  const BINARY = 0x80;

  protected $key;

  /**
   * Sends a handshake message reply
   * @param string Received data (no use in this class)
   * @return boolean OK?
   */
  public function send_handshake_reply($extraHeaders = '') {
    if (!isset($this->server['HTTP_SEC_WEBSOCKET_KEY1']) || !isset($this->server['HTTP_SEC_WEBSOCKET_KEY2'])) {
      return false;
    }
    $final_key = $this->_computeFinalKey($this->server['HTTP_SEC_WEBSOCKET_KEY1'], $this->server['HTTP_SEC_WEBSOCKET_KEY2'], $this->key);
    $this->key = null;

    if (!$final_key) {
      return false;
    }

    if (!isset($this->server['HTTP_SEC_WEBSOCKET_ORIGIN'])) {
      $this->server['HTTP_SEC_WEBSOCKET_ORIGIN'] = '';
    }

    $this->write("HTTP/1.1 101 Web Socket Protocol Handshake\r\n"
      . "Upgrade: WebSocket\r\n"
      . "Connection: Upgrade\r\n"
      . "Sec-WebSocket-Origin: " . $this->server['HTTP_ORIGIN'] . "\r\n"
      . "Sec-WebSocket-Location: ws://" . $this->server['HTTP_HOST'] . $this->server['REQUEST_URI'] . "\r\n");
    if (isset($this->server['HTTP_SEC_WEBSOCKET_PROTOCOL'])) {
      $this->write("Sec-WebSocket-Protocol: " . $this->server['HTTP_SEC_WEBSOCKET_PROTOCOL']."\r\n");
    }
    $this->write($extraHeaders . "\r\n" . $final_key);
    return true;
  }

  /**
   * Computes final key for Sec-WebSocket.
   * @param string Key1
   * @param string Key2
   * @param string Data
   * @return string Result
   */
  protected function _computeFinalKey($key1, $key2, $data) {
    if (strlen($data) < 8) {
      error_log(get_class($this) . '::' . __METHOD__ . ' : Invalid handshake data for client ""'); // $this->addr
      return false;
    }
    return md5($this->_computeKey($key1) . $this->_computeKey($key2) . substr($data, 0, 8), true);
  }

  /**
   * Computes key for Sec-WebSocket.
   * @param string Key
   * @return string Result
   */
  protected function _computeKey($key) {
    $spaces = 0;
    $digits = '';

    for ($i = 0, $s = strlen($key); $i < $s; ++$i) {
      $c = substr($key, $i, 1);

      if ($c === "\x20") {
        ++$spaces;
      } elseif (ctype_digit($c)) {
        $digits .= $c;
      }
    }

    if ($spaces > 0) {
      $result = (float)floor($digits / $spaces);
    } else {
      $result = (float)$digits;
    }

    return pack('N', $result);
  }

  /**
   * Sends a frame.
   * @param  string   $data  Frame's data.
   * @param  string   $type  Frame's type. ("STRING" OR "BINARY")
   * @param  callable $cb    Optional. Callback called when the frame is received by client.
   * @callback $cb ( )
   * @return boolean         Success.
   */
  public function send_frame($data, $type = null, $cb = null) {
    if (!$this->handshaked) {
      return false;
    }

    if ($this->closed && $type !== 'CONNCLOSE') {
      return false;
    }
    if ($type === 'CONNCLOSE') {
      if ($cb !== null) {
        $cb($this);
        return true;
      }
    }

    $type = $this->get_frame_type($type);
    // Binary
    if (($type & self::BINARY) === self::BINARY) {
      $n   = strlen($data);
      $len = '';
      $pos = 0;

      char:

      ++$pos;
      $c = $n >> 0 & 0x7F;
      $n >>= 7;

      if ($pos !== 1) {
        $c += 0x80;
      }

      if ($c !== 0x80) {
        $len = chr($c) . $len;
        goto char;
      };

      $this->write(chr(self::BINARY) . $len . $data);
    }
    // String
    else {
      $this->write(chr(self::STRING) . $data . "\xFF");
    }
    if ($cb !== null) {
      $cb();
    }
    return true;
  }

  /**
   * Called when new data received
   * @return void
   */
  public function on_read() {
    if ($this->state === self::STATE_PREHANDSHAKE) {
      if (strlen($this->unparsed_data) < 8) {
        return;
      }
      $this->key = $this->read_unlimited();
      $this->handshake();
    }
    if ($this->state === self::STATE_HANDSHAKED) {
      while (($buflen = strlen($this->unparsed_data)) >= 2) {
        $hdr = $this->look(10);
        $frametype = ord(substr($hdr, 0, 1));
        if (($frametype & 0x80) === 0x80) {
          $len = 0;
          $i = 0;
          do {
            if ($buflen < $i + 1) {
              // not enough data yet
              return;
            }
            $b = ord(substr($hdr, ++$i, 1));
            $n = $b & 0x7F;
            $len *= 0x80;
            $len += $n;
          } while ($b > 0x80);

          if (self::maxAllowedPacket <= $len) {
            // Too big packet
            $this->close();
            return;
          }

          if ($buflen < $len + $i + 1) {
            // not enough data yet
            return;
          }
          $this->drain($i + 1);
          $this->on_frame($this->read($len), 'BINARY');
        } else {
          if (($p = $this->search("\xFF")) !== false) {
            if (self::maxAllowedPacket <= $p - 1) {
              // Too big packet
              $this->close();
              return;
            }
            $this->drain(1);
            $data = $this->read($p);
            $this->drain(1);
            $this->on_frame($data, 'STRING');
          } else {
            if (self::maxAllowedPacket < $buflen - 1) {
              // Too big packet
              $this->close();
              return;
            }
            // not enough data yet
            return;
          }
        }
      }
    }
  }
}


Файл <ws_ve.php>
class ws_ve extends ws {
  const STRING = 0x00;
  const BINARY = 0x80;

  /**
   * Sends a handshake message reply
   * @param string Received data (no use in this class)
   * @return boolean OK?
   */
  public function send_handshake_reply($extraHeaders = '') {
    if (!isset($this->server['HTTP_SEC_WEBSOCKET_ORIGIN'])) {
      $this->server['HTTP_SEC_WEBSOCKET_ORIGIN'] = '';
    }

    $this->write("HTTP/1.1 101 Web Socket Protocol Handshake\r\n"
      . "Upgrade: WebSocket\r\n"
      . "Connection: Upgrade\r\n"
      . "Sec-WebSocket-Origin: " . $this->server['HTTP_ORIGIN'] . "\r\n"
      . "Sec-WebSocket-Location: ws://" . $this->server['HTTP_HOST'] . $this->server['REQUEST_URI'] . "\r\n"
    );
    if (isset($this->server['HTTP_SEC_WEBSOCKET_PROTOCOL'])) {
      $this->write("Sec-WebSocket-Protocol: " . $this->server['HTTP_SEC_WEBSOCKET_PROTOCOL']."\r\n");
    }
    $this->write($extraHeaders."\r\n");
    return true;
  }

  /**
   * Computes key for Sec-WebSocket.
   * @param string Key
   * @return string Result
   */
  protected function _computeKey($key) {
    $spaces = 0;
    $digits = '';

    for ($i = 0, $s = strlen($key); $i < $s; ++$i) {
      $c = substr($key, $i, 1);

      if ($c === "\x20") {
        ++$spaces;
      } elseif (ctype_digit($c)) {
        $digits .= $c;
      }
    }

    if ($spaces > 0) {
      $result = (float)floor($digits / $spaces);
    } else {
      $result = (float)$digits;
    }

    return pack('N', $result);
  }

  /**
   * Sends a frame.
   * @param  string   $data  Frame's data.
   * @param  string   $type  Frame's type. ("STRING" OR "BINARY")
   * @param  callable $cb    Optional. Callback called when the frame is received by client.
   * @callback $cb ( )
   * @return boolean         Success.
   */
  public function send_frame($data, $type = null, $cb = null) {
    if (!$this->handshaked) {
      return false;
    }

    if ($this->closed && $type !== 'CONNCLOSE') {
      return false;
    }

    if ($type === 'CONNCLOSE') {
      if ($cb !== null) {
        $cb($this);
        return true;
      }
    }

    // Binary
    $type = $this->get_frame_type($type);
    if (($type & self::BINARY) === self::BINARY) {
      $n   = strlen($data);
      $len = '';
      $pos = 0;

      char:

      ++$pos;
      $c = $n >> 0 & 0x7F;
      $n >>= 7;

      if ($pos !== 1) {
        $c += 0x80;
      }

      if ($c !== 0x80) {
        $len = chr($c) . $len;
        goto char;
      };

      $this->write(chr(self::BINARY) . $len . $data);
    }
    // String
    else {
      $this->write(chr(self::STRING) . $data . "\xFF");
    }
    if ($cb !== null) {
      $cb();
    }
    return true;
  }

  /**
   * Called when new data received
   * @return void
   */
  public function on_read() {
    while (($buflen = strlen($this->unparsed_data)) >= 2) {
      $hdr       = $this->look(10);
      $frametype = ord(substr($hdr, 0, 1));
      if (($frametype & 0x80) === 0x80) {
        $len = 0;
        $i   = 0;
        do {
          if ($buflen < $i + 1) {
            return;
          }
          $b = ord(substr($hdr, ++$i, 1));
          $n = $b & 0x7F;
          $len *= 0x80;
          $len += $n;
        } while ($b > 0x80);

        if (self::maxAllowedPacket <= $len) {
          // Too big packet
          $this->close();
          return;
        }

        if ($buflen < $len + $i + 1) {
          // not enough data yet
          return;
        }

        $this->drain($i + 1);
        $this->on_frame($this->read($len), $frametype);
      } else {
        if (($p = $this->search("\xFF")) !== false) {
          if (self::maxAllowedPacket <= $p - 1) {
            // Too big packet
            $this->close();
            return;
          }
          $this->drain(1);
          $data = $this->read($p);
          $this->drain(1);
          $this->on_frame($data, 'STRING');
        } else {
          if (self::maxAllowedPacket < $buflen - 1) {
            // Too big packet
            $this->close();
            return;
          }
        }
      }
    }
  }
}


Сразу хочу отметить, что протокол VE не тестировал — понятия не имею кто его использует. Но добросовестно сконвертировал и урезал код из PhpDeamon.

Протокол V13 используют все нормальные браузеры (FireFox, Opera, Chrome, Яндекс). Даже IE его использует (извините, после IE6 — для меня IE никогда не будет «браузером», даже команда разработчик IE заявляли, что это «не браузер, а тонкий клиент»). Протокол V0 использует браузер «Сафари».

Вместо заключения


Спасибо за внимание, используйте на здоровье весь приведенный выше код (разумеется, я советую завернуть его в нормальные объекты, тут всё упрощено исключительно для понимания. Особенно callback на пришедший от пользователя frame советую сделать по-нормальному). Если вы будете использовать этот код, напишите пожалуйста где-то в коде «Спасибо Anlide и PhpDeamon». В итоге сокет сервер, приведенный тут, совместим со всеми современными браузерами. Работает без утечек памяти и годится для использования в высоконагруженных системах.

Обновление:
  • Комментарий автора статьи, на которую я постоянно ссылаюсь в тексте: habrahabr.ru/post/301822/#comment_9634636
  • Метод read_lint() содержит ошибку — что мы читаем данные тела http запроса, хотя должны были читать только заголовки.
  • В основном теле цикла — не корректное использование указателей при переключении протокола.
  • По просьбам трудящихся вот ссылка на gitbub github.com/anlide/websocket тут код исправленный и ещё ping-pong доработанный, осталось ещё причину закрытия сокета фиксировать и заменить select на что-то — и будет замечательная смесь лучших серверных решений по websocket.
Tags:
Hubs:
+9
Comments 70
Comments Comments 70

Articles