распараллеливаем выполнение задач с помощью stream_select()

    Не так уж много кто знает о том, что некоторые задачи в PHP можно заставить выполняться параллельно — и для не этого не нужно прибегать к форкам. В PHP5 есть stream-functions, и среди них — stream_select().

    Прочитав статью Cameron Laird (http://www.ibm.com/developerworks/opensource/library/os-php-multitask/index.html?S_TACT=105AGX44&S_CMP=EDU), каждый, кто еще этого не сделал, сможет научиться этой технике, я же в этом топике предлагаю вашему вниманию простой небольшой класс Parastreams, который собсно займется распараллеливанием — а уж что делать с полученными из потоков данными — вы решите сами, указав обработчики данных.



    Область применения технологии:
    Нужно получить некие данные по сети с нескольких сокетов. Используя stream_select(), вы получите данные от всех сокетов за время, равное времени получения данных с самого медленного из них (при традиционном подходе общее время будет равно сумме времен получения данных от каждого сокета).
    Допустим, вы используете поиск с пом. Sphinx. С помощью stream_select() можно заставить несколько запросов к поисковому демону выполняться параллельно (конечно, придется поднапрячься и расковырять sphinxapi, но ничего сверхсложного там нет). Это может пригодиться, когда поиск приводит к двум запросам к поисковому демону (допустим, ищем в постах и в комментариях): два эти запроса к двум, соответственно, индексам, будут выполняться параллельно — то есть получаем оптимизацию и ускорение поиска.

    А вот и код класса:

    <?php

    /**
    * Parastreams PHP class:
    * a simple tool for performing multiple tasks with PHP - simultaneously (in parallel).
    *
    * example of usage:
    * $ps = new Parastreams();
    *
    * function parastreams_callback($data) {
    *     echo $data."\n";
    * }
    *
    * $s = stream_socket_client("localhost:80", $errno,
    *    $errstr, 10,
    *    STREAM_CLIENT_ASYNC_CONNECT|STREAM_CLIENT_CONNECT);
    * fwrite($s, "GET /sleep.php?delay=1 HTTP/1.0\r\nHost: localhost\r\n\r\n");
    * $ps->add($s, 'parastreams_callback');
    * ... // repeat the above 5 lines as many times as you wish to, adding new streams to $ps.
    * $ps->run();// process the streams
    *
    * Author: Victor Bolshov ( crocodile2u ( the at symbol here ) yandex.ru )
    *
    * License: use this script without any retrictions.
    *
    * Based on code by Cameron Laird, you may find his code here:
    * www.ibm.com/developerworks/opensource/library/os-php-multitask/index.html?S_TACT=105AGX44&S_CMP=EDU
    *
    * PHP version used: PHP 5.3.0alpha1 (should be compatible with older versions of PHP5)
    */

    class Parastreams {
      /**
       * streams served by this instance
       * @var resource[]
       */
      private $streams = array();
      /**
       * stream events listeners
       * @var array
       */
      private $listeners = array();
      /**
       * @var int
       */
      private $timeout = 10;
      /**
       * Constructor
       * @param array $arg when specified, add() is called and $arg is passed to add()
       * @see add()
       */
      function __construct($arg = null)
      {
        if ($arg)
        {
          $this->add($arg);
        }
      }
      /**
       * add new stream(s)
       * @param array | resource $arg either a stream resource or an array like this:
       * array(
       * array(stream1, listener1),
       * array(stream2, listener2),..
       *)
       * where streamN is a stream resource created with stream_socket_client(),
       * and listenerN is a Closure object which is called once the stream becomes readable,
       * with the only argument: string $data (the data read from the stream)
       * @param callable $arg2 the listener to stream; matters only in case when the first arg is not an array
       * @return void
       * @throws ParastreamsException
       */
      function add($arg1, $arg2 = null)
      {
        if (is_array($arg1))
        {
          foreach ($arg1 as $offset => $s)
          {
            if (! is_array($s))
            {
              throw new ParastreamsException("Illegal input at offset " . $offset . " (not an array)");
            } elseif (count($s = array_values($s)) < 2) {
              throw new ParastreamsException("Illegal input at offset " . $offset . " (length is less then 2)");
            } elseif (! is_resource($s[0])) {
              throw new ParastreamsException("Illegal input at offset " . $offset . " (not a stream resource)");
            } elseif (! is_callable($s[1])) {
              throw new ParastreamsException("Illegal input at offset " . $offset . " (not a callable)");
            }
            
            $this->addOne($s[0], $s[1]);
          }
        } elseif (is_resource($arg1)) {
          if (! is_callable($arg2))
          {
            throw new ParastreamsException("Argument 2 is expected to be a callable, " . gettype($arg2) . " given");
          }
          $this->addOne($arg1, $arg2);
        } else {
          throw new ParastreamsException("Argument 1 is expected to be a resource or an array, " . gettype($arg1) . " given");
        }
      }
      /**
       * Start listening to stream events
       * @return void
       * @throws ParastreamsException
       */
      function run()
      {
        while (count($this->streams))
        {
          $events = $this->streams;
          if (false === stream_select($events, $w = null, $e = null, $this->timeout))
          {
            throw new ParastreamsException("stream_select() failed!");
          } elseif (count($events)) {
            $this->processStreamEvents($events);
          } else {
            throw new ParastreamsException("Time out!");
          }
        }
      }
      
      /* Starting private methods */
      
      private function processStreamEvents($events)
      {
        foreach ($events as $fp) {
          $id = array_search($fp, $this->streams);
          
          $this->invokeListener($fp);
          
          fclose($fp);
          unset($this->streams[$id]);
        }
      }
      private function invokeListener($fp)
      {
        foreach ($this->listeners as $index => $spec) {
          if ($spec[0] == $fp)
          {
            $data = "";
            while (! feof($fp))
            {
              $data .= fread($fp, 1024);
            }
            call_user_func($spec[1], $data);
            unset($this->listeners[$index]);
            return ;
          }
        }
      }
      private function addOne($stream, $listener)
      {
        $this->streams[] = $stream;
        $this->listeners[] = array($stream, $listener);
      }
    }

    class ParastreamsException extends RuntimeException {}


    * This source code was highlighted with Source Code Highlighter.


    Пример использования (есть в комментариях, но тем не менее):

    test.php:
    <?php

    require_once 'Parastreams.php';

    function parastreams_callback($data) {
      echo $data."\n";
    };

    $streams = array();
    for ($i = 1; $i <= 3; ++$i) {
      $s = stream_socket_client("localhost:80", $errno,
        $errstr, 10,
        STREAM_CLIENT_ASYNC_CONNECT|STREAM_CLIENT_CONNECT);
      fwrite($s, "GET /sleep.php?delay=" . $i . " HTTP/1.0\r\nHost: localhost\r\n\r\n");
      $streams[$i] = array($s, 'parastreams_callback');
    }

    $ps = new Parastreams($streams);
    $ps->run();


    * This source code was highlighted with Source Code Highlighter.


    В примере используется sleep.php, для полноты картины вот он:

    <?php

    $delay = filter_input(INPUT_GET, 'delay', FILTER_VALIDATE_INT);
    if ($delay <= 0) {
      $delay = 1;
    }

    sleep($delay);

    echo "was sleeping for $delay seconds\n";


    * This source code was highlighted with Source Code Highlighter.
    AdBlock has stolen the banner, but banners are not teeth — they will be back

    More
    Ads

    Comments 16

      +1
      Спасибо! Крайне интерестно для использования роботостроении.
        –2
        Какое PHP в роботостроении? Робот с web-сервером внутри? :)
          0
          Речь про роботов поисковых машин, например.
            0
            По-моему такое на PHP неоправданно писать, хотя может быть я и ошибаюсь.
          0
          Тоже делаю роботов. Статья действительно ценная. Спасибо!
          +2
          Спасибо, почаще бы выходили такие статьи. Распараллеливание вычислений, одно из важных в разработке, так как web любит скорость…
          доступ к web информации из БД — это как formula-1 :)
            0
            Не знал. Спасибо за весьма полезную информацию!
              –1
              вот за filter_input вам благодарен… правда теперь я понял, что фреймворки вообще нах не нужны…
                0
                в каком месте это стало понятно?
                +2
                такие штуки прикольно делать с большими объёмами данных, например, для баз данных. чтобы не ждать пока оно запишется/прочитается, а заниматься подкотовкой ещё чего-нибудь.

                но в этом всём есть свой большой минус. если, допустим, использовать по 5 дополнительных процессов на каждый скрипт, то при нормальной нагрузке можно забить всю очередь апача. тогда выйдет проигрышь, нежели выполнять всё в 1 потоке.
                  0
                  Поправьте меня, если я ошибаюсь, но в данном случае никаких дополнительных процессов не запускается. stream_select() просто «опрашивает» сокеты на предмет доступности и делает это до тех пор, пока какие-то из них не станут доступны или же не наступит тайм-аут. Все происходит в одном процессе.
                    0
                    в зависимости от настроек сервера. Допустим, сервер будет апач, то в зависимости от версии будут запускаться либо новые процессы (1. х ) либо новые потоки (2. х) при отсуствии свободных. А свободные с большим расходом быстро забьются. Для ethernet сайтов это хорошее решиние. Для интернет сайтов, расчитаных на посещаемость съест быстро все свободные ресурсы. Чревато получением #500.

                    Сие можно извратить и использовать отдельный соседний (входящий в одну подсеть) сервер для приёма данных в базу данных. Тогда выигрышь ощутим. Если в подсеть отключить от внешних «раздражителей», то для систем с большой обработкой данных (а не генерацией страниц, как это обычно бывает) получим хороший выигрышь.

                    Только тогда PHP уже не сильно подходит как язык написания таких систем.
                      0
                      Мне кажется, вы слишком большое внимание уделили приведенному мной примеру, и слишком маленькое — собственно технологии. Совсем необязательно, что открытые сокеты будут прослушиваться апачем или другим веб-сервером, совсем необязательно использовать при этом протокол HTTP. Это могут быть, например, сокеты, которые слушает поисковый демон «Сфинкс» или еще что угодно.

                      Важно то, что мы сами «слушаем» события на этих сокетах и, как только те или иные из них становятся доступны — принимаем с них данные и эти данные обрабатываем, тогда как в традиционном подходе пришлось бы открыть сокет, дождаться, пока станет возможно считать оттуда что-то, считать, закрыть сокет и перейти к следующему, чтобы повторить все сначала.
                        0
                        возможно, да, я однобоко смотрел исключительно отталкиваясь от примера.
                        маленькое замечание.
                        sockets != streams
                        хоть и похожа обработка. сокеты очень быстрые. потоки — по всякому бывает.
                          0
                          Согласен, уже и сам посмотрел на свои комментарии и понял, что использую то один термин, то другой, в то время как на самом деле речь только о потоках.
                • UFO just landed and posted this here

                  Only users with full accounts can post comments. Log in, please.