Как стать автором
Обновить

PHP process manager

Время на прочтение8 мин
Количество просмотров20K
image

Всем привет!

На Хабре было много статей, о том как писать демоны на PHP и другие fork-нутые вещи. Хочу поделится с вами своими наработками на схожую, но все-таки несколько другую тему — управление несколькими PHP процессами.


Для начала небольшой словарь терминов, используемых в статье.
  • Job (работа) — задача, выполняемая в отдельном процессе. Наберите в консоли «php test.php» — вот вам job.
  • Job Manager или Process Manager — процесс, управляющий задачами. Собирает и обрабатывает их вывод и может посылать сообщения на ввод.


Цель поставленной задачи в том, чтобы иметь возможность влиять на уже запущенные и работающие процессы и получать информацию о ходе их выполнения.

Для запуска новых процессов я использую функцию proc_open, которая позволяет переопределять дескрипторы ввода/вывода для нового процесса. Для управления отдельным процессом был разработан класс Job. Работа характеризуется названием и выполняемой командой.

class Job {
    protected $_pid = 0;
    protected $_name;

    protected $_cmd = '';

    protected $_stderr = '/dev/null';

    private $_resource = NULL;
    private $_pipes = array();
    private $_waitpid = TRUE;

    public function __construct($cmd, $name = 'job') {
        $this->_cmd = $cmd;
        $this->_name = $name;
    }

    public function __destruct() {

        // ожидаем завершения процесса

        if ($this->_resource) {
            if ($this->_waitpid && $this->isRunning()) {
                echo "Waiting for job to complete ";

                $status = NULL;
                pcntl_waitpid($this->_pid, $status);
                
                /*while ($this->isRunning()) {
                    echo '.';
                    sleep(1);
                }*/
                echo "\n";
            }
        }

        // закрываем дескрипторы

        if (isset($this->_pipes) && is_array($this->_pipes)) {
            foreach (array_keys($this->_pipes) as $index ) {
                if (is_resource($this->_pipes[$index])) {
                    fflush($this->_pipes[$index]);
                    fclose($this->_pipes[$index]);
                    unset($this->_pipes[$index]);
                }
            }
        }

        // закрываем открытый хэндлер

        if ($this->_resource) {
            proc_close($this->_resource);
            unset($this->_resource);
        }
       
    }

    public function pid() {
        return $this->_pid;
    }

    public function name() {
        return $this->_name;
    }

    // функция чтения из "трубы". $nohup отвечает за блокирование при чтении
    private function readPipe($index, $nohup = FALSE) {
        if (!isset($this->_pipes[$index])) return FALSE;

        if (!is_resource($this->_pipes[$index]) || feof($this->_pipes[$index])) return FALSE;

        if ($nohup) {
            $data = '';
            while ($line = fgets($this->_pipes[$index])) {
                $data .= $line;
            }
            
            return $data;
        }

        while ($data = fgets($this->_pipes[$index])) {
            echo $data;
        }
    }

    public function pipeline($nohup = FALSE) {
        return $this->readPipe(1, $nohup);
    }

    public function stderr($nohup = FALSE) {
        return $this->readPipe(2, $nohup);
    }

    // запуск задачи в новом процессе
    public function execute() {
                // определяем откуда будет читать и куда писать процесс
        $descriptorspec = array(
            0 => array('pipe', 'r'),  // stdin
            1 => array('pipe', 'w'),  // stdout
            2 => array('pipe', 'w') // stderr 
        );


        $this->_resource = proc_open('exec '.$this->_cmd, $descriptorspec, $this->_pipes);

        // ставим неблокирующий режим всем дескрипторам
        stream_set_blocking($this->_pipes[0], 0); 
        stream_set_blocking($this->_pipes[1], 0);
        stream_set_blocking($this->_pipes[2], 0);

        if (!is_resource($this->_resource)) return FALSE;

        $proc_status     = proc_get_status($this->_resource);
        $this->_pid      = isset($proc_status['pid']) ? $proc_status['pid'] : 0;
    }

    public function getPipe() {
        return $this->_pipes[1];
    }

    public function getStderr() {
        return $this->_pipes[2];
    }

    public function isRunning() {
        if (!is_resource($this->_resource)) return FALSE;

        $proc_status = proc_get_status($this->_resource);
        return isset($proc_status['running']) && $proc_status['running'];
    }

    // посылка сигнала процессу
    public function signal($sig) {
        if (!$this->isRunning()) return FALSE;

        posix_kill($this->_pid, $sig);
    }

    // отправка сообщения в STDIN процесса
    public function message($msg) {
        if (!$this->isRunning()) return FALSE;

        fwrite($this->_pipes[0], $msg);     
    }
}


Для управления работами создан класс Job_Manager, который по сути является ключевым во всей схеме.

class Job_Manager {
    private $_pool_size = 20;
    private $_pool = array();
    private $_streams = array();
    private $_stderr = array();

    private $_is_terminated = FALSE;
    protected $_dispatch_function = NULL;

    public function __construct() {
        // init pool
        // 
    }

    public function __destruct() {
        // destroy pool
        foreach (array_keys($this->_pool) as $index) {
            $this->stopJob($index);
        }
    }

    // Проверяем статус запущенных задач
    private function checkJobs() {
        $running_jobs = 0;
        foreach ($this->_pool as $index => $job) {
            if (!$job->isRunning()) {
                echo "Stopping job ".$this->_pool[$index]->name()." ($index)" . PHP_EOL;
                $this->stopJob($index);
            } else {
                $running_jobs++;
            }
        }

        return $running_jobs;
    }

    private function getFreeIndex() {
        foreach ($this->_pool as $index => $job) {
            if (!isset($job)) return $index;
        }

        return count($this->_pool) < $this->_pool_size ? count($this->_pool) : -1;
    }

    // Запуск новой задачи
    public function startJob($cmd, $name = 'job') {
        // broadcast existing jobs
        $this->checkJobs();

        $free_pool_slots = $this->_pool_size - count($this->_pool);

        if ($free_pool_slots <= 0) {
            // output error "no free slots in the pool"
            return -1;
        }

        $free_slot_index = $this->getFreeIndex();
        if ($free_slot_index < 0) {
            return -1;
        }

        echo "Starting job $name ($free_slot_index)" . PHP_EOL;
        $this->_pool[$free_slot_index] = new Job($cmd, $name);
        $this->_pool[$free_slot_index]->execute();
        $this->_streams[$free_slot_index] = $this->_pool[$free_slot_index]->getPipe();
        $this->_stderr[$free_slot_index] = $this->_pool[$free_slot_index]->getStderr();

        return $free_slot_index;
    }

    public function stopJob($index) {
        if (!isset($this->_pool[$index]))
            return FALSE;
        
        unset($this->_streams[$index]);
        unset($this->_stderr[$index]);
        unset($this->_pool[$index]);
    }

    public function name($index) {
        if (!isset($this->_pool[$index]))
            return FALSE;

        return $this->_pool[$index]->name();
    }

    public function pipeline($index, $nohup = FALSE) {
        if (!isset($this->_pool[$index]))
            return FALSE;

        return $this->_pool[$index]->pipeline($nohup);
    }   

    public function stderr($index, $nohup = FALSE) {
        if (!isset($this->_pool[$index]))
            return FALSE;

        return $this->_pool[$index]->stderr($nohup);
    }

    private function broadcastMessage($msg) {
        // sends selected signal to all child processes
        foreach ($this->_pool as $pool_index => $job) {
            $job->message($msg);
        }
    }

    private function broadcastSignal($sig) {
        // sends selected signal to all child processes
        foreach ($this->_pool as $pool_index => $job) {
            $job->signal($sig);
        }
    }

    // если была зарегистрирована пользовательская функция разбора - используем ее
    protected function dispatch($cmd) {
        if (is_callable($this->_dispatch_function)) {
            call_user_func($this->_dispatch_function, $cmd);
        }
    }

    // регистрация пользовательской функции для разбора
    public function registerDispatch($callable) {
        if (is_callable($callable)) {
            $this->_dispatch_function = $callable;
        } else {
            trigger_error("$callable is not callable func", E_USER_WARNING);
        }
    }

    // разбираем пользовательский ввод
    private function dispatchMain($cmd) {
        $parts = explode(' ', $cmd);
        $arg = isset($parts[0]) ? $parts[0] : '';
        $val = isset($parts[1]) ? $parts[1] : '';
        switch ($arg) {
            case "exit": 
                $this->broadcastSignal(SIGTERM);
                $this->_is_terminated = TRUE;
                break;

            case "test":
                echo 'sending test' . PHP_EOL;
                $this->broadcastMessage('test');
                $this->broadcastSignal(SIGUSR1);
                break;
            case 'kill':
                $pool_index = $val !== '' && (int)$val >= 0 ? (int)$val : -1;
                if ($pool_index >= 0 && isset($this->_pool[$pool_index])) {
                    $this->_pool[$pool_index]->signal(SIGKILL);
                }
                break;
            default:
                $this->dispatch($cmd);
                break;
        }
        return FALSE;
    }

    public function process() {
        stream_set_blocking(STDIN, 0);

        $write = NULL;
        $except = NULL;
        while (!$this->_is_terminated) {
            /*
            из-за особенности функции stream_select приходится особым образом работать с массивами дескрипторов
            */
            $read = $this->_streams;
            $except = $this->_stderr;
            $read[$this->_pool_size] = STDIN;

            if (is_array($read) && count($read) > 0) {
                if (false === ($num_changed_streams = stream_select($read, $write, $except, 2))) {
                    // oops
                } elseif ($num_changed_streams > 0) {
                    // есть что почитать

                    if (is_array($read) && count($read) > 0) {
                        $cmp_array = $this->_streams;
                        $cmp_array[$this->_pool_size] = STDIN;
                        foreach ($read as $resource) {
                            $pool_index = array_search($resource, $cmp_array, TRUE);
                            if ($pool_index === FALSE) continue;
                            
                            if ($pool_index == $this->_pool_size) {
                                // stdin
                                $content = '';
                                while ($cmd = fgets(STDIN)) {
                                    if (!$cmd) break;
                                    $content .= $cmd;
                                }
                                $content = trim($content);
                                if ($content) {
                                    // если Process Manager словил на вход какую-то строчку - парсим и решаем что делать
                                    $this->dispatchMain($content);
                                }
                                //echo "stdin> " . $cmd;
                            } else {
                                // читаем сообщения процессов
                                $pool_content = $this->pipeline($pool_index, TRUE);
                                $job_name = $this->name($pool_index);

                                if ($pool_content) {
                                    echo $job_name ." ($pool_index)" . ': ' . $pool_content;
                                }

                                $pool_content = $this->stderr($pool_index, TRUE);
                                if ($pool_content) {
                                    echo $job_name ." ($pool_index)" . ' [STDERR]: ' . $pool_content;
                                }
                            }
                        }
                    }
                }
            }
            $this->checkJobs();
        }
    }

}


Управлять некоторыми абстрактными задачами мы уже научились, осталось реализовать класс для самих исполняемых процессов.

class Executable {
    protected $_is_terminated = FALSE;

    protected $_cleanup_function = NULL;

    public function __construct() {
        // выставляем обработчик сигналов
        pcntl_signal(SIGTERM, array('Executable', 'signalHandler'));
        pcntl_signal(SIGHUP, array('Executable', 'signalHandler'));
        pcntl_signal(SIGINT, array('Executable', 'signalHandler'));
        pcntl_signal(SIGUSR1, array('Executable', 'signalHandler'));
        pcntl_signal(SIGUSR2, array('Executable', 'signalHandler'));

        stream_set_blocking(STDIN, 0);
        stream_set_blocking(STDOUT, 0);
        stream_set_blocking(STDERR, 0);
    }

    public function __destruct() {
        //echo "destructor called in " . get_class($this) . PHP_EOL;
        if (!$this->_is_terminated) {
            $this->_is_terminated = TRUE;
            $this->isTerminated();
        }
    }


    // финальные обработчики - если пользователь пожелает
    private function cleanup() {
        if (is_callable($this->_cleanup_function)) {
            call_user_func($this->_cleanup_function);
        }
    }

    protected function registerCleanup($callable) {
        if (is_callable($callable)) {
            $this->_cleanup_function = $callable;
        } else {
            trigger_error("$callable is not callable func", E_USER_WARNING);
        }
    }

    protected function isTerminated() {
        pcntl_signal_dispatch();
        if ($this->_is_terminated) {
            $this->cleanup();
        }

        return $this->_is_terminated;
    }

    protected function dispatch($cmd) {
        // можно смело парсить входные данные
        /*
        switch ($cmd) {

        }
        */
    }

    protected function checkStdin() {
        $read = array(STDIN);
        $write = NULL;
        $except = NULL;

        if (is_array($read) && count($read) > 0) {
            if (false === ($num_changed_streams = stream_select($read, $write, $except, 2))) {
                // oops
            } elseif ($num_changed_streams > 0) {
                if (is_array($read) && count($read) > 0) {
                    // stdin
                    $content = '';
                    while ($cmd = fgets(STDIN)) {
                        if (!$cmd) break;
                        $content .= $cmd;
                    }
                    $this->dispatch($content);
                    echo "recieved $content";
                    //echo "stdin> " . $cmd;
                }
            }
        }

    }

    // Обработчик сигналов
    protected function signalHandler ($signo) {
        switch ($signo) {
            case SIGTERM:
            case SIGHUP:
            case SIGINT:
                $this->_is_terminated = TRUE;
                //echo "exiting in ".get_class($this)."...\n";
                break;
            case SIGUSR1:
                //echo "SIGUSR1 recieved\n";
                $this->checkStdin();
                break;
            case SIGUSR2:
                $this->_is_terminated = TRUE;
                echo "[SHUTDOWN] in " . get_class($this) . PHP_EOL;
                flush();
                exit(1);
                break;
            default:
                // handle all other signals
                break;
        }
    }
}



В качестве примера использования менеджера процессов реализуем «спящий» процесс — скрипт, который будет спать и отписываться по этому поводу в STDOUT

sleep.php
class SleeperTest extends Executable {

    public function sleep() {
        for($i = 0; !$this->isTerminated() && $i < 10; $i++) {
            ob_start();
            echo $i . "\n";
            ob_end_flush();
            sleep(5);
        }
    }
}


$s = new SleeperTest;
$s->sleep();


pm.php
$pm = new Job_Manager;

$pm->startJob('php sleep.php', 'sleeper1');
$pm->startJob('php sleep.php', 'sleeper2');

// 
$pm->process();


Используемые в реализации неблокирующие дескрипторы и функция stream_select позволяют избегать проблемы, типичной для разного рода демонов — высокая загрузка ЦПУ в холостом цикле. Предложенный метод лишен этого недостатка, все работает гладко и спокойно.

UPDATE. Выложил исходники классов на github https://github.com/xzag/php-pm
Теги:
Хабы:
+30
Комментарии53

Публикации

Истории

Работа

PHP программист
158 вакансий

Ближайшие события