В этой статье я подготовил для вас прототип CRUD-приложения, которое использует файберы и неблокирующие(асинхронные) возможности драйвера PostreSQL. Вместе они дают любопытные результаты по производительности и потреблению памяти.

Данная статься является продолжением статей:

С ними рекомендуется познакомиться (но совсем необязательно), чтобы лучше понять приведённый код прототипа. Если кратко, то описанный в предыдущих статьях прототип это HTTP-сервер на PHP, который самостоятельно слушает HTTP-порт, принимает HTTP-запросы и формирует ответы, делая это без помощи Nginx, Apache и PHP FPM. Достичь конкурентной обработки запросов ему помогает расширение ev (для отслеживания состояния сетевых сокетов, как более совершенная альтернатива функции socket_select()) и файберы. В данной статье ко всему этому добавилось неблокирующее взаимодействие с сервером PostreSQL.

Код прототипа приведен в репозитории, в ветке aircraft. Приложение имеет два одинаковых CRUD API для сравнения. Оба частично собраны из одних и тех же классов при помощи контейнера внедрения зависимостей. Первый (порт 8085) реализован при помощи HTTP-сервера на базе файберов, в второй (порт 8086) работает на Nginx+PHP FPM. Оба пользуются таблицей aircraft на одном и том же сервере PostgreSQL. Код в репозитории содержит окружение на базе Docker-compose для запуска прототипа.

Конкурентное выполнение запросов

Описываемый файберный HTTP-сервер (порт 8085) содержит только один процесс PHP CLI, как можно убедиться, посмотрев файл docker-compose.yml:

services:
    php:
        command: php -f /app/bin/server.php

Для того, чтобы убедиться в конкурентности, по адресу /sleep добавлен контроллер следующего содержания:

class SleepController
{
    private Db $db;

    public function __construct(Db $db)
    {
        $this->db = $db;
    }

    public function __invoke(ServerRequestInterface $request, FiberedHandler $fiberedHandler): ResponseInterface
    {
        $affectedRows = $this->db->execute($fiberedHandler, 'SELECT pg_sleep(1)');
        $rows = $this->db->query($fiberedHandler, 'SELECT gen_random_uuid() AS uuid');
        if (count($rows) !== 1 || !isset($rows[0]['uuid'])) {
            throw new LogicException('Something went wrong.');
        }
        $uuid = $rows[0]['uuid'];

        return new TextResponse("Hello from Sleep request handler! Affected rows: $affectedRows. Generated uuid: $uuid");
    }
}

Если обработка запросов осуществлялась бы в последовательном режиме, то общее время ожидания ответа на, скажем, 10 параллельных HTTP-запросов было бы не менее 10 секунд, но представленный одиночный PHP CLI процесс справляется с этим за 2+ секунды. Это легко проверить, воспользовавшись утилитой ab:

$ ab -c 10 -n 10 http://localhost:8085/sleep

Concurrency Level:      10
Time taken for tests:   2.029 seconds
Complete requests:      10
Failed requests:        0
Total transferred:      1640 bytes
HTML transferred:       1040 bytes
Requests per second:    4.93 [#/sec] (mean)
Time per request:       2028.604 [ms] (mean)
Time per request:       202.860 [ms] (mean, across all concurrent requests)
Transfer rate:          0.79 [Kbytes/sec] received

Это свидетельствует о том, что весомая часть этих запросов обрабатывается параллельно. Учитывая, что интерпретатор PHP из коробки не поддерживает параллельное выполнение кода, логично сделать вывод, что параллельность pg_sleep(1) здесь обеспечивается сервером PostgreSQL. Однако, сам по себе Postgres представленного результата не обеспечит. Остальную работу делает один постоянно функционирующий процесс PHP, используя файберы, цикл и неблокирующий ввод-вывод.

Сравнение файберного HTTP-сервера и сервера на базе PHP FPM на одном ядре

Каждый из них предоставляет абсолютно одинаковый набор CRUD-операций. В качестве метрики сравнения решено было выбрать время, затраченное на выполнение 4-х операций (создание, чтение, редактирование, удаление) в секунду. Для выполнения тестов используется JMeter. 50 параллельных потоков будут выполнять по 200 запросов каждый, с нулевым интервалом между запросами:

Настройки JMeter

Файл проекта JMeter также можно найти в репозитории.

Результаты работы JMeter для CRUD-приложения на базе файберов (порт 8085)
Результаты работы JMeter для CRUD на базе PHP FPM (порт 8086)

Как видно из результатов: ~58 крудов в секунду против 16 крудов в секунду на одном ядре.

С помощью cAdvisor, Prometheus и Grafana был также измерен объём памяти, который потреблялся каждой из двух версий во время трёх последовательных запусков:

Потребление памяти. Зелёным показан CRUD на файберах, жёлтым - PHP FPM

Таким образом, файберный сервер с пулом соединений к БД потребляет в два раза меньше памяти делая ту же работу в 3 раза быстрее. Всё это делалось на 1 ядре для чистоты эксперимента. Конечно, FPM начнёт выигрывать, если дать ему несколько ядер, однако и для файберного сервера можно запустить несколько процессов за балансировщиком, которые смогут использовать преимущества многоядерных окружений. Так что сравнение на 1 ядре вполне оправдано. Если представленные результаты показались вам интересными, далее предлагаю кратко рассмотреть код прототипа.

Описание кода

Чтобы удобно сделать CRUD, в приложение со времени предыдущих статей были добавлены следующие части:

Для краткости статьи, не буду на них останавливаться, код в репозитории довольно очевиден в части их использования и схож с примерами из документации.

Наибольшего внимания в представленном коде, на мой взгляд, заслуживает слой взаимодействия с БД, а именно файл src/Db/AsyncDb.php. Вот в чём соль:

private function sendQuery(
    FiberedHandler $fiberedHandler,
    Connection $connection,
    Sock $connectionSocket,
    string $query,
    array $params
): void {
    if (!pg_send_query_params($connection, $query, $params)) {
        throw new DbException('Error while sending the query.');
    }
    $start = time();
    $readyTo = null;
    while (true) {
        if ($readyTo === FiberedHandler::SOCKET_READY_TO_READ) {
            App::debugFiber($fiberedHandler, 'Consuming query result.');
            if (!pg_consume_input($connection)) {
                throw new DbException('Error while consuming the query result.');
            }
        }
        $flush = pg_flush($connection);
        App::debugFiber($fiberedHandler, 'Flushing resulted with: %d', $flush);
        if ($flush === false) {
            throw new DbException('Error while sending the query.');
        } elseif ($flush === true) {
            App::debugFiber($fiberedHandler, 'Flushing has been finished successfully.');
            break;
        }
        $fiberedHandler->addReadSocket($connectionSocket);
        $fiberedHandler->addWriteSocket($connectionSocket);
        $readyTo = $fiberedHandler->suspend();
        if ($fiberedHandler->isTimeoutReached($start)) {
            throw new DbException('Connection timed out while executing a database query.');
        }
    }
}

Вместо блокирующих вызовов драйвера PgSQL используются неблокирующие, а если результат ещё не готов, не ждём, а делаем другие, более полезные вещи при помощи файберов. Точнее, когда SQL-запрос отправляется на сервер БД с помощью функции pg_send_query_params() мы уже находимся в контексте файбера (для обработки каждого HTTP-запроса создаётся отдельный файбер) и просто делаем Fiber::suspend(), если отправка ещё не завершена. Приведённый фрагмент кода реализует только отправку запроса на сервер. Получение результата производится схожим способом с помощью функций pg_consume_input(), pg_connection_busy() и pg_get_result(). Эта последовательность рекомендована в документации к драйверу PostgreSQL libpq. Документация драйвера PosgreSQL для PHP не всегда достаточно подробна, но, к счастью, этот драйвер во многом повторяет интерфейс библиотеки libpq, которая снабжена гораздо более подробной документацией.

private function receiveQueryResultAndFetchAllRows(
    FiberedHandler $fiberedHandler,
    Connection $connection,
    Sock $connectionSocket
): array {
    $result = $this->receiveQueryResult($fiberedHandler, $connection, $connectionSocket);
    $rows = pg_fetch_all($result);
    if ($rows === false) {
        throw new DbException('Error while extracting the data from the query result.');
    }
    if (!pg_free_result($result)) {
        throw new DbException('Error while freeing the query result.');
    }

    return $rows;
}

private function receiveQueryResult(
    FiberedHandler $fiberedHandler,
    Connection $connection,
    Sock $connectionSocket
): Result {
    $start = time();
    while (true) {
        App::debugFiber($fiberedHandler, 'Consuming query result.');
        if (!pg_consume_input($connection)) {
            throw new DbException('Error while consuming the query result.');
        }
        if (!pg_connection_busy($connection)) {
            App::debugFiber($fiberedHandler, 'The query result is ready. Going to read it immediately.');
            break;
        }
        App::debugFiber($fiberedHandler, 'The connection is busy. Waiting for the data to read.');
        $fiberedHandler->addReadSocket($connectionSocket);
        $fiberedHandler->suspend();
        if ($fiberedHandler->isTimeoutReached($start)) {
            throw new DbException('Connection timed out while executing a database query.');
        }
    }
    $result = pg_get_result($connection);
    if ($result === false) {
        throw new DbException('Error while getting the query result.');
    }
    $errorMessage = pg_result_error($result);
    if (!empty($errorMessage)) {
        $sqlState = pg_result_error_field($result, PGSQL_DIAG_SQLSTATE);
        if (is_numeric($sqlState)) {
            $code = (int)$sqlState;
        } else {
            $code = 0;
        }
        throw new DbException(sprintf('Error while getting the query result: %s %s', $sqlState, $errorMessage), $code);
    }

    return $result;
}

Эти методы помимо всего прочего требуют передачи объекта соединения с БД. Так как данное приложение это один постоянно живущий процесс, было бы странно создавать и разрывать соединения с БД на каждый HTTP-запрос. По этой причине был реализован пул соединений. Соединение запрашивается из пула перед выполнением SQL-запроса и возвращается в пул после завершения:

public function query(FiberedHandler $fiberedHandler, string $query, array $params = []): array
{
    if (str_contains($query, ';')) {
        throw new InvalidArgumentException('Multiple queries are unsupported.');
    }
    try {
        $connection = $this->connectionPool->obtainConnection($fiberedHandler);
    } catch (DbException $e) {
        App::debugFiber($fiberedHandler, 'Error getting a database connection instance.');
        throw $e;
    }
    try {
        $connectionSocket = $this->extractSocket($connection);
        $this->sendQuery($fiberedHandler, $connection, $connectionSocket, $query, $params);
        $rows = $this->receiveQueryResultAndFetchAllRows($fiberedHandler, $connection, $connectionSocket);
        $this->connectionPool->freeConnection($connection); //DO NOT FORGET TO RETURN THE CONNECTION BACK TO THE POOL!!!
    } catch (DbException $e) {
        App::debugFiber($fiberedHandler, 'DbException occurred. Going to close the database connection: %s', $e->getMessage());
        $this->connectionPool->closeConnection($connection);
        throw $e;
    }

    return $rows;
}

Код самого пула соединений довольно прост, по сути это коллекция соединений с методами доступа:

<?php

class ConnectionPool
{
    /** @var Connection[] */
    private array $freeConnections = [];

    /** @var array<int, Connection>*/
    private array $obtainedConnections = [];

    /**
     * @throws DbException
     */
    public function obtainConnection(FiberedHandler $fiberedHandler): Connection
    {
        if (!empty($this->freeConnections)) {
            /** @var Connection $connection */
            $connection = array_shift($this->freeConnections);
            $connectionStatus = pg_connection_status($connection);
            App::debugFiber($fiberedHandler, 'A free connection found. Status: %d. Pid: %d', $connectionStatus, pg_get_pid($connection));
            if ($connectionStatus !== PGSQL_CONNECTION_OK) {
                pg_close($connection);
                App::debugFiber($fiberedHandler, 'Postgresql connection is not ok: %d', $connectionStatus);
                $connection = $this->initConnection($fiberedHandler);
            }
        } else {
            App::debugFiber($fiberedHandler, 'There is no free connection. Will try to init a new one.');
            $connection = $this->initConnection($fiberedHandler);
        }
        $this->obtainedConnections[spl_object_id($connection)] = $connection;

        return $connection;
    }

    public function freeConnection(Connection $connection): void
    {
        unset($this->obtainedConnections[spl_object_id($connection)]);
        $this->freeConnections[] = $connection;
    }

    public function closeConnection(Connection $connection): void
    {
        unset($this->obtainedConnections[spl_object_id($connection)]);
        pg_close($connection);
    }

     /**
     * @throws DbException
     */
    public function initConnection(FiberedHandler $fiberedHandler): Connection
    {
        App::debugFiber($fiberedHandler, 'Postgresql connection initialization started.');
        $start = time();
        $connection = pg_connect(getenv('PG_CONN_STR'), PGSQL_CONNECT_ASYNC | PGSQL_CONNECT_FORCE_NEW);
        if ($connection === false) {
            $lastError = pg_last_error();
            App::debugFiber($fiberedHandler, 'Postgresql connect error: %s', $lastError);
            throw new DbException(sprintf("Postgresql connect error: %s", $lastError));
        }
        while (true) {
            $connectionStatus = pg_connect_poll($connection);
            App::debugFiber($fiberedHandler, 'Postgresql connection status: %d', $connectionStatus);
            if (in_array($connectionStatus, [PGSQL_POLLING_READING, PGSQL_POLLING_WRITING])) {
                $pgSocket = Sock::fromStream(pg_socket($connection)); //let's see what will happen using this bunch of function calls
                if ($connectionStatus === PGSQL_POLLING_READING) {
                    $fiberedHandler->addReadSocket($pgSocket);
                } else {
                    $fiberedHandler->addWriteSocket($pgSocket);
                }
                App::debugFiber($fiberedHandler, 'Postgresql connection is not ready yet. Suspending fiber.');
                Fiber::suspend();
                if ($fiberedHandler->isTimeoutReached($start)) {
                    App::debugFiber($fiberedHandler, 'Connection timed out while connecting to Postgresql. Reason: %s', pg_last_error($connection));
                    throw new DbException('Connection timed out while connecting to Postgresql.');
                }
                continue;
            }
            if ($connectionStatus === PGSQL_POLLING_FAILED) {
                $lastError = pg_last_error($connection);
                App::debugFiber($fiberedHandler, 'Postgresql connect error: %s', $lastError);
                throw new DbException(sprintf("Postgresql connect error: %s", $lastError));
            } elseif ($connectionStatus === PGSQL_POLLING_OK) {
                App::debugFiber($fiberedHandler, 'Postgresql connection initialized. Connection pid: %s', pg_get_pid($connection));
                break;
            } else {
                App::debugFiber($fiberedHandler, 'Unknown Postgresql connection state: %s', $connectionStatus);
                throw new DbException('Unknown Postgresql connection state.');
            }
        }

        return $connection;
    }
}

Как и в случае с выполнением SQL-запросов, подключение к серверу осуществляется неблокирующим способом, с переключением на другие задачи в случае неготовности подключения. Готовность подключения проверяется с помощью вызова функции pg_connect_poll(), цикл и приостановка файбера также на месте. При запросе свободного подключения клиентским кодом, подключение-кандидат проверяется на валидное состояние с помощью pg_connection_status(), при необходимости создаётся новое подключение.

Далее имеет смысл рассмотреть код сервиса, который вызывается из контроллеров, и собственно, реализует CRUD. Прошу заметить, что я знаю, что SQL-запросам не место в сервисном слое, но в прототипе сделано именно так для краткости. Рассмотрим метод, реализующий добавление записи. Как можно видеть, его код не содержит никаких особенностей, намекающих не конкурентное выполнение или использование неблокирующих вызовов, кроме передачи $fiberedHandlerв нижележащий слой.

public function add(AircraftDto $dto, FiberedHandler $fiberedHandler): void
{
    try {
        $insertedRows = $this->db->execute(
            $fiberedHandler,
            'INSERT INTO aircraft(
                number, 
                title, 
                serial_number, 
                assembly_date, 
                added_at
            ) VALUES($1, $2, $3, $4, $5)',
            [
                $dto->number,
                $dto->title,
                $dto->serialNumber,
                $dto->assemblyDate->format('Y-m-d'),
                $dto->addedAt->format('Y-m-d')
            ]
        );
        if ($insertedRows !== 1) {
            throw new DomainException('Error while adding new aircraft.');
        }
    } catch (DbException $e) {
        if ($e->isDuplicateKey()) {
            throw new DuplicateEntityException('This aircraft has been already added.', $e->getCode(), $e);
        }
        throw new DomainException('Something went wrong.', $e->getCode(), $e);
    }
}

Этот код можно использовать и в неконкурентном приложении, передав заглушку в качестве значения $fiberedHandler, что и делается в PHP FPM части прототипа: сервис, котроллеры и код диспетчера HTTP-запросов на базе FastRoute используются как в файберном сервере, так и в сервере на PHP FPM чтобы сэкономить время и объем прототипа. Остальной код либо описан в предыдущих статьях, либо довольно лёгок для понимания, так что не стану на нём останавливаться.

Выводы

Говоря упрощённо, блокирующие вызовы драйверов БД или блокирующего(синхронного) ввода-вывода под капотом делают то же самое что и неблокирующие(асинхронные) вызовы, но с ожиданием результата. Такое ожидание для каждого блокирующего вызова происходит на стороне ядра, вызывающий процесс при этом вынужден ждать. В представленном примере блокирование выполняется для всех сокетов в приложении вместе с помощью libev (расширения ev) , что даёт возможность сделать свой цикл событий ввода-вывода, а вместо использования системного планировщика для переключения между множеством блокирующихся процессов управляемых FPM - организовать кооперативную многозадачность между файберами. Если не будет ввода-вывода ни на одном из сокетов, то приложение будет заблокировано и ничего не будет делать. Подход "один персональный блокируемый процесс или поток для обработки каждого отдельного HTTP-запроса" не слишком уж эффективен. Процессы и потоки лучше не оставлять простаивать в ожидании, а разработчикам лучше не заниматься выматывающим поиском багов, возникающих из-за проблем с thread-safety в случае использования потоков. Если нужно вычислить что-то объёмное (например, пережать изображение или даже сформировать объёмный HTML или XML, использовать Twig-шаблон), это всегда можно сделать через сервер заданий: отправить такую работу какому-либо вычисляющему воркеру и ожидать результата в неблокирующем режиме, занимаясь обработкой других запросов. Я планирую написать следующую статью об использовании сервера обработки заданий совместно с данным прототипом.

К сожалению, для MySQL реализовать подобное пока не представляется мне возможным. Драйвер mysqli хоть и содержит функции для асинхронного выполнения запросов, но не предоставляет при этом никакого аналога функции pg_socket(), который позволял бы получить доступ к системному сокету соединения с БД. Остаётся надеяться, что в будущем такая функция появится.

Спасибо за чтение и до новых встреч на страницах Хабра;)

Полезные ссылки