Представим, что у нас есть некоторый набор задач, допускающих возможность параллельного выполнения. К примеру, нам нужно организовать RSS-агрегатор, обновляющий через заданный промежуток времени все свои ленты. Понятно, что основное и при этом вполне ощутимое время будет уходить на загрузку данных с удалённого источника. Учитывая это, организация такого импорта путём последовательной загрузки лент лишена смысла, так в случае сколь-либо большого количества лент, импорт не будет укладываться в отведённые ему сроки.
Варианта решения проблема тут можеть быть два. Первый — реализовать параллельную загрузку лент с помощью CURL'a. Наприме так:
Этот вариант вариант отчасти решит проблему с простоем при ожидании загрузки ленты, однако дальнейший разбор ленты придётся осуществлять в последовательном режиме. Кроме того, он не очень удобен, если в Вашей предметной моделе лента рассматривается как объект некоторого класса.
Второй вариант заключается в создании некоторого множества (пула) дочерних процессов — по каждому на ленту. Сделать это можно, к примеру, с помощью семейства фукций proc_*. Так же разумно было бы ограничить множество одновременно запущенных процессов (размер пула) некоторым числом, чтобы контролировать нагрузку сервера (в принципе это утверждение справедливо и для первого варианта). Для этого придётся сэмулировать диспетчер, который будет следить за состоянием пула и добавлять в него новые процессы по мере завершения работы процессов в пуле.
Ниже приведён самодокументированный пример реализации пула для параллельного выполнения задачи импорта RSS-лент:
Результатом работы этого скрипта будет следующий лог:
Метод опробован в боевых условиях и на данный момент никаких нариканий не вызывал.
Варианта решения проблема тут можеть быть два. Первый — реализовать параллельную загрузку лент с помощью CURL'a. Наприме так:
// Подготовка данных для асинхронного запроса
$rMultiHandler = curl_multi_init();
$aResources = array();
foreach( $aFeedUrls as $sFeedUrl ) {
$rResource = curl_init();
curl_setopt($rResource, CURLOPT_RETURNTRANSFER, 1);
curl_setopt($rResource, CURLOPT_URL, $sFeedUrl );
curl_setopt($rResource, CURLOPT_FOLLOWLOCATION, true);
curl_setopt($rResource, CURLOPT_TIMEOUT, 60);
curl_multi_add_handle( $multi_handler, $rResource );
$aResources[] = array(
'url' => $sFeedUrl,
'client' => $rResource
);
}
// Асинхронный запрос данных через CURL
$iRunningProcesses = null;
do {
usleep( 1000000 );
curl_multi_exec( $rMultiHandler, $iRunningProcesses );
} while( $iRunningProcesses > 0 );
// Обработка полученной информации
foreach( $aResources as $aResource ) {
$aHeaders = curl_getinfo( $aResource['client'] );
$sBody = curl_multi_getcontent( $aResource['client'] );
}
Этот вариант вариант отчасти решит проблему с простоем при ожидании загрузки ленты, однако дальнейший разбор ленты придётся осуществлять в последовательном режиме. Кроме того, он не очень удобен, если в Вашей предметной моделе лента рассматривается как объект некоторого класса.
Второй вариант заключается в создании некоторого множества (пула) дочерних процессов — по каждому на ленту. Сделать это можно, к примеру, с помощью семейства фукций proc_*. Так же разумно было бы ограничить множество одновременно запущенных процессов (размер пула) некоторым числом, чтобы контролировать нагрузку сервера (в принципе это утверждение справедливо и для первого варианта). Для этого придётся сэмулировать диспетчер, который будет следить за состоянием пула и добавлять в него новые процессы по мере завершения работы процессов в пуле.
Ниже приведён самодокументированный пример реализации пула для параллельного выполнения задачи импорта RSS-лент:
/**
* Импорт лент
*/
class Import {
/**
* Размера пула процессов импорта
* @var int
*/
const POOL_SIZE = 10;
/**
* Максимальное время выполнения скрипта импорта заданной ленты
* @var int
*/
const POOL_PROC_EXEC_TIME = 180;
/**
* Запуск пула процессов импорта лент
*/
public function startPool() {
file_put_contents('import.log', "[*] Запуск процесса импорта " .
PHP_EOL, FILE_APPEND );
// Инициализация счётчиков
$iSuccess = 0;
$iFailure = 0;
$iUpdated = 0;
// Формирование списка идентификаторов лент, подлежащих импорту
$aFeedId = array( 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12,
13, 14, 15 );
// Инициализация пула
$aPool = array();
for( $iIter = 0; $iIter < self::POOL_SIZE && !empty( $aFeedId );
$iIter++ ) {
$iFeedId = array_shift( $aFeedId );
$this->startProcess( $aPool, $iFeedId );
}
// Обработка процессов импорта в рамках пула
while( !empty( $aPool ) ) {
// Ожидание работы процессов из пула (1 секунда)
usleep(1000000);
// Обработка завершённых процессов
foreach( $aPool as $iKey => &$aProcess ) {
// Получение информации о процессе
$aProcStatus = proc_get_status( $aProcess['handler'] );
// Процесс завершён
if( false === $aProcStatus['running'] ) {
// Получение данных от процесса
$iResponse = fgets( $aProcess['pipes'][1] );
// Окончание работы с процессом
fclose( $aProcess['pipes'][1] );
fclose( $aProcess['pipes'][2] );
proc_close( $aProcess['handler'] );
// Процесс отработал успешно
// www.php.net/manual/en/function.proc-get-status.php#92145
if( 0 === $aProcStatus['exitcode']
&& is_numeric( $iResponse ) ) {
$iSuccess++;
$iUpdated += $iResponse;
// В ходе работы процесса возникла ошибка
} else
$iFailure++;
// Замена текущего процесса новым
unset( $aPool[ $iKey ] );
if( !empty( $aFeedId ) ) {
$iFeedId = array_shift( $aFeedId );
$bIsLaunched = $this->startProcess( $aPool,
$iFeedId );
if( !$bIsLaunched )
$iFailure++;
}
// Процесс работает
} else {
// Процесс завис
if( time() - $aProcess['iTimeStart'] >
self::POOL_PROC_EXEC_TIME ) {
file_put_contents('import.log', "[!] Процесс импорта ".
" ленты {$aProcess['iFeedId']} завис и будет " .
" завершён принудительно" . PHP_EOL, FILE_APPEND );
$iSingnalCode = 15;
proc_terminate( $aProcess['handler'], $iSingnalCode );
}
}
}
unset( $aProcess );
}
file_put_contents('import.log', "[*] Процесс импорта завершён: " .
" успешно {$iSuccess}, неуспешно {$iFailure}, " .
" добавлено {$iUpdated}" . PHP_EOL, FILE_APPEND );
}
/**
* Запуск процесса импорта ленты в рамках пула
* @param array $aPool Пул процессов
* @param int $iFeedId Идентификатор ленты
*/
public function startProcess( array &$aPool, $iFeedId ) {
// Инициализация данных для запуска процесса
// www.php.net/manual/en/function.proc-get-status.php#93382
$sCmd = "exec php -f " . __FILE__ . " {$iFeedId}";
$aDescriptors = array(
1 => array("pipe", "w"),
2 => array("pipe", "w")
);
$aPipes = array();
// Запуск процесса импорта ленты
$bSuccess = true;
$rProcess = proc_open( $sCmd, $aDescriptors, $aPipes );
if( is_resource( $rProcess ) ) {
$aPool[] = array(
'handler' => $rProcess,
'pipes' => $aPipes,
'iFeedId' => $iFeedId,
'iTimeStart'=> time()
);
} else {
$bSuccess = false;
file_put_contents('import.log', "[!] Не удалось запустить " .
" процесс импорта ленты {$iFeedId}", FILE_APPEND );
}
return $bSuccess;
}
/**
* Импорт ленты
* @param $iFeedId Идентификатор ленты
* @return int Количество обновлённых элементов в ленте
*/
public function doImport( $iFeedId ) {
file_put_contents('import.log', "[+] Импорт ленты {$iFeedId}" .
PHP_EOL, FILE_APPEND);
// Имитация импорта
$iExecTime = rand( 1, 10 );
usleep( $iExecTime * 1000000 );
$iUpdated = rand( 0,10 );
file_put_contents('import.log', "[-] Импорт ленты {$iFeedId}" .
" за {$iExecTime} секунд" . PHP_EOL, FILE_APPEND);
// Отправка количества добавленных элементов родительскому процессу
echo $iUpdated;
return $iUpdated;
}
}
/**
* Контроллер
*/
$oImport = new Import();
// Запуск пула
if( 1 === $argc ) {
$oImport->startPool();
// Запуск импорта конкретной ленты
} else {
$iFeedId = $argv[1];
$oImport->doImport( $iFeedId );
}
Результатом работы этого скрипта будет следующий лог:
[*] Запуск процесса импорта
[+] Импорт ленты 1
[+] Импорт ленты 2
[+] Импорт ленты 3
[+] Импорт ленты 4
[+] Импорт ленты 5
[+] Импорт ленты 8
[+] Импорт ленты 6
[+] Импорт ленты 7
[+] Импорт ленты 9
[+] Импорт ленты 10
[-] Импорт ленты 7 за 1 секунд
[+] Импорт ленты 11
[-] Импорт ленты 1 за 5 секунд
[+] Импорт ленты 12
[-] Импорт ленты 10 за 5 секунд
[-] Импорт ленты 2 за 6 секунд
[-] Импорт ленты 12 за 1 секунд
[+] Импорт ленты 13
[+] Импорт ленты 14
[-] Импорт ленты 3 за 7 секунд
[-] Импорт ленты 6 за 7 секунд
[+] Импорт ленты 15
[-] Импорт ленты 9 за 7 секунд
[-] Импорт ленты 14 за 1 секунд
[-] Импорт ленты 11 за 6 секунд
[-] Импорт ленты 4 за 9 секунд
[-] Импорт ленты 5 за 10 секунд
[-] Импорт ленты 8 за 10 секунд
[-] Импорт ленты 13 за 7 секунд
[-] Импорт ленты 15 за 6 секунд
[*] Процесс импорта завершён: успешно 15, неуспешно 0, добавлено 89
Метод опробован в боевых условиях и на данный момент никаких нариканий не вызывал.