Как стать автором
Обновить

Почти настоящая многопоточность средствами php 5

PHP
В очередной раз читал про многопоточность в php точнее полное её отсутствие и всевозможные костыли в виде не блокируемых сокетов. Вот как раз там наткнулся на интересную статью в которой описывался очень простой и эффективный способ распараллеливания потоков. На основе этого материала написал небольшой класс что бы облегчить себе работу в будущем.

Итак код класса threads.php:
<?php

class Threads {
public $phpPath = 'php';

private $lastId = 0;
private $descriptorSpec = array(
0 => array('pipe', 'r'),
1 => array('pipe', 'w')
);
private $handles = array();
private $streams = array();
private $results = array();
private $pipes = array();
private $timeout = 10;

public function newThread($filename, $params=array()) {
if (!file_exists($filename)) {
throw new ThreadsException('FILE_NOT_FOUND');
}

$params = addcslashes(serialize($params), '"');
$command = $this->phpPath.' -q '.$filename.' --params "'.$params.'"';
++$this->lastId;

$this->handles[$this->lastId] = proc_open($command, $this->descriptorSpec, $pipes);
$this->streams[$this->lastId] = $pipes[1];
$this->pipes[$this->lastId] = $pipes;

return $this->lastId;
}

public function iteration() {
if (!count($this->streams)) {
return false;
}
$read = $this->streams;
stream_select($read, $write=null, $except=null, $this->timeout);
/*
Здесь береться только один поток для удобства обработки
на самом деле в массиве $read их зачастую несколько
*/

$stream = current($read);
$id = array_search($stream, $this->streams);
$result = stream_get_contents($this->pipes[$id][1]);
if (feof($stream)) {
fclose($this->pipes[$id][0]);
fclose($this->pipes[$id][1]);
proc_close($this->handles[$id]);
unset($this->handles[$id]);
unset($this->streams[$id]);
unset($this->pipes[$id]);
}
return $result;
}

/*
Статичный метод для получения параметров из
параметров командной строки
*/

public static function getParams() {
foreach ($_SERVER['argv'] as $key => $argv) {
if ($argv == '--params' && isset($_SERVER['argv'][$key + 1])) {
return unserialize($_SERVER['argv'][$key + 1]);
}
}
return false;
}

}

class ThreadsException extends Exception {
}

?>


Теперь для примера создадим test.php:
<?php
$start = microtime(true);

require './threads.php';

$threads = new Threads;

for ($i=0;$i<10;$i++) {
$threads->newThread('./delay.php', array('delay' => rand(1, 5)));
}

while (false !== ($result = $threads->iteration())) {
if (!empty($result)) {
echo $result."\r\n";
}
}

$end = microtime(true);
echo "Execution time ".round($end - $start, 2)."\r\n";

?>


И delay.php который он вызывает:
<?php

require './threads.php';

if ($params = Threads::getParams()) {
sleep($params['delay']);
echo 'Wait for '.$params['delay'].' s.';
}

?>


В результате выполнения test.php получаем следующее:
Microsoft Windows [Version 6.1.7100]
Copyright (c) 2009 Microsoft Corporation. All rights reserved.

Z:\home\labs\www\threads>php test.php
Wait for 1 s.
Wait for 5 s.
Wait for 4 s.
Wait for 5 s.
Wait for 5 s.
Wait for 5 s.
Wait for 3 s.
Wait for 3 s.
Wait for 2 s.
Wait for 4 s.
Execution time 5.58

Z:\home\labs\www\threads>
Теги:phpмногопоточностьскорость
Хабы: PHP
Всего голосов 73: ↑52 и ↓21+31
Просмотры32K

Похожие публикации

Лучшие публикации за сутки