Параллельные программы на PHP
Раньше заголовок темы был «Написание многопоточных программ на PHP». В PHP есть ровно один «нормальный» способ писать приложения, которые используют несколько ядер/процессоров — это fork(). О прикладном использовании системного вызова fork() в языке PHP и расширения pcntl я и расскажу. В качестве примера мы напишем достаточно быструю параллельную реализацию grep (со скоростью работы, аналогичной
find . -type f -print0 | xargs -0 -P $NUM_PROCS grep $EXPR).Реализация
Реализация этого системного вызова в PHP очень проста:
PHP_FUNCTION(pcntl_fork) { pid_t id; id = fork(); if (id == -1) { PCNTL_G(last_error) = errno; php_error_docref(NULL TSRMLS_CC, E_WARNING, "Error %d", errno); } RETURN_LONG((long) id); }
Что такое системный вызов fork()
Системный вызов fork() в *nix-системах представляет из себя такой системный вызов, который делает полную копию текущего процесса. Системный вызов fork() возвращает своё значение два раза: родитель получает PID потомка, а потомок получает 0. Как ни странно, во многих случаях только этого достаточно для того, чтобы писать приложения, использующие несколько CPU.
$ php -r '$pid = pcntl_fork(); echo posix_getpid() . ": Fork returned $pid\n";' 9545: Fork returned 9546 9546: Fork returned 0
Подводные камни при использовании fork()
На самом деле fork() делает свою работу не задумываясь о том, что находится у пользовательского процесса в памяти — он копирует всё, например функции, которые зарегистрированы через atexit (register_shutdown_function). Пример:
$ php -r 'register_shutdown_function(function() { echo "Exited!\n"; }); pcntl_fork();' Exited! Exited!
К сожалению, PHP в конце выполнения скрипта осуществляет вызов деструкторов (в том числе и внутренних деструкторов ресурсов соединений с базой данных). Пример для расширения mysqli:
<?php /* test.php */ $conn = new mysqli(..., "mysql") or die("Cannot connect\n"); $pid = pcntl_fork(); if ($pid > 0) { echo "Parent exiting\n"; exit(0); } echo "Sending query\n"; $res = $conn->query("SHOW TABLES") or die("Cannot get query result\n"); print_r($res->fetch_all()); /* $ php test.php Parent exiting Sending query Warning: mysqli::query(): MySQL server has gone away in test.php on line 9 Warning: mysqli::query(): Error reading result set's header in test.php on line 9 Cannot get query result */
Вывод программы будет не обязательно таким, как написано. Иногда потомок «успевает» до исполнения процедуры закрытия соединения в родителе и всё работает, как надо.
Боремся с отложенным исполнением функций / деструкторов
На самом деле, проблему с отложенным исполнением можно решить, если вы точно знаете, что хотите. Например, в Си есть функция _exit(), которая выходит, не запуская никаких установленных обработчиков. К сожалению, в PHP такой функции нет, но её поведение можно частично эмулировать с использованием сигналов:
function _exit() { posix_kill(posix_getpid(), SIGTERM); }
Этого «хака» нам будет достаточно, чтобы соединение с базой оставалось активным для двух PHP-процессов одновременно, хотя лучше, конечно, так на практике не делать :):
<?php /* test.php */ $conn = new mysqli(..., "mysql") or die("Cannot connect\n"); function _exit() { posix_kill(posix_getpid(), SIGTERM); } function show_tables() { global $conn; echo "Sending query\n"; $res = $conn->query("SHOW TABLES") or die("Cannot get query result\n"); echo "Tables count: " . $res->num_rows . "\n"; } $pid = pcntl_fork(); if ($pid > 0) { show_tables(); _exit(); } sleep(1); show_tables(); /* $ php test.php Sending query Tables count: 24 Terminated: 15 <--- это вставляет командный интерпретатор $ Sending query Tables count: 24 */
Пишем grep
Давайте теперь, для примера, напишем простенькую версию grep, которая будет искать по маске в текущей директории.
<?php /* Пример использования: $ php grep.php argv ./grep.php:$pattern = "/$argv[1]/m"; */ exec("find . -type f", $files, $retval); // получаем список всех файлов в директории $pattern = "/$argv[1]/m"; foreach($files as $file) { $fp = fopen($file, "rb"); // файл с очень большой вероятностью является двоичным, если в нём встречаются нулевые байты $is_binary = strpos(fread($fp, 1024), "\0") !== false; fseek($fp, 0); if ($is_binary) { if (preg_match($pattern, file_get_contents($file))) echo "$file: binary matches\n"; } else { while (false !== ($ln = fgets($fp))) if (preg_match($pattern, $ln)) echo "$file:$ln"; } fclose($fp); }
Пишем параллельную версию grep
Теперь подумаем, как же можно ускорить данную программу, распараллелив её. Можно легко заметить, что мы можем разделить массив $files (список файлов) на несколько частей и обработать эти части независимо. Причём мы можем так делать во всех случаях, когда у нас есть какой-то большой список задач: просто берем каждый N-ный в соответствующем процессе и обрабатываем его. Поэтому, напишем более-менее общую функцию для этого:
define('PROCESSES_NUM', 2); // задаем количество потоков для обработки function parallelForeach($arr, $func) { for ($proc_num = 0; $proc_num < PROCESSES_NUM; $proc_num++) { $pid = pcntl_fork(); if ($pid == 0) break; } if ($pid) { for ($i = 0; $i < PROCESSES_NUM; $i++) pcntl_wait($status); return; } // обходим каждый PROCESSES_NUM элемент массива и обрабатываем его $l = count($arr); for ($i = $proc_num; $i < $l; $i += PROCESSES_NUM) $func($arr[$i]); exit(0); }
Осталось заменить foreach() на использование нашей функции parallelForeach и добавить обработку ошибок:
Полный исходный текст
<?php /* parallel-grep.php */ define('PROCESSES_NUM', 2); if ($argc != 2) { fwrite(STDERR, "Usage: $argv[0] <pattern>\n"); exit(1); } grep($argv[1]); function grep($pattern) { exec("find . -type f", $files, $retval); if ($retval) exit($retval); $pattern = "/$pattern/m"; if (false === preg_match($pattern, '123')) { fwrite(STDERR, "Incorrect regular expression\n"); exit(1); } parallelForeach($files, function($f) use ($pattern) { grepFile($pattern, $f); }); exit(0); } function grepFile($pattern, $file) { $fp = fopen($file, "rb"); if (!$fp) { fwrite(STDERR, "Cannot read $file\n"); return; } $binary = strpos(fread($fp, 1024), "\0") !== false; fseek($fp, 0); if ($binary) { if (preg_match($pattern, file_get_contents($file))) echo "$file: binary matches\n"; } else { while (false !== ($ln = fgets($fp))) { if (preg_match($pattern, $ln)) echo "$file:$ln"; } } fclose($fp); } function parallelForeach($arr, $func) { for ($proc_num = 0; $proc_num < PROCESSES_NUM; $proc_num++) { $pid = pcntl_fork(); if ($pid < 0) { fwrite(STDERR, "Cannot fork\n"); exit(1); } if ($pid == 0) break; } if ($pid) { for ($i = 0; $i < PROCESSES_NUM; $i++) { pcntl_wait($status); $exitcode = pcntl_wexitstatus($status); if ($exitcode) exit(1); } return; } $l = count($arr); for ($i = $proc_num; $i < $l; $i += PROCESSES_NUM) $func($arr[$i]); exit(0); }
Проверим работу нашего грепа на исходном коде PHP 5.3.10:
$ php ~/parallel-grep.php '^PHP_FUNCTION' | head ./ext/calendar/calendar.c:PHP_FUNCTION(cal_info) ./ext/calendar/calendar.c:PHP_FUNCTION(cal_days_in_month) ./ext/calendar/calendar.c:PHP_FUNCTION(cal_to_jd) ./ext/calendar/calendar.c:PHP_FUNCTION(cal_from_jd) ./ext/calendar/calendar.c:PHP_FUNCTION(jdtogregorian) ./ext/calendar/calendar.c:PHP_FUNCTION(gregoriantojd) ./ext/calendar/calendar.c:PHP_FUNCTION(jdtojulian) ./ext/calendar/calendar.c:PHP_FUNCTION(juliantojd) ./ext/calendar/calendar.c:PHP_FUNCTION(jdtojewish) ./ext/calendar/calendar.c:PHP_FUNCTION(jewishtojd) $ time php ~/parallel-grep.php '^PHP_FUNCTION' | wc -l 4056 real 0m2.073s user 0m3.265s sys 0m0.550s $ time grep -R '^PHP_FUNCTION' . | wc -l 4056 real 0m3.646s user 0m3.415s sys 0m0.209s $ time find . -type f -print0 | xargs -0 -P 2 grep '^PHP_FUNCTION' | wc -l 4056 real 0m1.895s user 0m3.247s sys 0m0.249s
Работает! Я описал один из часто используемых паттернов при параллельном программировании на PHP — параллельная обработка очереди из задач. Надеюсь, моя статья кому-нибудь поможет перестать бояться писать многопоточные приложения на PHP, если задача допускает такую декомпозицию, как в примере с грепом. Спасибо.
