Pull to refresh

Почти настоящая многопоточность средствами php 5

Reading time 5 min
Views 32K
В очередной раз читал про многопоточность в php точнее полное её отсутствие и всевозможные костыли в виде не блокируемых сокетов. Вот как раз там наткнулся на интересную статью в которой описывался очень простой и эффективный способ распараллеливания потоков. На основе этого материала написал небольшой класс что бы облегчить себе работу в будущем.

Итак код класса threads.php:
<?php

class Threads {
    public $phpPath = 'php';
    
    private $lastId = 0;
    private $descriptorSpec = array(
        0 => array('pipe''r'),
        1 => array('pipe''w')
    );
    private $handles = array();
    private $streams = array();
    private $results = array();
    private $pipes = array();
    private $timeout = 10;
    
    public function newThread($filename$params=array()) {
        if (!file_exists($filename)) {
            throw new ThreadsException('FILE_NOT_FOUND');
        }
        
        $params = addcslashes(serialize($params), '"');
        $command = $this->phpPath.' -q '.$filename.' --params "'.$params.'"';
        ++$this->lastId;
        
        $this->handles[$this->lastId= proc_open($command$this->descriptorSpec$pipes);
        $this->streams[$this->lastId= $pipes[1];
        $this->pipes[$this->lastId= $pipes;
        
        return $this->lastId;
    }
    
    public function iteration() {
        if (!count($this->streams)) {
            return false;
        }
        $read = $this->streams;
        stream_select($read$write=null$except=null$this->timeout);
        /* 
            Здесь береться только один поток для удобства обработки 
            на самом деле в массиве $read их зачастую несколько
        */

        $stream = current($read);
        $id = array_search($stream$this->streams);
        $result = stream_get_contents($this->pipes[$id][1]);
        if (feof($stream)) {
            fclose($this->pipes[$id][0]);
            fclose($this->pipes[$id][1]);
            proc_close($this->handles[$id]);
            unset($this->handles[$id]);
            unset($this->streams[$id]);
            unset($this->pipes[$id]);
        }
        return $result;
    }
    
    /*
        Статичный метод для получения параметров из 
        параметров командной строки
    */

    public static function getParams() {
        foreach ($_SERVER['argv'as $key => $argv) {
            if ($argv == '--params' && isset($_SERVER['argv'][$key + 1])) {
                return unserialize($_SERVER['argv'][$key + 1]);
            }
        }
        return false;
    }
    
}

class ThreadsException extends Exception {
}

?>


Теперь для примера создадим test.php:
<?php
$start = microtime(true);

require './threads.php';

$threads = new Threads;

for ($i=0;$i<10;$i++) {
    $threads->newThread('./delay.php'array('delay' => rand(15)));
}

while (false !== ($result = $threads->iteration())) {
    if (!empty($result)) {
        echo $result."\r\n";
    }
}

$end = microtime(true);
echo "Execution time ".round($end - $start2)."\r\n";

?>


И delay.php который он вызывает:
<?php

require './threads.php';

if ($params = Threads::getParams()) {
    sleep($params['delay']);
    echo 'Wait for '.$params['delay'].' s.';
}

?>


В результате выполнения test.php получаем следующее:
Microsoft Windows [Version 6.1.7100]
Copyright (c) 2009 Microsoft Corporation. All rights reserved.

Z:\home\labs\www\threads>php test.php
Wait for 1 s.
Wait for 5 s.
Wait for 4 s.
Wait for 5 s.
Wait for 5 s.
Wait for 5 s.
Wait for 3 s.
Wait for 3 s.
Wait for 2 s.
Wait for 4 s.
Execution time 5.58

Z:\home\labs\www\threads>
Tags:
Hubs:
+31
Comments 59
Comments Comments 59

Articles