Как стать автором
Обновить

CRUD на PHP с использованием файберов и пула соединений с PostgreSQL

Уровень сложностиСредний
Время на прочтение12 мин
Количество просмотров1.3K

В этой статье я подготовил для вас прототип CRUD-приложения, которое использует файберы и неблокирующие(асинхронные) возможности драйвера PostreSQL. Вместе они дают любопытные результаты по производительности и потреблению памяти.

Данная статься является продолжением статей:

С ними рекомендуется познакомиться (но совсем необязательно), чтобы лучше понять приведённый код прототипа. Если кратко, то описанный в предыдущих статьях прототип это HTTP-сервер на PHP, который самостоятельно слушает HTTP-порт, принимает HTTP-запросы и формирует ответы, делая это без помощи Nginx, Apache и PHP FPM. Достичь конкурентной обработки запросов ему помогает расширение ev (для отслеживания состояния сетевых сокетов, как более совершенная альтернатива функции socket_select()) и файберы. В данной статье ко всему этому добавилось неблокирующее взаимодействие с сервером PostreSQL.

Код прототипа приведен в репозитории, в ветке aircraft. Приложение имеет два одинаковых CRUD API для сравнения. Оба частично собраны из одних и тех же классов при помощи контейнера внедрения зависимостей. Первый (порт 8085) реализован при помощи HTTP-сервера на базе файберов, в второй (порт 8086) работает на Nginx+PHP FPM. Оба пользуются таблицей aircraft на одном и том же сервере PostgreSQL. Код в репозитории содержит окружение на базе Docker-compose для запуска прототипа.

Конкурентное выполнение запросов

Описываемый файберный HTTP-сервер (порт 8085) содержит только один процесс PHP CLI, как можно убедиться, посмотрев файл docker-compose.yml:

services:
    php:
        command: php -f /app/bin/server.php

Для того, чтобы убедиться в конкурентности, по адресу /sleep добавлен контроллер следующего содержания:

class SleepController
{
    private Db $db;

    public function __construct(Db $db)
    {
        $this->db = $db;
    }

    public function __invoke(ServerRequestInterface $request, FiberedHandler $fiberedHandler): ResponseInterface
    {
        $affectedRows = $this->db->execute($fiberedHandler, 'SELECT pg_sleep(1)');
        $rows = $this->db->query($fiberedHandler, 'SELECT gen_random_uuid() AS uuid');
        if (count($rows) !== 1 || !isset($rows[0]['uuid'])) {
            throw new LogicException('Something went wrong.');
        }
        $uuid = $rows[0]['uuid'];

        return new TextResponse("Hello from Sleep request handler! Affected rows: $affectedRows. Generated uuid: $uuid");
    }
}

Если обработка запросов осуществлялась бы в последовательном режиме, то общее время ожидания ответа на, скажем, 10 параллельных HTTP-запросов было бы не менее 10 секунд, но представленный одиночный PHP CLI процесс справляется с этим за 2+ секунды. Это легко проверить, воспользовавшись утилитой ab:

$ ab -c 10 -n 10 http://localhost:8085/sleep

Concurrency Level:      10
Time taken for tests:   2.029 seconds
Complete requests:      10
Failed requests:        0
Total transferred:      1640 bytes
HTML transferred:       1040 bytes
Requests per second:    4.93 [#/sec] (mean)
Time per request:       2028.604 [ms] (mean)
Time per request:       202.860 [ms] (mean, across all concurrent requests)
Transfer rate:          0.79 [Kbytes/sec] received

Это свидетельствует о том, что весомая часть этих запросов обрабатывается параллельно. Учитывая, что интерпретатор PHP из коробки не поддерживает параллельное выполнение кода, логично сделать вывод, что параллельность pg_sleep(1) здесь обеспечивается сервером PostgreSQL. Однако, сам по себе Postgres представленного результата не обеспечит. Остальную работу делает один постоянно функционирующий процесс PHP, используя файберы, цикл и неблокирующий ввод-вывод.

Сравнение файберного HTTP-сервера и сервера на базе PHP FPM на одном ядре

Каждый из них предоставляет абсолютно одинаковый набор CRUD-операций. В качестве метрики сравнения решено было выбрать время, затраченное на выполнение 4-х операций (создание, чтение, редактирование, удаление) в секунду. Для выполнения тестов используется JMeter. 50 параллельных потоков будут выполнять по 200 запросов каждый, с нулевым интервалом между запросами:

Настройки JMeter
Настройки JMeter

Файл проекта JMeter также можно найти в репозитории.

Результаты работы JMeter для CRUD-приложения на базе файберов (порт 8085)
Результаты работы JMeter для CRUD-приложения на базе файберов (порт 8085)
Результаты работы JMeter для CRUD на базе PHP FPM (порт 8086)
Результаты работы JMeter для CRUD на базе PHP FPM (порт 8086)

Как видно из результатов: ~58 крудов в секунду против 16 крудов в секунду на одном ядре.

С помощью cAdvisor, Prometheus и Grafana был также измерен объём памяти, который потреблялся каждой из двух версий во время трёх последовательных запусков:

Потребление памяти. Зелёным показан CRUD на файберах, жёлтым - PHP FPM
Потребление памяти. Зелёным показан CRUD на файберах, жёлтым - PHP FPM

Таким образом, файберный сервер с пулом соединений к БД потребляет в два раза меньше памяти делая ту же работу в 3 раза быстрее. Всё это делалось на 1 ядре для чистоты эксперимента. Конечно, FPM начнёт выигрывать, если дать ему несколько ядер, однако и для файберного сервера можно запустить несколько процессов за балансировщиком, которые смогут использовать преимущества многоядерных окружений. Так что сравнение на 1 ядре вполне оправдано. Если представленные результаты показались вам интересными, далее предлагаю кратко рассмотреть код прототипа.

Описание кода

Чтобы удобно сделать CRUD, в приложение со времени предыдущих статей были добавлены следующие части:

Для краткости статьи, не буду на них останавливаться, код в репозитории довольно очевиден в части их использования и схож с примерами из документации.

Наибольшего внимания в представленном коде, на мой взгляд, заслуживает слой взаимодействия с БД, а именно файл src/Db/AsyncDb.php. Вот в чём соль:

private function sendQuery(
    FiberedHandler $fiberedHandler,
    Connection $connection,
    Sock $connectionSocket,
    string $query,
    array $params
): void {
    if (!pg_send_query_params($connection, $query, $params)) {
        throw new DbException('Error while sending the query.');
    }
    $start = time();
    $readyTo = null;
    while (true) {
        if ($readyTo === FiberedHandler::SOCKET_READY_TO_READ) {
            App::debugFiber($fiberedHandler, 'Consuming query result.');
            if (!pg_consume_input($connection)) {
                throw new DbException('Error while consuming the query result.');
            }
        }
        $flush = pg_flush($connection);
        App::debugFiber($fiberedHandler, 'Flushing resulted with: %d', $flush);
        if ($flush === false) {
            throw new DbException('Error while sending the query.');
        } elseif ($flush === true) {
            App::debugFiber($fiberedHandler, 'Flushing has been finished successfully.');
            break;
        }
        $fiberedHandler->addReadSocket($connectionSocket);
        $fiberedHandler->addWriteSocket($connectionSocket);
        $readyTo = $fiberedHandler->suspend();
        if ($fiberedHandler->isTimeoutReached($start)) {
            throw new DbException('Connection timed out while executing a database query.');
        }
    }
}

Вместо блокирующих вызовов драйвера PgSQL используются неблокирующие, а если результат ещё не готов, не ждём, а делаем другие, более полезные вещи при помощи файберов. Точнее, когда SQL-запрос отправляется на сервер БД с помощью функции pg_send_query_params() мы уже находимся в контексте файбера (для обработки каждого HTTP-запроса создаётся отдельный файбер) и просто делаем Fiber::suspend(), если отправка ещё не завершена. Приведённый фрагмент кода реализует только отправку запроса на сервер. Получение результата производится схожим способом с помощью функций pg_consume_input(), pg_connection_busy() и pg_get_result(). Эта последовательность рекомендована в документации к драйверу PostgreSQL libpq. Документация драйвера PosgreSQL для PHP не всегда достаточно подробна, но, к счастью, этот драйвер во многом повторяет интерфейс библиотеки libpq, которая снабжена гораздо более подробной документацией.

private function receiveQueryResultAndFetchAllRows(
    FiberedHandler $fiberedHandler,
    Connection $connection,
    Sock $connectionSocket
): array {
    $result = $this->receiveQueryResult($fiberedHandler, $connection, $connectionSocket);
    $rows = pg_fetch_all($result);
    if ($rows === false) {
        throw new DbException('Error while extracting the data from the query result.');
    }
    if (!pg_free_result($result)) {
        throw new DbException('Error while freeing the query result.');
    }

    return $rows;
}

private function receiveQueryResult(
    FiberedHandler $fiberedHandler,
    Connection $connection,
    Sock $connectionSocket
): Result {
    $start = time();
    while (true) {
        App::debugFiber($fiberedHandler, 'Consuming query result.');
        if (!pg_consume_input($connection)) {
            throw new DbException('Error while consuming the query result.');
        }
        if (!pg_connection_busy($connection)) {
            App::debugFiber($fiberedHandler, 'The query result is ready. Going to read it immediately.');
            break;
        }
        App::debugFiber($fiberedHandler, 'The connection is busy. Waiting for the data to read.');
        $fiberedHandler->addReadSocket($connectionSocket);
        $fiberedHandler->suspend();
        if ($fiberedHandler->isTimeoutReached($start)) {
            throw new DbException('Connection timed out while executing a database query.');
        }
    }
    $result = pg_get_result($connection);
    if ($result === false) {
        throw new DbException('Error while getting the query result.');
    }
    $errorMessage = pg_result_error($result);
    if (!empty($errorMessage)) {
        $sqlState = pg_result_error_field($result, PGSQL_DIAG_SQLSTATE);
        if (is_numeric($sqlState)) {
            $code = (int)$sqlState;
        } else {
            $code = 0;
        }
        throw new DbException(sprintf('Error while getting the query result: %s %s', $sqlState, $errorMessage), $code);
    }

    return $result;
}

Эти методы помимо всего прочего требуют передачи объекта соединения с БД. Так как данное приложение это один постоянно живущий процесс, было бы странно создавать и разрывать соединения с БД на каждый HTTP-запрос. По этой причине был реализован пул соединений. Соединение запрашивается из пула перед выполнением SQL-запроса и возвращается в пул после завершения:

public function query(FiberedHandler $fiberedHandler, string $query, array $params = []): array
{
    if (str_contains($query, ';')) {
        throw new InvalidArgumentException('Multiple queries are unsupported.');
    }
    try {
        $connection = $this->connectionPool->obtainConnection($fiberedHandler);
    } catch (DbException $e) {
        App::debugFiber($fiberedHandler, 'Error getting a database connection instance.');
        throw $e;
    }
    try {
        $connectionSocket = $this->extractSocket($connection);
        $this->sendQuery($fiberedHandler, $connection, $connectionSocket, $query, $params);
        $rows = $this->receiveQueryResultAndFetchAllRows($fiberedHandler, $connection, $connectionSocket);
        $this->connectionPool->freeConnection($connection); //DO NOT FORGET TO RETURN THE CONNECTION BACK TO THE POOL!!!
    } catch (DbException $e) {
        App::debugFiber($fiberedHandler, 'DbException occurred. Going to close the database connection: %s', $e->getMessage());
        $this->connectionPool->closeConnection($connection);
        throw $e;
    }

    return $rows;
}

Код самого пула соединений довольно прост, по сути это коллекция соединений с методами доступа:

<?php

class ConnectionPool
{
    /** @var Connection[] */
    private array $freeConnections = [];

    /** @var array<int, Connection>*/
    private array $obtainedConnections = [];

    /**
     * @throws DbException
     */
    public function obtainConnection(FiberedHandler $fiberedHandler): Connection
    {
        if (!empty($this->freeConnections)) {
            /** @var Connection $connection */
            $connection = array_shift($this->freeConnections);
            $connectionStatus = pg_connection_status($connection);
            App::debugFiber($fiberedHandler, 'A free connection found. Status: %d. Pid: %d', $connectionStatus, pg_get_pid($connection));
            if ($connectionStatus !== PGSQL_CONNECTION_OK) {
                pg_close($connection);
                App::debugFiber($fiberedHandler, 'Postgresql connection is not ok: %d', $connectionStatus);
                $connection = $this->initConnection($fiberedHandler);
            }
        } else {
            App::debugFiber($fiberedHandler, 'There is no free connection. Will try to init a new one.');
            $connection = $this->initConnection($fiberedHandler);
        }
        $this->obtainedConnections[spl_object_id($connection)] = $connection;

        return $connection;
    }

    public function freeConnection(Connection $connection): void
    {
        unset($this->obtainedConnections[spl_object_id($connection)]);
        $this->freeConnections[] = $connection;
    }

    public function closeConnection(Connection $connection): void
    {
        unset($this->obtainedConnections[spl_object_id($connection)]);
        pg_close($connection);
    }

     /**
     * @throws DbException
     */
    public function initConnection(FiberedHandler $fiberedHandler): Connection
    {
        App::debugFiber($fiberedHandler, 'Postgresql connection initialization started.');
        $start = time();
        $connection = pg_connect(getenv('PG_CONN_STR'), PGSQL_CONNECT_ASYNC | PGSQL_CONNECT_FORCE_NEW);
        if ($connection === false) {
            $lastError = pg_last_error();
            App::debugFiber($fiberedHandler, 'Postgresql connect error: %s', $lastError);
            throw new DbException(sprintf("Postgresql connect error: %s", $lastError));
        }
        while (true) {
            $connectionStatus = pg_connect_poll($connection);
            App::debugFiber($fiberedHandler, 'Postgresql connection status: %d', $connectionStatus);
            if (in_array($connectionStatus, [PGSQL_POLLING_READING, PGSQL_POLLING_WRITING])) {
                $pgSocket = Sock::fromStream(pg_socket($connection)); //let's see what will happen using this bunch of function calls
                if ($connectionStatus === PGSQL_POLLING_READING) {
                    $fiberedHandler->addReadSocket($pgSocket);
                } else {
                    $fiberedHandler->addWriteSocket($pgSocket);
                }
                App::debugFiber($fiberedHandler, 'Postgresql connection is not ready yet. Suspending fiber.');
                Fiber::suspend();
                if ($fiberedHandler->isTimeoutReached($start)) {
                    App::debugFiber($fiberedHandler, 'Connection timed out while connecting to Postgresql. Reason: %s', pg_last_error($connection));
                    throw new DbException('Connection timed out while connecting to Postgresql.');
                }
                continue;
            }
            if ($connectionStatus === PGSQL_POLLING_FAILED) {
                $lastError = pg_last_error($connection);
                App::debugFiber($fiberedHandler, 'Postgresql connect error: %s', $lastError);
                throw new DbException(sprintf("Postgresql connect error: %s", $lastError));
            } elseif ($connectionStatus === PGSQL_POLLING_OK) {
                App::debugFiber($fiberedHandler, 'Postgresql connection initialized. Connection pid: %s', pg_get_pid($connection));
                break;
            } else {
                App::debugFiber($fiberedHandler, 'Unknown Postgresql connection state: %s', $connectionStatus);
                throw new DbException('Unknown Postgresql connection state.');
            }
        }

        return $connection;
    }
}

Как и в случае с выполнением SQL-запросов, подключение к серверу осуществляется неблокирующим способом, с переключением на другие задачи в случае неготовности подключения. Готовность подключения проверяется с помощью вызова функции pg_connect_poll(), цикл и приостановка файбера также на месте. При запросе свободного подключения клиентским кодом, подключение-кандидат проверяется на валидное состояние с помощью pg_connection_status(), при необходимости создаётся новое подключение.

Далее имеет смысл рассмотреть код сервиса, который вызывается из контроллеров, и собственно, реализует CRUD. Прошу заметить, что я знаю, что SQL-запросам не место в сервисном слое, но в прототипе сделано именно так для краткости. Рассмотрим метод, реализующий добавление записи. Как можно видеть, его код не содержит никаких особенностей, намекающих не конкурентное выполнение или использование неблокирующих вызовов, кроме передачи $fiberedHandlerв нижележащий слой.

public function add(AircraftDto $dto, FiberedHandler $fiberedHandler): void
{
    try {
        $insertedRows = $this->db->execute(
            $fiberedHandler,
            'INSERT INTO aircraft(
                number, 
                title, 
                serial_number, 
                assembly_date, 
                added_at
            ) VALUES($1, $2, $3, $4, $5)',
            [
                $dto->number,
                $dto->title,
                $dto->serialNumber,
                $dto->assemblyDate->format('Y-m-d'),
                $dto->addedAt->format('Y-m-d')
            ]
        );
        if ($insertedRows !== 1) {
            throw new DomainException('Error while adding new aircraft.');
        }
    } catch (DbException $e) {
        if ($e->isDuplicateKey()) {
            throw new DuplicateEntityException('This aircraft has been already added.', $e->getCode(), $e);
        }
        throw new DomainException('Something went wrong.', $e->getCode(), $e);
    }
}

Этот код можно использовать и в неконкурентном приложении, передав заглушку в качестве значения $fiberedHandler, что и делается в PHP FPM части прототипа: сервис, котроллеры и код диспетчера HTTP-запросов на базе FastRoute используются как в файберном сервере, так и в сервере на PHP FPM чтобы сэкономить время и объем прототипа. Остальной код либо описан в предыдущих статьях, либо довольно лёгок для понимания, так что не стану на нём останавливаться.

Выводы

Говоря упрощённо, блокирующие вызовы драйверов БД или блокирующего(синхронного) ввода-вывода под капотом делают то же самое что и неблокирующие(асинхронные) вызовы, но с ожиданием результата. Такое ожидание для каждого блокирующего вызова происходит на стороне ядра, вызывающий процесс при этом вынужден ждать. В представленном примере блокирование выполняется для всех сокетов в приложении вместе с помощью libev (расширения ev) , что даёт возможность сделать свой цикл событий ввода-вывода, а вместо использования системного планировщика для переключения между множеством блокирующихся процессов управляемых FPM - организовать кооперативную многозадачность между файберами. Если не будет ввода-вывода ни на одном из сокетов, то приложение будет заблокировано и ничего не будет делать. Подход "один персональный блокируемый процесс или поток для обработки каждого отдельного HTTP-запроса" не слишком уж эффективен. Процессы и потоки лучше не оставлять простаивать в ожидании, а разработчикам лучше не заниматься выматывающим поиском багов, возникающих из-за проблем с thread-safety в случае использования потоков. Если нужно вычислить что-то объёмное (например, пережать изображение или даже сформировать объёмный HTML или XML, использовать Twig-шаблон), это всегда можно сделать через сервер заданий: отправить такую работу какому-либо вычисляющему воркеру и ожидать результата в неблокирующем режиме, занимаясь обработкой других запросов. Я планирую написать следующую статью об использовании сервера обработки заданий совместно с данным прототипом.

К сожалению, для MySQL реализовать подобное пока не представляется мне возможным. Драйвер mysqli хоть и содержит функции для асинхронного выполнения запросов, но не предоставляет при этом никакого аналога функции pg_socket(), который позволял бы получить доступ к системному сокету соединения с БД. Остаётся надеяться, что в будущем такая функция появится.

Спасибо за чтение и до новых встреч на страницах Хабра;)

Полезные ссылки

Теги:
Хабы:
+9
Комментарии4

Публикации

Работа

PHP программист
82 вакансии

Ближайшие события