Как стать автором
Обновить

распараллеливаем выполнение задач с помощью stream_select()

Время на прочтение7 мин
Количество просмотров5.2K
Не так уж много кто знает о том, что некоторые задачи в PHP можно заставить выполняться параллельно — и для не этого не нужно прибегать к форкам. В PHP5 есть stream-functions, и среди них — stream_select().

Прочитав статью Cameron Laird (http://www.ibm.com/developerworks/opensource/library/os-php-multitask/index.html?S_TACT=105AGX44&S_CMP=EDU), каждый, кто еще этого не сделал, сможет научиться этой технике, я же в этом топике предлагаю вашему вниманию простой небольшой класс Parastreams, который собсно займется распараллеливанием — а уж что делать с полученными из потоков данными — вы решите сами, указав обработчики данных.



Область применения технологии:
Нужно получить некие данные по сети с нескольких сокетов. Используя stream_select(), вы получите данные от всех сокетов за время, равное времени получения данных с самого медленного из них (при традиционном подходе общее время будет равно сумме времен получения данных от каждого сокета).
Допустим, вы используете поиск с пом. Sphinx. С помощью stream_select() можно заставить несколько запросов к поисковому демону выполняться параллельно (конечно, придется поднапрячься и расковырять sphinxapi, но ничего сверхсложного там нет). Это может пригодиться, когда поиск приводит к двум запросам к поисковому демону (допустим, ищем в постах и в комментариях): два эти запроса к двум, соответственно, индексам, будут выполняться параллельно — то есть получаем оптимизацию и ускорение поиска.

А вот и код класса:

<?php

/**
* Parastreams PHP class:
* a simple tool for performing multiple tasks with PHP - simultaneously (in parallel).
*
* example of usage:
* $ps = new Parastreams();
*
* function parastreams_callback($data) {
*     echo $data."\n";
* }
*
* $s = stream_socket_client("localhost:80", $errno,
*    $errstr, 10,
*    STREAM_CLIENT_ASYNC_CONNECT|STREAM_CLIENT_CONNECT);
* fwrite($s, "GET /sleep.php?delay=1 HTTP/1.0\r\nHost: localhost\r\n\r\n");
* $ps->add($s, 'parastreams_callback');
* ... // repeat the above 5 lines as many times as you wish to, adding new streams to $ps.
* $ps->run();// process the streams
*
* Author: Victor Bolshov ( crocodile2u ( the at symbol here ) yandex.ru )
*
* License: use this script without any retrictions.
*
* Based on code by Cameron Laird, you may find his code here:
* www.ibm.com/developerworks/opensource/library/os-php-multitask/index.html?S_TACT=105AGX44&S_CMP=EDU
*
* PHP version used: PHP 5.3.0alpha1 (should be compatible with older versions of PHP5)
*/

class Parastreams {
  /**
   * streams served by this instance
   * @var resource[]
   */
  private $streams = array();
  /**
   * stream events listeners
   * @var array
   */
  private $listeners = array();
  /**
   * @var int
   */
  private $timeout = 10;
  /**
   * Constructor
   * @param array $arg when specified, add() is called and $arg is passed to add()
   * @see add()
   */
  function __construct($arg = null)
  {
    if ($arg)
    {
      $this->add($arg);
    }
  }
  /**
   * add new stream(s)
   * @param array | resource $arg either a stream resource or an array like this:
   * array(
   * array(stream1, listener1),
   * array(stream2, listener2),..
   *)
   * where streamN is a stream resource created with stream_socket_client(),
   * and listenerN is a Closure object which is called once the stream becomes readable,
   * with the only argument: string $data (the data read from the stream)
   * @param callable $arg2 the listener to stream; matters only in case when the first arg is not an array
   * @return void
   * @throws ParastreamsException
   */
  function add($arg1, $arg2 = null)
  {
    if (is_array($arg1))
    {
      foreach ($arg1 as $offset => $s)
      {
        if (! is_array($s))
        {
          throw new ParastreamsException("Illegal input at offset " . $offset . " (not an array)");
        } elseif (count($s = array_values($s)) < 2) {
          throw new ParastreamsException("Illegal input at offset " . $offset . " (length is less then 2)");
        } elseif (! is_resource($s[0])) {
          throw new ParastreamsException("Illegal input at offset " . $offset . " (not a stream resource)");
        } elseif (! is_callable($s[1])) {
          throw new ParastreamsException("Illegal input at offset " . $offset . " (not a callable)");
        }
        
        $this->addOne($s[0], $s[1]);
      }
    } elseif (is_resource($arg1)) {
      if (! is_callable($arg2))
      {
        throw new ParastreamsException("Argument 2 is expected to be a callable, " . gettype($arg2) . " given");
      }
      $this->addOne($arg1, $arg2);
    } else {
      throw new ParastreamsException("Argument 1 is expected to be a resource or an array, " . gettype($arg1) . " given");
    }
  }
  /**
   * Start listening to stream events
   * @return void
   * @throws ParastreamsException
   */
  function run()
  {
    while (count($this->streams))
    {
      $events = $this->streams;
      if (false === stream_select($events, $w = null, $e = null, $this->timeout))
      {
        throw new ParastreamsException("stream_select() failed!");
      } elseif (count($events)) {
        $this->processStreamEvents($events);
      } else {
        throw new ParastreamsException("Time out!");
      }
    }
  }
  
  /* Starting private methods */
  
  private function processStreamEvents($events)
  {
    foreach ($events as $fp) {
      $id = array_search($fp, $this->streams);
      
      $this->invokeListener($fp);
      
      fclose($fp);
      unset($this->streams[$id]);
    }
  }
  private function invokeListener($fp)
  {
    foreach ($this->listeners as $index => $spec) {
      if ($spec[0] == $fp)
      {
        $data = "";
        while (! feof($fp))
        {
          $data .= fread($fp, 1024);
        }
        call_user_func($spec[1], $data);
        unset($this->listeners[$index]);
        return ;
      }
    }
  }
  private function addOne($stream, $listener)
  {
    $this->streams[] = $stream;
    $this->listeners[] = array($stream, $listener);
  }
}

class ParastreamsException extends RuntimeException {}


* This source code was highlighted with Source Code Highlighter.


Пример использования (есть в комментариях, но тем не менее):

test.php:
<?php

require_once 'Parastreams.php';

function parastreams_callback($data) {
  echo $data."\n";
};

$streams = array();
for ($i = 1; $i <= 3; ++$i) {
  $s = stream_socket_client("localhost:80", $errno,
    $errstr, 10,
    STREAM_CLIENT_ASYNC_CONNECT|STREAM_CLIENT_CONNECT);
  fwrite($s, "GET /sleep.php?delay=" . $i . " HTTP/1.0\r\nHost: localhost\r\n\r\n");
  $streams[$i] = array($s, 'parastreams_callback');
}

$ps = new Parastreams($streams);
$ps->run();


* This source code was highlighted with Source Code Highlighter.


В примере используется sleep.php, для полноты картины вот он:

<?php

$delay = filter_input(INPUT_GET, 'delay', FILTER_VALIDATE_INT);
if ($delay <= 0) {
  $delay = 1;
}

sleep($delay);

echo "was sleeping for $delay seconds\n";


* This source code was highlighted with Source Code Highlighter.
Теги:
Хабы:
Всего голосов 47: ↑44 и ↓3+41
Комментарии16

Публикации

Истории

Работа

PHP программист
102 вакансии

Ближайшие события

7 – 8 ноября
Конференция byteoilgas_conf 2024
МоскваОнлайн
7 – 8 ноября
Конференция «Матемаркетинг»
МоскваОнлайн
15 – 16 ноября
IT-конференция Merge Skolkovo
Москва
28 ноября
Конференция «TechRec: ITHR CAMPUS»
МоскваОнлайн
25 – 26 апреля
IT-конференция Merge Tatarstan 2025
Казань