Как стать автором
Обновить

Эмуляция многопоточности в PHP

PHP *
Суть многопоточности (для приложения) состоит в том, что процесс может состоять из нескольких (однотипных) потоков, выполняющихся «параллельно», то есть без упорядочивания по времени — выполнился один, пошел следующий. Использование многопоточности позволяет ускорить выполнение задачи и/или снизить нагрузку, таким образом, улучшая быстродействие самого приложения.

PHP относится к языкам, в которых поддержка многопоточности отсутствует. Но, есть немало задач, в которых она была бы очень полезна. Как правило, это задачи, в которых одно действие нужно выполнить много раз, но с различными параметрами, причем само действие настолько ресурсоемко, а количество итераций настолько велико, что обычный цикл использовать невозможно.

Я рассмотрю простой пример, как можно достичь эмуляции многопоточности в PHP.

Допустим, нам нужно осуществить почтовую рассылку на очень большое количество адресов. Адреса хранятся в базе. Так как адресов много, то такой вариант нам не подходит, повесим сервер:

$res mysql_query('SELECT `email` FROM `user`');

while (
$row mysql_fetch_assoc($res))
    
mail($row['email'], $theme$text);


Если мы хотим использовать для этой задачи многопоточность, то неплохо бы выглядел следующий вариант:

$res mysql_query('SELECT `email` FROM `user`');

$data = array();

while (
$row mysql_fetch_assoc($res))
    
$data[] = $row;
    
$multithreading = new MultiThreading();

// mailer.php — скрипт, который отправляет письмо по адресу, переданному ему методом GET,
// то есть mailer.php?email=somebody@example.com
$multithreading->setScriptName('mailer.php');

$multithreading->setParams($data);

$multithreading->execute();


Все просто и удобно. Осталось написать соответствующий класс, что я и сделал. Вот он, со всеми пояснениями в комментариях.

class MultiThreading
{
    
/**
     * Имя сервера
     *
     * @var string
     * @access private
     */
    
private $server;
    
    
/**
     * Максимальное количество потоков
     *
     * @var int
     * @access private
     */
    
private $maxthreads;
    
    
/**
     * Имя скрипта, который выполняет нужную нам задачу
     *
     * @var string
     * @access private
     */
    
private $scriptname;
    
    
/**
     * Параметры, которые мы будем передавать скрипту
     *
     * @var array
     * @access private
     */
    
private $params = array();
    
    
/**
     * Массив, в котором хранятся потоки
     *
     * @var array
     * @access private
     */
    
private $threads = array();
    
    
/**
     * Массив, в котором хранятся результаты
     *
     * @var array
     * @access private
     */
    
private $results = array();
    
    
/**
     * Конструктор класса. В нем мы указываем максимальное количество потоков и имя сервера. Оба аргумента необязательны.
     *
     * @param int $maxthreads максимальное количество потоков, по умолчанию 10
     * @param string $server имя сервера, по умолчанию имя сервера, на котором запущено приложение
     * @access public
     */
    
public function __construct($maxthreads 10$server '')
    {
        if (
$server)
            
$this->server $server;
        else
            
$this->server $_SERVER['SERVER_NAME'];
        
        
$this->maxthreads $maxthreads;
    }
    
    
/**
     * Указываем имя скрипта, который выполняет нужную нам задачу
     *
     * @param string $scriptname имя скрипта, включая путь к нему
     * @access public
     */
    
public function setScriptName($scriptname)
    {
        if (!
$fp fopen('http://'.$this->server.'/'.$scriptname'r'))
            throw new 
Exception('Cant open script file');
        
        
fclose($fp);
        
        
$this->scriptname $scriptname;
    }
    
    
/**
     * Задаем параметры, которые мы будем передавать скрипту
     *
     * @param array $params массив параметров
     * @access public
     */
    
public function setParams($params = array())
    {
        
$this->params $params;
    }
    
    
/**
     * Выполняем задачу, комментарии в коде
     *
     * @access public
     */
    
public function execute()
    {
        
// Запускаем механизм, и он работает, пока не выполнятся все потоки
        
do {
            
// Если не превысили лимит потоков
            
if (count($this->threads) < $maxthreads) {
                
// Если удается получить следующий набор параметров
                
if ($item current($this->params)) {
                
                    
// Формируем запрос методом GET
                    
                    
$query_string '';
                
                    foreach (
$item as $key=>$value)
                        
$query_string .= '&'.urlencode($key).'='.urlencode($value);
                    
                    
$query "GET http://".$this->server."/".$this->scriptname."?".$query_string." HTTP/1.0\r\n";
                    
                    
// Открыватем соединение
                    
                    
if (!$fsock fsockopen($this->server80))
                        throw new 
Exception('Cant open socket connection');
                
                    
fputs($fsock$query);
                    
fputs($fsock"Host: $server\r\n");
                    
fputs($fsock"\r\n");
                
                    
stream_set_blocking($fsockO);
                    
stream_set_timeout($fsock3600);
                    
                    
// Записываем поток
                
                    
$this->threads[] = $fsock;
                    
                    
// Переходим к следующему элементу
                
                    
next($this->params);
                }
            }
            
            
// Перебираем потоки
            
foreach ($this->threads as $key=>$value) {
                
// Если поток отработал, закрываем и удаляем
                
if (feof($value)) {
                    
fclose($value);
                    unset(
$this->threads[$key]);
                } else {
                    
// Иначе считываем результаты
                    
$this->results[] = fgets($value);
                }
            }
            
            
// Можно поставить задержку, чтобы не повесить сервер
            
sleep(1);
            
        
// ... пока не выполнятся все потоки    
        
} while (count($this->threads) > O);
    
        return 
$this->results;
    }
}



Также можно этот класс скачать, чтобы не копипастить :-).

UPD: господа, хочу напомнить, все же это скорее развернутый пример, нежели идеальное готовое решение.

P.S. Парсер почему-то сьедает конструкции типа «$var > 0», поэтому в двух местах (stream_set_blocking($fsock, 0); и while (count($this->threads) > 0);) я заменил цифру 0 на букву O. В классе для скачивания все нормально.

Оригинал статьи.
Теги:
Хабы:
Всего голосов 54: ↑40 и ↓14 +26
Просмотры 29K
Комментарии Комментарии 76