В этой статье я подготовил для вас прототип CRUD-приложения, которое использует файберы и неблокирующие(асинхронные) возможности драйвера PostreSQL. Вместе они дают любопытные результаты по производительности и потреблению памяти.
Данная статься является продолжением статей:
С ними рекомендуется познакомиться (но совсем необязательно), чтобы лучше понять приведённый код прототипа. Если кратко, то описанный в предыдущих статьях прототип это HTTP-сервер на PHP, который самостоятельно слушает HTTP-порт, принимает HTTP-запросы и формирует ответы, делая это без помощи Nginx, Apache и PHP FPM. Достичь конкурентной обработки запросов ему помогает расширение ev (для отслеживания состояния сетевых сокетов, как более совершенная альтернатива функции socket_select()
) и файберы. В данной статье ко всему этому добавилось неблокирующее взаимодействие с сервером PostreSQL.
Код прототипа приведен в репозитории, в ветке aircraft. Приложение имеет два одинаковых CRUD API для сравнения. Оба частично собраны из одних и тех же классов при помощи контейнера внедрения зависимостей. Первый (порт 8085) реализован при помощи HTTP-сервера на базе файберов, в второй (порт 8086) работает на Nginx+PHP FPM. Оба пользуются таблицей aircraft
на одном и том же сервере PostgreSQL. Код в репозитории содержит окружение на базе Docker-compose для запуска прототипа.
Конкурентное выполнение запросов
Описываемый файберный HTTP-сервер (порт 8085) содержит только один процесс PHP CLI, как можно убедиться, посмотрев файл docker-compose.yml:
services:
php:
command: php -f /app/bin/server.php
Для того, чтобы убедиться в конкурентности, по адресу /sleep
добавлен контроллер следующего содержания:
class SleepController
{
private Db $db;
public function __construct(Db $db)
{
$this->db = $db;
}
public function __invoke(ServerRequestInterface $request, FiberedHandler $fiberedHandler): ResponseInterface
{
$affectedRows = $this->db->execute($fiberedHandler, 'SELECT pg_sleep(1)');
$rows = $this->db->query($fiberedHandler, 'SELECT gen_random_uuid() AS uuid');
if (count($rows) !== 1 || !isset($rows[0]['uuid'])) {
throw new LogicException('Something went wrong.');
}
$uuid = $rows[0]['uuid'];
return new TextResponse("Hello from Sleep request handler! Affected rows: $affectedRows. Generated uuid: $uuid");
}
}
Если обработка запросов осуществлялась бы в последовательном режиме, то общее время ожидания ответа на, скажем, 10 параллельных HTTP-запросов было бы не менее 10 секунд, но представленный одиночный PHP CLI процесс справляется с этим за 2+ секунды. Это легко проверить, воспользовавшись утилитой ab:
$ ab -c 10 -n 10 http://localhost:8085/sleep
Concurrency Level: 10
Time taken for tests: 2.029 seconds
Complete requests: 10
Failed requests: 0
Total transferred: 1640 bytes
HTML transferred: 1040 bytes
Requests per second: 4.93 [#/sec] (mean)
Time per request: 2028.604 [ms] (mean)
Time per request: 202.860 [ms] (mean, across all concurrent requests)
Transfer rate: 0.79 [Kbytes/sec] received
Это свидетельствует о том, что весомая часть этих запросов обрабатывается параллельно. Учитывая, что интерпретатор PHP из коробки не поддерживает параллельное выполнение кода, логично сделать вывод, что параллельность pg_sleep(1)
здесь обеспечивается сервером PostgreSQL. Однако, сам по себе Postgres представленного результата не обеспечит. Остальную работу делает один постоянно функционирующий процесс PHP, используя файберы, цикл и неблокирующий ввод-вывод.
Сравнение файберного HTTP-сервера и сервера на базе PHP FPM на одном ядре
Каждый из них предоставляет абсолютно одинаковый набор CRUD-операций. В качестве метрики сравнения решено было выбрать время, затраченное на выполнение 4-х операций (создание, чтение, редактирование, удаление) в секунду. Для выполнения тестов используется JMeter. 50 параллельных потоков будут выполнять по 200 запросов каждый, с нулевым интервалом между запросами:

Файл проекта JMeter также можно найти в репозитории.


Как видно из результатов: ~58 крудов в секунду против 16 крудов в секунду на одном ядре.
С помощью cAdvisor, Prometheus и Grafana был также измерен объём памяти, который потреблялся каждой из двух версий во время трёх последовательных запусков:

Таким образом, файберный сервер с пулом соединений к БД потребляет в два раза меньше памяти делая ту же работу в 3 раза быстрее. Всё это делалось на 1 ядре для чистоты эксперимента. Конечно, FPM начнёт выигрывать, если дать ему несколько ядер, однако и для файберного сервера можно запустить несколько процессов за балансировщиком, которые смогут использовать преимущества многоядерных окружений. Так что сравнение на 1 ядре вполне оправдано. Если представленные результаты показались вам интересными, далее предлагаю кратко рассмотреть код прототипа.
Описание кода
Чтобы удобно сделать CRUD, в приложение со времени предыдущих статей были добавлены следующие части:
роутинг на базе FastRoute;
laminas/laminas-httphandlerrunner для выдачи PSR7 Response в приложении на базе PHP FPM.
симфонийский валидатор для валидации содержимого CRUD-запросов;
Для краткости статьи, не буду на них останавливаться, код в репозитории довольно очевиден в части их использования и схож с примерами из документации.
Наибольшего внимания в представленном коде, на мой взгляд, заслуживает слой взаимодействия с БД, а именно файл src/Db/AsyncDb.php. Вот в чём соль:
private function sendQuery(
FiberedHandler $fiberedHandler,
Connection $connection,
Sock $connectionSocket,
string $query,
array $params
): void {
if (!pg_send_query_params($connection, $query, $params)) {
throw new DbException('Error while sending the query.');
}
$start = time();
$readyTo = null;
while (true) {
if ($readyTo === FiberedHandler::SOCKET_READY_TO_READ) {
App::debugFiber($fiberedHandler, 'Consuming query result.');
if (!pg_consume_input($connection)) {
throw new DbException('Error while consuming the query result.');
}
}
$flush = pg_flush($connection);
App::debugFiber($fiberedHandler, 'Flushing resulted with: %d', $flush);
if ($flush === false) {
throw new DbException('Error while sending the query.');
} elseif ($flush === true) {
App::debugFiber($fiberedHandler, 'Flushing has been finished successfully.');
break;
}
$fiberedHandler->addReadSocket($connectionSocket);
$fiberedHandler->addWriteSocket($connectionSocket);
$readyTo = $fiberedHandler->suspend();
if ($fiberedHandler->isTimeoutReached($start)) {
throw new DbException('Connection timed out while executing a database query.');
}
}
}
Вместо блокирующих вызовов драйвера PgSQL используются неблокирующие, а если результат ещё не готов, не ждём, а делаем другие, более полезные вещи при помощи файберов. Точнее, когда SQL-запрос отправляется на сервер БД с помощью функции pg_send_query_params()
мы уже находимся в контексте файбера (для обработки каждого HTTP-запроса создаётся отдельный файбер) и просто делаем Fiber::suspend()
, если отправка ещё не завершена. Приведённый фрагмент кода реализует только отправку запроса на сервер. Получение результата производится схожим способом с помощью функций pg_consume_input()
, pg_connection_busy()
и pg_get_result()
. Эта последовательность рекомендована в документации к драйверу PostgreSQL libpq. Документация драйвера PosgreSQL для PHP не всегда достаточно подробна, но, к счастью, этот драйвер во многом повторяет интерфейс библиотеки libpq, которая снабжена гораздо более подробной документацией.
private function receiveQueryResultAndFetchAllRows(
FiberedHandler $fiberedHandler,
Connection $connection,
Sock $connectionSocket
): array {
$result = $this->receiveQueryResult($fiberedHandler, $connection, $connectionSocket);
$rows = pg_fetch_all($result);
if ($rows === false) {
throw new DbException('Error while extracting the data from the query result.');
}
if (!pg_free_result($result)) {
throw new DbException('Error while freeing the query result.');
}
return $rows;
}
private function receiveQueryResult(
FiberedHandler $fiberedHandler,
Connection $connection,
Sock $connectionSocket
): Result {
$start = time();
while (true) {
App::debugFiber($fiberedHandler, 'Consuming query result.');
if (!pg_consume_input($connection)) {
throw new DbException('Error while consuming the query result.');
}
if (!pg_connection_busy($connection)) {
App::debugFiber($fiberedHandler, 'The query result is ready. Going to read it immediately.');
break;
}
App::debugFiber($fiberedHandler, 'The connection is busy. Waiting for the data to read.');
$fiberedHandler->addReadSocket($connectionSocket);
$fiberedHandler->suspend();
if ($fiberedHandler->isTimeoutReached($start)) {
throw new DbException('Connection timed out while executing a database query.');
}
}
$result = pg_get_result($connection);
if ($result === false) {
throw new DbException('Error while getting the query result.');
}
$errorMessage = pg_result_error($result);
if (!empty($errorMessage)) {
$sqlState = pg_result_error_field($result, PGSQL_DIAG_SQLSTATE);
if (is_numeric($sqlState)) {
$code = (int)$sqlState;
} else {
$code = 0;
}
throw new DbException(sprintf('Error while getting the query result: %s %s', $sqlState, $errorMessage), $code);
}
return $result;
}
Эти методы помимо всего прочего требуют передачи объекта соединения с БД. Так как данное приложение это один постоянно живущий процесс, было бы странно создавать и разрывать соединения с БД на каждый HTTP-запрос. По этой причине был реализован пул соединений. Соединение запрашивается из пула перед выполнением SQL-запроса и возвращается в пул после завершения:
public function query(FiberedHandler $fiberedHandler, string $query, array $params = []): array
{
if (str_contains($query, ';')) {
throw new InvalidArgumentException('Multiple queries are unsupported.');
}
try {
$connection = $this->connectionPool->obtainConnection($fiberedHandler);
} catch (DbException $e) {
App::debugFiber($fiberedHandler, 'Error getting a database connection instance.');
throw $e;
}
try {
$connectionSocket = $this->extractSocket($connection);
$this->sendQuery($fiberedHandler, $connection, $connectionSocket, $query, $params);
$rows = $this->receiveQueryResultAndFetchAllRows($fiberedHandler, $connection, $connectionSocket);
$this->connectionPool->freeConnection($connection); //DO NOT FORGET TO RETURN THE CONNECTION BACK TO THE POOL!!!
} catch (DbException $e) {
App::debugFiber($fiberedHandler, 'DbException occurred. Going to close the database connection: %s', $e->getMessage());
$this->connectionPool->closeConnection($connection);
throw $e;
}
return $rows;
}
Код самого пула соединений довольно прост, по сути это коллекция соединений с методами доступа:
<?php
class ConnectionPool
{
/** @var Connection[] */
private array $freeConnections = [];
/** @var array<int, Connection>*/
private array $obtainedConnections = [];
/**
* @throws DbException
*/
public function obtainConnection(FiberedHandler $fiberedHandler): Connection
{
if (!empty($this->freeConnections)) {
/** @var Connection $connection */
$connection = array_shift($this->freeConnections);
$connectionStatus = pg_connection_status($connection);
App::debugFiber($fiberedHandler, 'A free connection found. Status: %d. Pid: %d', $connectionStatus, pg_get_pid($connection));
if ($connectionStatus !== PGSQL_CONNECTION_OK) {
pg_close($connection);
App::debugFiber($fiberedHandler, 'Postgresql connection is not ok: %d', $connectionStatus);
$connection = $this->initConnection($fiberedHandler);
}
} else {
App::debugFiber($fiberedHandler, 'There is no free connection. Will try to init a new one.');
$connection = $this->initConnection($fiberedHandler);
}
$this->obtainedConnections[spl_object_id($connection)] = $connection;
return $connection;
}
public function freeConnection(Connection $connection): void
{
unset($this->obtainedConnections[spl_object_id($connection)]);
$this->freeConnections[] = $connection;
}
public function closeConnection(Connection $connection): void
{
unset($this->obtainedConnections[spl_object_id($connection)]);
pg_close($connection);
}
/**
* @throws DbException
*/
public function initConnection(FiberedHandler $fiberedHandler): Connection
{
App::debugFiber($fiberedHandler, 'Postgresql connection initialization started.');
$start = time();
$connection = pg_connect(getenv('PG_CONN_STR'), PGSQL_CONNECT_ASYNC | PGSQL_CONNECT_FORCE_NEW);
if ($connection === false) {
$lastError = pg_last_error();
App::debugFiber($fiberedHandler, 'Postgresql connect error: %s', $lastError);
throw new DbException(sprintf("Postgresql connect error: %s", $lastError));
}
while (true) {
$connectionStatus = pg_connect_poll($connection);
App::debugFiber($fiberedHandler, 'Postgresql connection status: %d', $connectionStatus);
if (in_array($connectionStatus, [PGSQL_POLLING_READING, PGSQL_POLLING_WRITING])) {
$pgSocket = Sock::fromStream(pg_socket($connection)); //let's see what will happen using this bunch of function calls
if ($connectionStatus === PGSQL_POLLING_READING) {
$fiberedHandler->addReadSocket($pgSocket);
} else {
$fiberedHandler->addWriteSocket($pgSocket);
}
App::debugFiber($fiberedHandler, 'Postgresql connection is not ready yet. Suspending fiber.');
Fiber::suspend();
if ($fiberedHandler->isTimeoutReached($start)) {
App::debugFiber($fiberedHandler, 'Connection timed out while connecting to Postgresql. Reason: %s', pg_last_error($connection));
throw new DbException('Connection timed out while connecting to Postgresql.');
}
continue;
}
if ($connectionStatus === PGSQL_POLLING_FAILED) {
$lastError = pg_last_error($connection);
App::debugFiber($fiberedHandler, 'Postgresql connect error: %s', $lastError);
throw new DbException(sprintf("Postgresql connect error: %s", $lastError));
} elseif ($connectionStatus === PGSQL_POLLING_OK) {
App::debugFiber($fiberedHandler, 'Postgresql connection initialized. Connection pid: %s', pg_get_pid($connection));
break;
} else {
App::debugFiber($fiberedHandler, 'Unknown Postgresql connection state: %s', $connectionStatus);
throw new DbException('Unknown Postgresql connection state.');
}
}
return $connection;
}
}
Как и в случае с выполнением SQL-запросов, подключение к серверу осуществляется неблокирующим способом, с переключением на другие задачи в случае неготовности подключения. Готовность подключения проверяется с помощью вызова функции pg_connect_poll()
, цикл и приостановка файбера также на месте. При запросе свободного подключения клиентским кодом, подключение-кандидат проверяется на валидное состояние с помощью pg_connection_status()
, при необходимости создаётся новое подключение.
Далее имеет смысл рассмотреть код сервиса, который вызывается из контроллеров, и собственно, реализует CRUD. Прошу заметить, что я знаю, что SQL-запросам не место в сервисном слое, но в прототипе сделано именно так для краткости. Рассмотрим метод, реализующий добавление записи. Как можно видеть, его код не содержит никаких особенностей, намекающих не конкурентное выполнение или использование неблокирующих вызовов, кроме передачи $fiberedHandler
в нижележащий слой.
public function add(AircraftDto $dto, FiberedHandler $fiberedHandler): void
{
try {
$insertedRows = $this->db->execute(
$fiberedHandler,
'INSERT INTO aircraft(
number,
title,
serial_number,
assembly_date,
added_at
) VALUES($1, $2, $3, $4, $5)',
[
$dto->number,
$dto->title,
$dto->serialNumber,
$dto->assemblyDate->format('Y-m-d'),
$dto->addedAt->format('Y-m-d')
]
);
if ($insertedRows !== 1) {
throw new DomainException('Error while adding new aircraft.');
}
} catch (DbException $e) {
if ($e->isDuplicateKey()) {
throw new DuplicateEntityException('This aircraft has been already added.', $e->getCode(), $e);
}
throw new DomainException('Something went wrong.', $e->getCode(), $e);
}
}
Этот код можно использовать и в неконкурентном приложении, передав заглушку в качестве значения $fiberedHandler
, что и делается в PHP FPM части прототипа: сервис, котроллеры и код диспетчера HTTP-запросов на базе FastRoute используются как в файберном сервере, так и в сервере на PHP FPM чтобы сэкономить время и объем прототипа. Остальной код либо описан в предыдущих статьях, либо довольно лёгок для понимания, так что не стану на нём останавливаться.
Выводы
Говоря упрощённо, блокирующие вызовы драйверов БД или блокирующего(синхронного) ввода-вывода под капотом делают то же самое что и неблокирующие(асинхронные) вызовы, но с ожиданием результата. Такое ожидание для каждого блокирующего вызова происходит на стороне ядра, вызывающий процесс при этом вынужден ждать. В представленном примере блокирование выполняется для всех сокетов в приложении вместе с помощью libev (расширения ev) , что даёт возможность сделать свой цикл событий ввода-вывода, а вместо использования системного планировщика для переключения между множеством блокирующихся процессов управляемых FPM - организовать кооперативную многозадачность между файберами. Если не будет ввода-вывода ни на одном из сокетов, то приложение будет заблокировано и ничего не будет делать. Подход "один персональный блокируемый процесс или поток для обработки каждого отдельного HTTP-запроса" не слишком уж эффективен. Процессы и потоки лучше не оставлять простаивать в ожидании, а разработчикам лучше не заниматься выматывающим поиском багов, возникающих из-за проблем с thread-safety в случае использования потоков. Если нужно вычислить что-то объёмное (например, пережать изображение или даже сформировать объёмный HTML или XML, использовать Twig-шаблон), это всегда можно сделать через сервер заданий: отправить такую работу какому-либо вычисляющему воркеру и ожидать результата в неблокирующем режиме, занимаясь обработкой других запросов. Я планирую написать следующую статью об использовании сервера обработки заданий совместно с данным прототипом.
К сожалению, для MySQL реализовать подобное пока не представляется мне возможным. Драйвер mysqli хоть и содержит функции для асинхронного выполнения запросов, но не предоставляет при этом никакого аналога функции pg_socket()
, который позволял бы получить доступ к системному сокету соединения с БД. Остаётся надеяться, что в будущем такая функция появится.
Спасибо за чтение и до новых встреч на страницах Хабра;)