Простая система демонов для Yii2

В данной статье постараюсь раскрыть основные нюансы реализации системы демонов для PHP и научить консольные команды Yii2 демонизироваться.

Последние 3 года я занимаюсь разработкой и развитием достаточно большого корпоративного портала для одной группы компаний. Я, какие и многие, столкнулся с проблемой, когда решение задачи, которую требует бизнес, не укладывается ни в какие таймауты. Сделайте отчетик в excel на 300 тыс. строк, отправьте рассылку на 1500 писем и так далее. Естественно, такие задачи должны решаться фоновыми заданиями, демонами и crontab-ами. В рамках статьи я не буду приводить сравнение кронов и демонов, мы для решения подобных задач выбрали демонов. При этом важным требованием для нас стала возможность иметь доступ ко всему, что уже написано для бэкенда, соответственно, демоны должны быть продолжением фрейворка Yii2. По этой же причине нам не подошли уже готовые решения типа phpDaemon.

Под катом готовое решение для реализации демонов на Yii2, которое у меня вышло.

Тема демонов на PHP поднимается с завидной регулярностью (раз, два, три, а ребята из badoo даже перезапускают их без потери соединений). Возможно мой велосипед быстрый способ запустить демоны на популярном фреймворке будет полезен.

Немного основ


Для того, чтобы процесс стал демоном, нужно:
  1. Отвязать скрипт от консоли и стандартных потоков ввода-вывода;
  2. Завернуть исполнения основного кода в бесконечный цикл;
  3. Реализовать механизмы контроля над процессом.

Отвязываемся от консоли

Для начала закрываем стандартные потоки STDIN, STOUT, STDERR. Но PHP без них не может, поэтому первый открытый поток он сделает стандартным, так что откроем их в /dev/null.

if (is_resource(STDIN)) {
   fclose(STDIN);
   $stdIn = fopen('/dev/null', 'r');
}
if (is_resource(STDOUT)) {
    fclose(STDOUT);
    $stdOut = fopen('/dev/null', 'ab');
}
if (is_resource(STDERR)) {
    fclose(STDERR);
    $stdErr = fopen('/dev/null', 'ab');
}

Далее форкаем процесс и делаем форк основным процессом. Процесс донор — завершаем.
$pid = pcntl_fork();
if ($pid == -1) {
   $this->halt(self::EXIT_CODE_ERROR, 'pcntl_fork() rise error');
} elseif ($pid) {
   $this->halt(self::EXIT_CODE_NORMAL);
} else {
   posix_setsid();
}

Бесконечный цикл и контроль

Я думаю, с циклом все понято. А вот необходимые механизмы контроля стоит рассмотреть подробнее.

Фиксация уже запущенных процессов

Тут все просто — после запуска демон кладет в файл со своим названием свой PID, а при завершении своей работы этот файл сносит.

Обработка POSIX сигналов

Демон должен корректно обрабатывать сигналы от операционной системы, т.е. при получении сигнала SIGTERM должен плавно завершать свою работу. Достигается это несколькими вещами: первое, определяем функцию, которая будет обрабатывать полученные сигналы:

pcntl_signal(SIGTERM, ['MyClassName', 'mySignalHandlerFunction']);

Второе, в функцию обработки сигналов ставим присвоение некоторому статическому свойству класса значение true.
static function signalHandler($signo, $pid = null, $status = null)
{
   self::$stopFlag = true;
}

Ну и третье, наш бесконечный цикл теперь должен быть не такой уж бесконечный:
while (!self::$stopFlag) {
   pcntl_signal_dispatch();
}

Особенности обработки сигналов в разных версиях PHP
В PHP < 5.3.0 для распределения сигналов использовалась специальная директива declare(ticks = N). Где тик — это событие, которое случается каждые N низкоуровневых операций, выполненных парсером внутри блока declare. Распределение сигналов осуществлялось в соответствии с настройкой. Слишком маленькое значение приводило к провалу в производительности, а слишком большое — к несвоевременной обработке сигналов.

В PHP >= 5.3.0 появилась функция pcntl_signal_dispatch(), которую можно вызывать для ручного распределения сигналов, что мы и делаем после каждой итерации.
Ну и наконец, в PHP 7.1 станет доступно асинхронное распределение сигналов, что позволит почти мгновенно получать сигналы без оверхеда и ручного вызова функций.

Теперь при получении команды от операционной системы скрипт спокойно завершит текущую итерацию и выйдет из цикла.

Контроль за утечками памяти

К сожалению, если демон долго трудится без перезапуска — у него начинает течь память. Интенсивность утечки зависит от того, какие функции вы используете. Из нашей практики — наиболее сильно «текли» демоны, которые работают с удаленными SOAP-сервисами через стандартный класс SoapClient. Так что за этим нужно следить и периодически их перезапускать. Дополним наш цикл условием контроля за утечками:

while (!self::$stopFlag) {
   if (memory_get_usage() > $this->memoryLimit) {
      break;
   }
   pcntl_signal_dispatch();
}

Где же код для Yii?


Исходники выложены на Github — yii2-daemon, пакет также доступен для установки через composer.

Пакет состоит всего из 2-х абстрактных классов — базовый класс DaemonController и класс WatcherDaemonController.

DaemonController
<?php

namespace vyants\daemon;

use yii\base\NotSupportedException;
use yii\console\Controller;
use yii\helpers\Console;

/**
 * Class DaemonController
 *
 * @author Vladimir Yants <vladimir.yants@gmail.com>
 */
abstract class DaemonController extends Controller
{

    const EVENT_BEFORE_JOB = "beforeJob";
    const EVENT_AFTER_JOB = "afterJob";

    const EVENT_BEFORE_ITERATION = "beforeIteration";
    const EVENT_AFTER_ITERATION = "afterIteration";

    /**
     * @var $demonize boolean Run controller as Daemon
     * @default false
     */
    public $demonize = false;

    /**
     * @var $isMultiInstance boolean allow daemon create a few instances
     * @see $maxChildProcesses
     * @default false
     */
    public $isMultiInstance = false;

    /**
     * @var $parentPID int main procces pid
     */
    protected $parentPID;

    /**
     * @var $maxChildProcesses int max daemon instances
     * @default 10
     */
    public $maxChildProcesses = 10;

    /**
     * @var $currentJobs [] array of running instances
     */
    protected static $currentJobs = [];

    /**
     * @var int Memory limit for daemon, must bee less than php memory_limit
     * @default 32M
     */
    protected $memoryLimit = 268435456;

    /**
     * @var boolean used for soft daemon stop, set 1 to stop
     */
    private static $stopFlag = false;

    /**
     * @var int Delay between task list checking
     * @default 5sec
     */
    protected $sleep = 5;

    protected $pidDir = "@runtime/daemons/pids";

    protected $logDir = "@runtime/daemons/logs";

    private $stdIn;
    private $stdOut;
    private $stdErr;

    /**
     * Init function
     */
    public function init()
    {
        parent::init();

        //set PCNTL signal handlers
        pcntl_signal(SIGTERM, ['vyants\daemon\DaemonController', 'signalHandler']);
        pcntl_signal(SIGINT, ['vyants\daemon\DaemonController', 'signalHandler']);
        pcntl_signal(SIGHUP, ['vyants\daemon\DaemonController', 'signalHandler']);
        pcntl_signal(SIGUSR1, ['vyants\daemon\DaemonController', 'signalHandler']);
        pcntl_signal(SIGCHLD, ['vyants\daemon\DaemonController', 'signalHandler']);
    }

    function __destruct()
    {
        $this->deletePid();
    }

    /**
     * Adjusting logger. You can override it.
     */
    protected function initLogger()
    {
        $targets = \Yii::$app->getLog()->targets;
        foreach ($targets as $name => $target) {
            $target->enabled = false;
        }
        $config = [
            'levels' => ['error', 'warning', 'trace', 'info'],
            'logFile' => \Yii::getAlias($this->logDir) . DIRECTORY_SEPARATOR . $this->getProcessName() . '.log',
            'logVars' => [],
            'except' => [
                'yii\db\*', // Don't include messages from db
            ],
        ];
        $targets['daemon'] = new \yii\log\FileTarget($config);
        \Yii::$app->getLog()->targets = $targets;
        \Yii::$app->getLog()->init();
    }

    /**
     * Daemon worker body
     *
     * @param $job
     *
     * @return boolean
     */
    abstract protected function doJob($job);

    /**
     * Base action, you can\t override or create another actions
     * @return bool
     * @throws NotSupportedException
     */
    final public function actionIndex()
    {
        if ($this->demonize) {
            $pid = pcntl_fork();
            if ($pid == -1) {
                $this->halt(self::EXIT_CODE_ERROR, 'pcntl_fork() rise error');
            } elseif ($pid) {
                $this->cleanLog();
                $this->halt(self::EXIT_CODE_NORMAL);
            } else {
                posix_setsid();
                $this->closeStdStreams();
            }
        }
        $this->changeProcessName();

        //run loop
        return $this->loop();
    }

    /**
     * Set new process name
     */
    protected function changeProcessName()
    {
        //rename process
        if (version_compare(PHP_VERSION, '5.5.0') >= 0) {
            cli_set_process_title($this->getProcessName());
        } else {
            if (function_exists('setproctitle')) {
                setproctitle($this->getProcessName());
            } else {
                \Yii::error('Can\'t find cli_set_process_title or setproctitle function');
            }
        }
    }

    /**
     * Close std streams and open to /dev/null
     * need some class properties
     */
    protected function closeStdStreams()
    {
        if (is_resource(STDIN)) {
            fclose(STDIN);
            $this->stdIn = fopen('/dev/null', 'r');
        }
        if (is_resource(STDOUT)) {
            fclose(STDOUT);
            $this->stdOut = fopen('/dev/null', 'ab');
        }
        if (is_resource(STDERR)) {
            fclose(STDERR);
            $this->stdErr = fopen('/dev/null', 'ab');
        }
    }

    /**
     * Prevent non index action running
     *
     * @param \yii\base\Action $action
     *
     * @return bool
     * @throws NotSupportedException
     */
    public function beforeAction($action)
    {
        if (parent::beforeAction($action)) {
            $this->initLogger();
            if ($action->id != "index") {
                throw new NotSupportedException(
                    "Only index action allowed in daemons. So, don't create and call another"
                );
            }

            return true;
        } else {
            return false;
        }
    }

    /**
     * Возвращает доступные опции
     *
     * @param string $actionID
     *
     * @return array
     */
    public function options($actionID)
    {
        return [
            'demonize',
            'taskLimit',
            'isMultiInstance',
            'maxChildProcesses',
        ];
    }

    /**
     * Extract current unprocessed jobs
     * You can extract jobs from DB (DataProvider will be great), queue managers (ZMQ, RabbiMQ etc), redis and so on
     *
     * @return array with jobs
     */
    abstract protected function defineJobs();

    /**
     * Fetch one task from array of tasks
     *
     * @param Array
     *
     * @return mixed one task
     */
    protected function defineJobExtractor(&$jobs)
    {
        return array_shift($jobs);
    }

    /**
     * Main Loop
     *
     * * @return boolean 0|1
     */
    final private function loop()
    {
        if (file_put_contents($this->getPidPath(), getmypid())) {
            $this->parentPID = getmypid();
            \Yii::trace('Daemon ' . $this->getProcessName() . ' pid ' . getmypid() . ' started.');
            while (!self::$stopFlag) {
                if (memory_get_usage() > $this->memoryLimit) {
                    \Yii::trace('Daemon ' . $this->getProcessName() . ' pid ' .
                        getmypid() . ' used ' . memory_get_usage() . ' bytes on ' . $this->memoryLimit .
                        ' bytes allowed by memory limit');
                    break;
                }
                $this->trigger(self::EVENT_BEFORE_ITERATION);
                $this->renewConnections();
                $jobs = $this->defineJobs();
                if ($jobs && !empty($jobs)) {
                    while (($job = $this->defineJobExtractor($jobs)) !== null) {
                        //if no free workers, wait
                        if ($this->isMultiInstance && (count(static::$currentJobs) >= $this->maxChildProcesses)) {
                            \Yii::trace('Reached maximum number of child processes. Waiting...');
                            while (count(static::$currentJobs) >= $this->maxChildProcesses) {
                                sleep(1);
                                pcntl_signal_dispatch();
                            }
                            \Yii::trace(
                                'Free workers found: ' .
                                ($this->maxChildProcesses - count(static::$currentJobs)) .
                                ' worker(s). Delegate tasks.'
                            );
                        }
                        pcntl_signal_dispatch();
                        $this->runDaemon($job);
                    }
                } else {
                    sleep($this->sleep);
                }
                pcntl_signal_dispatch();
                $this->trigger(self::EVENT_AFTER_ITERATION);
            }

            \Yii::info('Daemon ' . $this->getProcessName() . ' pid ' . getmypid() . ' is stopped.');

            return self::EXIT_CODE_NORMAL;
        }
        $this->halt(self::EXIT_CODE_ERROR, 'Can\'t create pid file ' . $this->getPidPath());
    }

    /**
     * Delete pid file
     */
    protected function deletePid()
    {
        $pid = $this->getPidPath();
        if (file_exists($pid)) {
            if (file_get_contents($pid) == getmypid()) {
                unlink($this->getPidPath());
            }
        } else {
            \Yii::error('Can\'t unlink pid file ' . $this->getPidPath());
        }
    }

    /**
     * PCNTL signals handler
     *
     * @param $signo
     * @param null $pid
     * @param null $status
     */
    final static function signalHandler($signo, $pid = null, $status = null)
    {
        switch ($signo) {
            case SIGINT:
            case SIGTERM:
                //shutdown
                self::$stopFlag = true;
                break;
            case SIGHUP:
                //restart, not implemented
                break;
            case SIGUSR1:
                //user signal, not implemented
                break;
            case SIGCHLD:
                if (!$pid) {
                    $pid = pcntl_waitpid(-1, $status, WNOHANG);
                }
                while ($pid > 0) {
                    if ($pid && isset(static::$currentJobs[$pid])) {
                        unset(static::$currentJobs[$pid]);
                    }
                    $pid = pcntl_waitpid(-1, $status, WNOHANG);
                }
                break;
        }
    }

    /**
     * Tasks runner
     *
     * @param string $job
     *
     * @return boolean
     */
    final public function runDaemon($job)
    {
        if ($this->isMultiInstance) {
            $this->flushLog();
            $pid = pcntl_fork();
            if ($pid == -1) {
                return false;
            } elseif ($pid !== 0) {
                static::$currentJobs[$pid] = true;

                return true;
            } else {
                $this->cleanLog();
                $this->renewConnections();
                //child process must die
                $this->trigger(self::EVENT_BEFORE_JOB);
                $status = $this->doJob($job);
                $this->trigger(self::EVENT_AFTER_JOB);
                if ($status) {
                    $this->halt(self::EXIT_CODE_NORMAL);
                } else {
                    $this->halt(self::EXIT_CODE_ERROR, 'Child process #' . $pid . ' return error.');
                }
            }
        } else {
            $this->trigger(self::EVENT_BEFORE_JOB);
            $status = $this->doJob($job);
            $this->trigger(self::EVENT_AFTER_JOB);

            return $status;
        }
    }

    /**
     * Stop process and show or write message
     *
     * @param $code int -1|0|1
     * @param $message string
     */
    protected function halt($code, $message = null)
    {
        if ($message !== null) {
            if ($code == self::EXIT_CODE_ERROR) {
                \Yii::error($message);
                if (!$this->demonize) {
                    $message = Console::ansiFormat($message, [Console::FG_RED]);
                }
            } else {
                \Yii::trace($message);
            }
            if (!$this->demonize) {
                $this->writeConsole($message);
            }
        }
        if ($code !== -1) {
            \Yii::$app->end($code);
        }
    }

    /**
     * Renew connections
     * @throws \yii\base\InvalidConfigException
     * @throws \yii\db\Exception
     */
    protected function renewConnections()
    {
        if (isset(\Yii::$app->db)) {
            \Yii::$app->db->close();
            \Yii::$app->db->open();
        }
    }

    /**
     * Show message in console
     *
     * @param $message
     */
    private function writeConsole($message)
    {
        $out = Console::ansiFormat('[' . date('d.m.Y H:i:s') . '] ', [Console::BOLD]);
        $this->stdout($out . $message . "\n");
    }

    /**
     * @param string $daemon
     *
     * @return string
     */
    public function getPidPath($daemon = null)
    {
        $dir = \Yii::getAlias($this->pidDir);
        if (!file_exists($dir)) {
            mkdir($dir, 0744, true);
        }
        $daemon = $this->getProcessName($daemon);

        return $dir . DIRECTORY_SEPARATOR . $daemon;
    }

    /**
     * @return string
     */
    public function getProcessName($route = null)
    {
        if (is_null($route)) {
            $route = \Yii::$app->requestedRoute;
        }

        return str_replace(['/index', '/'], ['', '.'], $route);
    }

    /**
     *  If in daemon mode - no write to console
     *
     * @param string $string
     *
     * @return bool|int
     */
    public function stdout($string)
    {
        if (!$this->demonize && is_resource(STDOUT)) {
            return parent::stdout($string);
        } else {
            return false;
        }
    }

    /**
     * If in daemon mode - no write to console
     *
     * @param string $string
     *
     * @return int
     */
    public function stderr($string)
    {
        if (!$this->demonize && is_resource(\STDERR)) {
            return parent::stderr($string);
        } else {
            return false;
        }
    }

    /**
     * Empty log queue
     */
    protected function cleanLog()
    {
        \Yii::$app->log->logger->messages = [];
    }

    /**
     * Empty log queue
     */
    protected function flushLog($final = false)
    {
        \Yii::$app->log->logger->flush($final);
    }
}


WatcherDaemonController
<?php

namespace vyants\daemon\controllers;

use vyants\daemon\DaemonController;

/**
 * watcher-daemon - check another daemons and run it if need
 *
 * @author Vladimir Yants <vladimir.yants@gmail.com>
 */
abstract class WatcherDaemonController extends DaemonController
{
    /**
     * @var string subfolder in console/controllers
     */
    public $daemonFolder = 'daemons';

    /**
     * @var boolean flag for first iteration
     */
    protected $firstIteration = true;

    /**
     * Prevent double start
     */
    public function init()
    {
        $pid_file = $this->getPidPath();
        if (file_exists($pid_file) && ($pid = file_get_contents($pid_file)) && file_exists("/proc/$pid")) {
            $this->halt(self::EXIT_CODE_ERROR, 'Another Watcher is already running.');
        }
        parent::init();
    }

    /**
     * Job processing body
     *
     * @param $job array
     *
     * @return boolean
     */
    protected function doJob($job)
    {
        $pid_file = $this->getPidPath($job['daemon']);

        \Yii::trace('Check daemon ' . $job['daemon']);
        if (file_exists($pid_file)) {
            $pid = file_get_contents($pid_file);
            if ($this->isProcessRunning($pid)) {
                if ($job['enabled']) {
                    \Yii::trace('Daemon ' . $job['daemon'] . ' running and working fine');

                    return true;
                } else {
                    \Yii::warning('Daemon ' . $job['daemon'] . ' running, but disabled in config. Send SIGTERM signal.');
                    if (isset($job['hardKill']) && $job['hardKill']) {
                        posix_kill($pid, SIGKILL);
                    } else {
                        posix_kill($pid, SIGTERM);
                    }

                    return true;
                }
            }
        }
        \Yii::error('Daemon pid not found.');
        if ($job['enabled']) {
            \Yii::trace('Try to run daemon ' . $job['daemon'] . '.');
            $command_name = $job['daemon'] . DIRECTORY_SEPARATOR . 'index';
            //flush log before fork
            $this->flushLog(true);
            //run daemon
            $pid = pcntl_fork();
            if ($pid === -1) {
                $this->halt(self::EXIT_CODE_ERROR, 'pcntl_fork() returned error');
            } elseif ($pid === 0) {
                $this->cleanLog();
                \Yii::$app->requestedRoute = $command_name;
                \Yii::$app->runAction("$command_name", ['demonize' => 1]);
                $this->halt(0);
            } else {
                $this->initLogger();
                \Yii::trace('Daemon ' . $job['daemon'] . ' is running with pid ' . $pid);
            }
        }
        \Yii::trace('Daemon ' . $job['daemon'] . ' is checked.');

        return true;
    }

    /**
     * @return array
     */
    protected function defineJobs()
    {
        if ($this->firstIteration) {
            $this->firstIteration = false;
        } else {
            sleep($this->sleep);
        }

        return $this->getDaemonsList();
    }

    /**
     * Daemons for check. Better way - get it from database
     * [
     *  ['daemon' => 'one-daemon', 'enabled' => true]
     *  ...
     *  ['daemon' => 'another-daemon', 'enabled' => false]
     * ]
     * @return array
     */
    abstract protected function getDaemonsList();

    /**
     * @param $pid
     *
     * @return bool
     */
    public function isProcessRunning($pid)
    {
        return file_exists("/proc/$pid");
    }
}


DaemonController

Это родительский класс для всех демонов. Вот минимальный пример демона:

<?php

namespace console\controllers\daemons;

use vyants\daemon\DaemonController;

class TestController extends DaemonController
{
    /**
     * @param $job
     *
     * @return boolean
     */
    protected function doJob($job)
    {
        //do some job
        return true;
    }

    /**
     * @return array
     */
    protected function defineJobs()
    {
       return [];
    }
}

Функция defineJobs() должна возвращать набор задач для выполнения. По-умолчанию ожидается, что она будет возвращать массив. Если вы хотите возвращать, скажем MongoCursor, потребуется еще переопределить defineJobExtractor(). Функция doJob() должна получать на вход одну задачу для выполнения, производить с ней необходимые операции и помечать данную задачу в источнике как отработанную, чтобы она не упала второй раз.

Возможные параметры и настройки:
  • demonize — данный параметр определяет будет ли скрипт демонизироваться или работать как консольное приложение. Параметр доступен для задания из консоли: --demonize=1
  • isMultiInstance и maxChildProcesses — определяет можно ли демону создавать свои собственные копии и какое их максимальное количество может одновременно работать. Данная функция позволяет выполнять несколько задач параллельно. doJob будет выполняться в дочерних процессах, а родительский процесс будет только делегировать задачи своим потомкам и следить, чтобы их количество не превышало допустимый максимум. Весьма полезно, если ресурсов сервера хватает для того, чтобы выполнять несколько достаточно продолжительных по времени задач. По-умолчанию такое поведение выключено. Параметры так же доступны из консоли: --isMultiInstance=1 --maxChildProcesses=2
  • memoryLimit — порог потребления демоном памяти, если демон в режиме ожидания превысит данный порог, то он благородно совершить сиппоку. Как уже было обозначено ранее, для уменьшения размера потребляемой демонами памяти в результате утечек.
  • sleep — время в секундах, на которое демон будет засыпать между проверками наличия задач. Демон отправится спать только если defineJob вернет empty и пока есть задачи демон спать не будет. Поэтому defineJobs не должна возвращать статический список задач, иначе демон будет молотить их без конца и отдыха.
  • pidDir и logDir — пути для хранения логов и pid-ов, поддерживают алиасы Yii. По-умолчанию "@runtime/daemons/pids" и "@runtime/daemons/logs"


Проблема потери соединений

При осуществлении операции fork() установленные в родительском процессе соединения перестают работать в дочерних процессах. Для того, чтобы избежать этой проблемы, после всех форков проставлен вызов функции renewConnections(). По-умолчанию, данная функция переподключает только Yii::$app->db, но вы можете переопределить ее, и добавить прочие источники, соединение с которыми нужно поддерживать в дочерних процессах.

Логгирование

Демоны перенастраивают стандартный логгер Yii под себя. Если вас не устраивает поведение по-умолчанию — переопределите функцию initLogger().

WatcherDaemonController

Это почти готовый демон-наблюдатель. Задача данного демона следить за другими демонами, запускать и останавливать их при необходимости. Он не может стартовать дважды, поэтому можно смело поставить его запуск в crontab. Для того, чтобы начать его использовать, нужно в console/controllers создать папку daemons и положить класс вида:

<?php

namespace console\controllers\daemons;

use vyants\daemon\controllers\WatcherDaemonController;

/**
 * Class WatcherController
 */
class WatcherController extends WatcherDaemonController
{

    protected $sleep = 10;

    /**
     * @return array
     */
    protected function getDaemonsList()
    {
        return [
            ['daemon' => 'daemons/test', 'enabled' => true]
        ];
    }
}

Требуется определить лишь одну функцию — getDaemonsList(), которая вернет список демонов за которыми нужно следить. В самом простом виде — это зашитый в код массив, но в таком случае вы не будете иметь возможности менять список «на лету». Положите список демонов в базу или отдельный файлик и получайте его каждый раз оттуда. В таком случае, watcher сможет включить или выключить демон без собственного перезапуска.

Заключение


В данный момент у нас более 50-ти демонов, выполняющих разнообразные задачи, начиная от отправки почтовых сообщений и заканчивая генерацией отчетов и актуализацией данных между разными системами.

Демоны работают с разными источниками задач — MySQL, RabbitMQ и даже удаленными веб-сервисами. Полет нормальный.
Безусловно, демоны на php не сравнятся с теми же демонами на Go. Но высокая скорость разработки, возможность повторного использования уже написанного кода и отсутствие необходимости учить команду другому языку перевешивают минусы.
Share post

Comments 40

    +4
    а чем не угодил супервизор и обычные команды yii-шные?
      0
      «Не угодили» не совсем та фраза. Это хорошее решение. Но я бы не сказал, что это сильно проще — консольный команды для правильной работы с супервизором все равно нужно «допилить» — ну как минимум они должны хэндлить PCNTLсигналы и корректно завершать текущую задачу.
      Для нас была важна потребность в параллельной обработке задач одним демоном. Да, супервизор может запускать несколько инстансов одной команды, но они всегда будут висеть в памяти (тут тоже можно поспорить, конечно, что лучше — лишний процесс в памяти или форк при необходимости), к тому же нужно будет решать проблему корректного распределения задач между ними (а у нас далеко не всегда есть возможность использовать нормальный менеджер очередей).

        +1
        вам конечно в вашем проекте видней, но с моей колокольни прикрутить банальную очередь на редисе вообще не проблема, если к роликом или любым другим менеджером не хочется напрягаться. Демоны на то и демоны, что висят в памяти и переодически опрашивают очередь на наличие задач.
        та даже очередь через бд не проблема прикрутить.
          0
          В случае очереди на БД, если 2 инстанса одной консольной команды одновременно потянутся за задачами есть шанс, что оба получать одну и ту же задачу и она выполниться дважды. На Redis-е этого можно избежать, ну а тот же RabbitMQ этой проблемы лишен, задача дойдет только одному консумеру. Я поэтому и написал, что если нет возможности использовать нормальный менеджер очередей — эту проблему придется решать.
          У нас часть задач в очередях на БД, часть в RabbitMQ, не буду вдаваться в подробности, так было необходимо, поэтому для нас проблема актуальна.
          Вы, конечно, правы, для этих целей можно использовать и супервизор. Даже кронами можно обойтись. Наше решение — всего лишь еще один способ решения задачи.
            0
            есть у нас и пару воркеров на очереди в бд, делается это так:
            воркер делает запрос
            update queueTable set handled_by=«some unique id», begin_at=NOW() where handled_by is null limit=10;

            после этого селектит таски из по своему айдишнику.

            зы. если возможны падения независимые от вас, можно поставить еще воркер надзиратель, который обнуляет handled_by по истечению таймаута от begin_at
            ззы. и это все еще проще чем ваше решение
            зззы. но в целом не вижу смысла делать очереди на бд, когда есть редис… разве что система крутится на шаред хостинге, где только бд и доступна
              0
              В случае очереди на БД, если 2 инстанса одной...

              Зависит от уровня изоляции, если поставить serialized и в транзакции ставить метку в строку, то один запрос отвалится с retry transaction
                0
                Очереди в бд — не лучшее решение, но может пригодиться упрощенный псевдокод (применим к MySQL):
                // лучше иметь отдельный коннект для очереди
                $db->query('SET SESSION TRANSACTION ISOLATION LEVEL SERIALIZABLE');
                $db->beginTransaction();
                
                // время, когда считаем lock устаревшим
                $locktimeout = '2001-01-01 00:00:00';
                
                try{
                 $row = $db->getRow('SELECT * from `queue` WHERE `lock` IS NULL OR `lock` < "'.$locktimeout.'" LIMIT 1');
                 $db->update('queue' , ['lock' => date('Y-m-d H:i:s')], 'where `id`='.$row['id']);
                 $db->commit();
                }catch (Exception $e){
                    $db->rollBack();
                    // deadlock, try restart transaction, как ожидаемое поведение MySQL
                    if($e->getCode() = 1213){
                        // принимаем меры
                    }
                }
                

                  0
                  Да, очереди в БД это решение пока нет большой нагрузки.
                  Можно ещё использовать вариант который был на одном проекте — выбирать задачи с pk кратным чему-то. Досталось мне в наследство, впоследствии переделал на нормальный вариант с брокером сообщений.
                    0
                    Хм, а SELECT FOR UPDATE чем плох?
                      +1
                      Процесс 1 будет ждать, чтобы потом узнать что эту задачу уже выполнил процесс 2. И это вместо того чтобы процесс 1 делал какую-то полезную работу.
                        0
                        Хм, если предположить что выполнение задачи >> выборки её из очереди, то делаем
                        1. SELECT FOR UPDATE,
                        2. Ставим лок на эту запись через UPDATE,
                        3. Долго выполняем
                        Если два процесса одновременно делают запрос на выборку задачи второй будет ожидать UPDATE первого (N ms)
                          0
                          Ну да, я про это и говорю. Это плохо увязывается с масштабированием нагрузки. Если выбирать случайную запись по статусу, то процессы постоянно будут хватать одни и те же записи и ничего не делать. Допустим в supervisord прописано 40 воркеров, из которых 10-20 ждёт освобождения лока. А если наш стартап идёт в гору, количество задач растёт, увеличиваем количество воркеров, они не влазят на одну машину, разносим на разные. И там уже будут сплошные блокировки.
                          Пока задачи не очень критичные и их мало, такой вариант годится. Если задач много — то уже не особо.
                            0
                            потому сначала пометка что задача взята в работу со сбросом этого статуса по истечению таймаута
                            после выполнения — пометка, что задача сделана
                              0
                              Проверено — будут постоянно брать одну и ту же всё равно. Один процесс уже взял, закоммитить не успел, и тут же второй берёт.
                                0
                                он не берет. он делает первым же запросом апдейт N записей.
                                притом помечает их какимто уникальным айдишником. допустим так:

                                $uniqueId = uniqueid('w_', true);
                                $this->db->query(«update {$this-tableName} set locked_by='{$uniqueId}', locked_at=NOW(), status='in progress' where locked_by is null limit 10»);


                                а теперь мы уже спокойно можем сделать селект по $uniqueId обработать свои ивенты, а по завершению либо поставить статус complete либо вообще удалить записи, если история не нужна. Если не нужна история, то поле статуса тоже не нужно, достаточно locked_by

                                зы. дополнительно нужен крон, который будет раз в N минут сбрасывать лок с записей, если locked_at был слишком давно и статус ин прогресс
                                  0
                                  Ну в общем — на первое время и пока нет нагрузки.
                                    0
                                    чем помешает нагрузка? я не спорю, что очередь в бд это костыль? я о том, что если нет возможности/желания использовать редис или нормальный броке, то бд вполне вариант
                                  0
                                  В том варианте, что я предложил: второй воркер, если возьмет ту же задачу, что и первый — не сможет повесить на нее лок, соответственно возьмет следующую по списку
                          0
                          Скорее решение не «пока нет большой нагрузки», а пока нет сотен тысяч заданий.
                      0

                      Безусловно, RabbitMQ (и даже Redis) — удобнее и быстрее. Но и с БД тоже можно поработать до определенных нагрузок.


                      Если в качестве хранилища использовать MySQL, то можно воспользоваться блокировками через GET_LOCK и RELEASE_LOCK.


                      -- Ждем как освободится блокировка и ставим ее
                      SELECT GET_LOCK('queue_pop', -1);
                      -- Читаем задачу из очереди
                      SELECT * FROM queue WHERE started_at IS NULL ORDER BY id LIMIT 1;
                      -- Если задача найдена, переводим ее в статус обработки
                      UPDATE queue SET started_at = NOW() WHERE id = :id;
                      -- Снимаем блокировку
                      SELECT RELEASE_LOCK('queue_pop');
                      -- Обрабатываем задачу если она получена

                      Так между воркерами синхронизируется только момент получения задачи из очереди, а сама обработка останется асинхронной.


                      Подобные блокировки еще есть в Postgres и Oracle, а в Yii2 есть обертки для них:


                0
                What is the reason for performing a double fork when creating a daemon?
                http://stackoverflow.com/questions/881388/what-is-the-reason-for-performing-a-double-fork-when-creating-a-daemon

                  0
                  posix_setsid() делает дочерний процесс лидером сессии и необходимость во втором форке пропадает.
                  0
                  pcntl хорошо работает только для самых простых приложений. Но в реальности крайне неприятно следить за конекшенами, перезапускать демоны, следить вообще за ними и распределять нагрузку. Еще из возможных неприятных проблем это то что syntax error or fatal error никак не хендлится. Ну поесть если пришила такая задача на которой приложение упало (причем желательно сразу), то через какое время будет лежать все. Медленно текущая память тоже не сильно принято, когда утекает сначала по 10-20 мб а потом это выливается в 2Гб демон который еле работает из постоянной попытки собрать мусор и выделить новую память.
                  На порядок проще и стабильно работает связка: демон очередей (rabbit/gearman/redis pub/sub etc.) + supervizor.
                    +1
                    Спасибо за комментарий. Видно, что Вы лично прошлись по этим граблям :)
                    У нас сейчас работает 51 демон. Все работают с разным задачами, и многие из них весьма нетривиальны. Я бы не сказал что это «простое приложение». Проблему с коннекшенами мы решили и больше с ней не сталкиваемся. Руками мы ничего не завершаем и не перезапускаем, все делает Watcher, а команды ему мы даем из веб-интерфейса.
                    Поясните, пожалуйста, какая именно проблема с распределением нагрузки у Вас возникала?
                    По поводу перехвата ошибок, да проблема была в 5.6 (хотя некоторую часть неперехватываемых ошибок удавалось обрабатывать при помощи register_shutdown_function), но в 7-ке проблема уже не так актуальна. Если при выполнении задачи возникла ошибка, мы помечаем задачу, а дальше логи и newrelic, отлавливаем и фиксим.
                    Проблема с утечками тоже учтена.
                      +2
                      Проблема с конекшенами, это не только открыть — но и закрыть. Что иногда даже важнее. Любой зависший, упавший но не закрывший осенние варке будет держать соединение вечно — пока его не перезапустишь. И в какой-то момент можно наблюдать по 10-20к соединений к memcached / mongodb cluster, после которого для начала сразишься тюнить систему разрешая больше соединений, больше открытых файлов. Но в конце концов это работает плохо.

                      Касательно отлова ошибок: shutdown_function не вызовется когда варвар сожрал всю оперативку и упал к примеру или unrecoverable fatal. И это крайне неприятно, потому что и поместить задачу ты не можешь. Она просто не доходит до конца.

                      После долгой борьбы мы пришли к более простому понимаю что такое воркер: воркер это независимая команда. А если она независимая то нет разницы запускать через fork or php -f. Но с другой стороны запуская через php -f, ты получаешь все бонусы 100% независимой команды, все соединия открываются и закрываются самим php, нет зависимости от предидущих глобальных переменных и значений, этот подход позволяет утилизировать память по максимуму.

                      Benefits:
                      — ловля всех ошибок!
                      — потребление памяти воркерами сократилось в 10 раз
                      — срочность процессинга выросла в 3.5 раза (из-за того что меньше оперативки можно отключить GC)
                      — простота разработки
                      — простота connection management
                      — не течетет блин вообще никак (сам manager жрет 8Mb оперативки и живет месяцами)
                      — и как приятный бонус не нужно ставить pcntl ;)
                      Drawbacks:
                      — дополнительные 20 мс на запуск внешнего интерпретатора
                    0
                    В какой версии PHP наблюдаются утечки? PHP7 пробовали?
                      +1
                      Во всех. Мы перешли на PHP 7 почти сразу после релиза, на нем получше, но от утечек никуда не денешься. Сильнее течет память в extensions.
                      Даже fpm воркеры рекомендуется перезапускать (pm.max_requests)
                        +1
                        У меня есть простенький демон на silex, работает месяцами, стучит курлом к внешнему ресурсу, собирает данные и обновляет некий срез. Стабильно работает месяцами, иногда я его перезапускаю «на всякий случай», но утечек за ним я не наблюдал. и там точно не php 7. Полагаю, что ваши утечки – это проблемы из вашего же кода/фреймворка. Просто Yii не выглядит чем-то не «рожденным чтобы умереть».
                          0
                          думаю рекомендуется их перезапускать от греха и кривых рук програмистов подальше. вообще у нас используются воркеры на базе команд и супервизора, с утечками не сталкивались.
                          зы. работает около 700-800 разных воркеров по 1-100 инстансов
                          ззы. еще первая yii
                            0
                            Кажется утечки как раз и дело «кривых рук».
                            Я бы не сказал, что они нас сильно беспокоят. За все время сильно утекала память при только использовании SoapClient, но после 7.0.5 проблему больше не замечали. Защита от утечек довольно простая, так что лучше подстраховаться и не ждать, пока нарвешься на какую-то сильно «подтекающую» функцию.
                              0
                              безусловно, если можно подстраховаться, то стоит это делать) я лишь имел ввиду что перезапуск воркеров это не необходимое условие для их нормальной работы
                        0
                        У меня утечка памяти была как то при работе с Imagine. Помог ручной вызов gc_collect_cycles().
                          0
                          Вообще интересное решение, но как-то не в идеологии php в целом. У нас все подобные задачи решаются кроном/другими языками.
                            +1
                            PHP давно перестал быть шаблонизатором.
                              +1
                              но пока всеже умирает. это не плохо, но не очень согласуется с долгоживущими концепциями)
                                0
                                Сейчас уже программист решает, «умирает» или «не умирает».
                                Время, когда такого выбора не было, давно прошло.
                            +1
                            в проекте аналогичные задачи, но обошлись классикой beanstalk+worker все это дело крутится в supervisord
                            все гораздо проще и прозрачнее, без каких либо форков https://github.com/sergebezborodov/beanstalk-yii2
                              0
                              Beanstalk не пробовал, спасибо, пощупаем. Пробовали persistent вариант?
                                0
                                persistent — нет, все откладывал настройку, но в итоге за три года работы beanstalk так ни разу и не упал
                                0
                                Спасибо за расширение! Очень удобное.
                                Но у меня есть вопрос. Как бороться с «фантомными» процессами? Я заметил, что когда демон работает очень долго, то остаются в памяти несколько процессов, которые никогда не умирают, но и ничего не делают, а только отнимают память и место для других процессов. Такое у меня было и раньше. До вашего расширения я создавал демоны по аналогичной методике и тоже оставались фантомные процессы, которые в итоге я убивал вручную.

                                Only users with full accounts can post comments. Log in, please.