Параллельные программы на PHP
Раньше заголовок темы был «Написание многопоточных программ на PHP». В PHP есть ровно один «нормальный» способ писать приложения, которые используют несколько ядер/процессоров — это fork(). О прикладном использовании системного вызова fork() в языке PHP и расширения pcntl я и расскажу. В качестве примера мы напишем достаточно быструю параллельную реализацию grep (со скоростью работы, аналогичной
find . -type f -print0 | xargs -0 -P $NUM_PROCS grep $EXPR
).Реализация
Реализация этого системного вызова в PHP очень проста:
PHP_FUNCTION(pcntl_fork) {
pid_t id;
id = fork();
if (id == -1) {
PCNTL_G(last_error) = errno;
php_error_docref(NULL TSRMLS_CC, E_WARNING, "Error %d", errno);
}
RETURN_LONG((long) id);
}
Что такое системный вызов fork()
Системный вызов fork() в *nix-системах представляет из себя такой системный вызов, который делает полную копию текущего процесса. Системный вызов fork() возвращает своё значение два раза: родитель получает PID потомка, а потомок получает 0. Как ни странно, во многих случаях только этого достаточно для того, чтобы писать приложения, использующие несколько CPU.
$ php -r '$pid = pcntl_fork(); echo posix_getpid() . ": Fork returned $pid\n";'
9545: Fork returned 9546
9546: Fork returned 0
Подводные камни при использовании fork()
На самом деле fork() делает свою работу не задумываясь о том, что находится у пользовательского процесса в памяти — он копирует всё, например функции, которые зарегистрированы через atexit (register_shutdown_function). Пример:
$ php -r 'register_shutdown_function(function() { echo "Exited!\n"; }); pcntl_fork();'
Exited!
Exited!
К сожалению, PHP в конце выполнения скрипта осуществляет вызов деструкторов (в том числе и внутренних деструкторов ресурсов соединений с базой данных). Пример для расширения mysqli:
<?php
/* test.php */
$conn = new mysqli(..., "mysql") or die("Cannot connect\n");
$pid = pcntl_fork();
if ($pid > 0) {
echo "Parent exiting\n";
exit(0);
}
echo "Sending query\n";
$res = $conn->query("SHOW TABLES") or die("Cannot get query result\n");
print_r($res->fetch_all());
/*
$ php test.php
Parent exiting
Sending query
Warning: mysqli::query(): MySQL server has gone away in test.php on line 9
Warning: mysqli::query(): Error reading result set's header in test.php on line 9
Cannot get query result
*/
Вывод программы будет не обязательно таким, как написано. Иногда потомок «успевает» до исполнения процедуры закрытия соединения в родителе и всё работает, как надо.
Боремся с отложенным исполнением функций / деструкторов
На самом деле, проблему с отложенным исполнением можно решить, если вы точно знаете, что хотите. Например, в Си есть функция _exit(), которая выходит, не запуская никаких установленных обработчиков. К сожалению, в PHP такой функции нет, но её поведение можно частично эмулировать с использованием сигналов:
function _exit() {
posix_kill(posix_getpid(), SIGTERM);
}
Этого «хака» нам будет достаточно, чтобы соединение с базой оставалось активным для двух PHP-процессов одновременно, хотя лучше, конечно, так на практике не делать :):
<?php
/* test.php */
$conn = new mysqli(..., "mysql") or die("Cannot connect\n");
function _exit() {
posix_kill(posix_getpid(), SIGTERM);
}
function show_tables() {
global $conn;
echo "Sending query\n";
$res = $conn->query("SHOW TABLES") or die("Cannot get query result\n");
echo "Tables count: " . $res->num_rows . "\n";
}
$pid = pcntl_fork();
if ($pid > 0) {
show_tables();
_exit();
}
sleep(1);
show_tables();
/*
$ php test.php
Sending query
Tables count: 24
Terminated: 15 <--- это вставляет командный интерпретатор
$ Sending query
Tables count: 24
*/
Пишем grep
Давайте теперь, для примера, напишем простенькую версию grep, которая будет искать по маске в текущей директории.
<?php
/* Пример использования:
$ php grep.php argv
./grep.php:$pattern = "/$argv[1]/m";
*/
exec("find . -type f", $files, $retval); // получаем список всех файлов в директории
$pattern = "/$argv[1]/m";
foreach($files as $file) {
$fp = fopen($file, "rb");
// файл с очень большой вероятностью является двоичным, если в нём встречаются нулевые байты
$is_binary = strpos(fread($fp, 1024), "\0") !== false;
fseek($fp, 0);
if ($is_binary) {
if (preg_match($pattern, file_get_contents($file))) echo "$file: binary matches\n";
} else {
while (false !== ($ln = fgets($fp))) if (preg_match($pattern, $ln)) echo "$file:$ln";
}
fclose($fp);
}
Пишем параллельную версию grep
Теперь подумаем, как же можно ускорить данную программу, распараллелив её. Можно легко заметить, что мы можем разделить массив $files (список файлов) на несколько частей и обработать эти части независимо. Причём мы можем так делать во всех случаях, когда у нас есть какой-то большой список задач: просто берем каждый N-ный в соответствующем процессе и обрабатываем его. Поэтому, напишем более-менее общую функцию для этого:
define('PROCESSES_NUM', 2); // задаем количество потоков для обработки
function parallelForeach($arr, $func)
{
for ($proc_num = 0; $proc_num < PROCESSES_NUM; $proc_num++) {
$pid = pcntl_fork();
if ($pid == 0) break;
}
if ($pid) {
for ($i = 0; $i < PROCESSES_NUM; $i++) pcntl_wait($status);
return;
}
// обходим каждый PROCESSES_NUM элемент массива и обрабатываем его
$l = count($arr);
for ($i = $proc_num; $i < $l; $i += PROCESSES_NUM) $func($arr[$i]);
exit(0);
}
Осталось заменить foreach() на использование нашей функции parallelForeach и добавить обработку ошибок:
Полный исходный текст
<?php
/* parallel-grep.php */
define('PROCESSES_NUM', 2);
if ($argc != 2) {
fwrite(STDERR, "Usage: $argv[0] <pattern>\n");
exit(1);
}
grep($argv[1]);
function grep($pattern)
{
exec("find . -type f", $files, $retval);
if ($retval) exit($retval);
$pattern = "/$pattern/m";
if (false === preg_match($pattern, '123')) {
fwrite(STDERR, "Incorrect regular expression\n");
exit(1);
}
parallelForeach($files, function($f) use ($pattern) { grepFile($pattern, $f); });
exit(0);
}
function grepFile($pattern, $file)
{
$fp = fopen($file, "rb");
if (!$fp) {
fwrite(STDERR, "Cannot read $file\n");
return;
}
$binary = strpos(fread($fp, 1024), "\0") !== false;
fseek($fp, 0);
if ($binary) {
if (preg_match($pattern, file_get_contents($file))) echo "$file: binary matches\n";
} else {
while (false !== ($ln = fgets($fp))) {
if (preg_match($pattern, $ln)) echo "$file:$ln";
}
}
fclose($fp);
}
function parallelForeach($arr, $func)
{
for ($proc_num = 0; $proc_num < PROCESSES_NUM; $proc_num++) {
$pid = pcntl_fork();
if ($pid < 0) {
fwrite(STDERR, "Cannot fork\n");
exit(1);
}
if ($pid == 0) break;
}
if ($pid) {
for ($i = 0; $i < PROCESSES_NUM; $i++) {
pcntl_wait($status);
$exitcode = pcntl_wexitstatus($status);
if ($exitcode) exit(1);
}
return;
}
$l = count($arr);
for ($i = $proc_num; $i < $l; $i += PROCESSES_NUM) $func($arr[$i]);
exit(0);
}
Проверим работу нашего грепа на исходном коде PHP 5.3.10:
$ php ~/parallel-grep.php '^PHP_FUNCTION' | head
./ext/calendar/calendar.c:PHP_FUNCTION(cal_info)
./ext/calendar/calendar.c:PHP_FUNCTION(cal_days_in_month)
./ext/calendar/calendar.c:PHP_FUNCTION(cal_to_jd)
./ext/calendar/calendar.c:PHP_FUNCTION(cal_from_jd)
./ext/calendar/calendar.c:PHP_FUNCTION(jdtogregorian)
./ext/calendar/calendar.c:PHP_FUNCTION(gregoriantojd)
./ext/calendar/calendar.c:PHP_FUNCTION(jdtojulian)
./ext/calendar/calendar.c:PHP_FUNCTION(juliantojd)
./ext/calendar/calendar.c:PHP_FUNCTION(jdtojewish)
./ext/calendar/calendar.c:PHP_FUNCTION(jewishtojd)
$ time php ~/parallel-grep.php '^PHP_FUNCTION' | wc -l
4056
real 0m2.073s
user 0m3.265s
sys 0m0.550s
$ time grep -R '^PHP_FUNCTION' . | wc -l
4056
real 0m3.646s
user 0m3.415s
sys 0m0.209s
$ time find . -type f -print0 | xargs -0 -P 2 grep '^PHP_FUNCTION' | wc -l
4056
real 0m1.895s
user 0m3.247s
sys 0m0.249s
Работает! Я описал один из часто используемых паттернов при параллельном программировании на PHP — параллельная обработка очереди из задач. Надеюсь, моя статья кому-нибудь поможет перестать бояться писать многопоточные приложения на PHP, если задача допускает такую декомпозицию, как в примере с грепом. Спасибо.