Базы данных не являются очередями сообщений — это устоявшееся утверждение, которое обсуждалось во многих статьях в блогах и презентациях на конференциях. Но с развитием реляционных баз данных выдерживает ли это утверждение проверку? Если посмотреть на современные версии Postgres, то ответ часто оказывается отрицательным.
Поэтому в этой статье рассматривается упрощенный механизм уведомлений Postgres и обсуждается, как его можно использовать для реализации простой, но эффективной очереди сообщений на основе push-уведомлений. В ней также рассматривается использование этой очереди для взаимодействия между репликами в развертывании Kubernetes и реализация библиотеки для универсальной обработки задач.
Postgres как очередь сообщений
Postgres — это, конечно, реляционная база данных, которая реализует большую часть стандарта SQL.
Но, помимо этого, в Postgres реализовано множество других нестандартных функций, которые также могут выполняться с помощью расширения SQL.
Одной из таких возможностей является механизм LISTEN и NOTIFY, который позволяет отправлять асинхронные сообщения через соединения с базой данных. И, конечно же, эти команды могут быть выполнены через JDBC. В качестве простого примера hello-world рассмотрим JVM для прослушивания заданного hello_world_channel
:
try (Connection conn = getConnection()) {
try (Statement stmt = conn.createStatement()) {
stmt.execute("LISTEN hello_world_channel");
}
PGNotification[] notifications = conn
.unwrap(PgConnection.class)
.getNotifications(0);
System.out.println(
"Hello " + notifications[0].getParameter() + "!");
}
Для получения уведомлений необходимо указать имя канала, который нужно прослушивать. Имя канала может быть выбрано произвольно. Для получения уведомлений необходимо развернуть соединение с драйвером JDBC Postgres PgConnection
. Оттуда полученные уведомления можно прочитать с указанным тайм-аутом или 0, если вы хотите ждать бесконечно. Вторая JVM теперь может отправлять уведомления, используя аналогичную простую настройку:
try (
Connection conn = getConnection();
Statement stmt = conn.createStatement()) {
stmt.execute("NOTIFY hello_world_channel, 'World'");
}
Что заставит первую JVM напечатать "Hello World!".
Определение триггеров для создания простой очереди сообщений
Часто уведомление отправляется не напрямую, а через триггер в таблице. Например, чтобы реализовать упомянутую очередь сообщений, можно было бы начать создав следующую простую таблицу:
CREATE TABLE MY_MESSAGES (
RECEIVER VARCHAR(200),
ID SERIAL,
PAYLOAD JSON,
PROCESSED BOOLEAN
);
Чтобы отправлять уведомление всякий раз, когда сообщение вставляется в таблицу, функция, подобная следующей, реализует это на процедурном языке Postgres pgSQL, не изменяя вставленную строку:
CREATE FUNCTION MY_MESSAGES_FCT()
RETURNS TRIGGER AS
$BODY$
BEGIN
PERFORM pg_notify('my_message_queue', NEW.RECEIVER);
RETURN NEW;
END;
$BODY$
LANGUAGE PLPGSQL;
Приведенная выше функция вызывает функцию pg_notify, запускающую NOTIFY со вторым аргументом в качестве полезной нагрузки, но позволяет избежать возможной SQL-инъекции, которая может произойти при конкатенации строк.
Теперь эту функцию можно установить как триггер на любые вставки в MY_MESSAGES :
CREATE TRIGGER MY_MESSAGES_TRG
AFTER INSERT ON MY_MESSAGES
FOR EACH ROW
EXECUTE PROCEDURE MY_MESSAGES_FCT();
Таким образом, один или несколько слушателей (listener) могут быть уведомлены о поступлении новых сообщений, например, в виде реплик в рамках развертывания Kubernetes.
Уведомления Postgres и пул соединений
Одним из недостатков механизма уведомлений Postgres является то, что он обычно требует создания выделенного подключения для получения уведомлений.
Это связано с тем, что соединение используется для отправки уведомлений обратно по каналу, установленному клиентом JDBC при открытии соединения и выполнении оператора LISTEN
. Для этого требуется, чтобы соединение было долгоживущим, что обычно не очень хорошо сочетается с источниками данных в пуле. Вместо этого следует создать выделенное соединение через DriverManager API.
Обратите внимание, что это также занимает полное соединение на сервере Postgres, где соединения обычно также объединяются в пул. По этой причине сервер Postgres может начать отклонять новые попытки подключения, если слишком много JVM уже занимают выделенное соединение для прослушивания уведомлений. Поэтому может возникнуть необходимость увеличить максимальное количество разрешенных одновременных подключений к экземпляру сервера Postgres. Поскольку соединения для получения уведомлений часто простаивают и требуют мало машинных ресурсов, это обычно не является существенным изменением. Напротив, если прослушивание уведомлений может заменить частые опросы базы данных, такой подход может даже высвободить ресурсы.
При этих минусах подход Postgres имеет и менее очевидные плюсы. Например, в Oracle базе данных не требуется выделенное соединение. Однако это требует, чтобы база данных могла активно вызывать уведомленное приложение на заданном хосте и порте. Это не всегда возможно, например, в Kubernetes, когда несколько реплик используют общий хост.
Использование очереди сообщений JDBC от Spring Integration в Postgres
Эта функциональность будет доступна в Spring integration со скорым появлением шестой версии.
Spring Integration уже предлагает реализацию очереди на основе JDBC. Но на сегодняшний день она предлагает только опрос сообщений или получение push-сообщений при работе с одним и тем же объектом очереди в рамках одной JVM. Определив триггер, аналогичный приведенному выше, как предложено в файле schema-postgres.sql, Spring Integration позволяет получать сообщения, отправленные через обычный JdbcChannelMessageStore
.
Это позволяет отправить сообщение с любой сериализуемой полезной нагрузкой в заданный канал следующим образом:
JdbcChannelMessageStore messageStore =
new JdbcChannelMessageStore(getDataSource());
messageStore.setChannelMessageStoreQueryProvider(
new PostgresChannelMessageStoreQueryProvider());
messageStore.addMessageToGroup(
"some-channel",
new GenericMessage<>("World");
которое Spring Integration 6 теперь позволяет получать через push-уведомление от любой другой подключенной JVM с помощью следующего кода:
PostgresChannelMessageTableSubscriber subscriber =
new PostgresChannelMessageTableSubscriber(() ->
DriverManager.getConnection(
getJdbcUrl(),
getUsername(),
getPassword()).unwrap(PgConnection.class);
subscriber.start()
PostgresSubscribableChannel channel =
new PostgresSubscribableChannel(
messageStore,
"some-channel",
subscriber);
channel.subscribe(message -> System.out.println(
"Hello " + message.getPayload() + "!");
Раньше подобное взаимодействие между JVM было возможно только путем опроса канала на наличие новых сообщений, в то время как описанный выше механизм позволяет осуществлять почти мгновенную связь между различными виртуальными машинами. При создании приложения с многими узлами, которое уже использует Postgres, это можно использовать как простой способ связи между виртуальными машинами.
Например, в Spring Integration 6 можно использовать LockRegistryLeaderInitiator
для определения узла, выполняющего неразделенную работу. Если несколько узлов могут получить HTTP-сообщение, предназначенное для обработки этим узлом-лидером, эти узлы теперь могут перенаправить этот вызов через хранилище сообщений JDBC, которое мгновенно уведомит лидера. Этого можно достичь, написав всего несколько строк кода и без необходимости расширения технического стека за счет дополнительных технологий, таких как Zookeeper.
Внедрение универсального обработчика задач с PUSH-уведомлениями для работников
В качестве реального примера обмена данными между JVM с использованием Postgres налоговая служба Норвегии предлагает минимальную библиотеку для универсальной обработки задач с использованием API уведомлений базы данных. Если создается пакет новых задач, несколько рабочих узлов уведомляются о дополнительной работе и просыпаются для опроса новых сообщений. Эта работа будет продолжаться до тех пор, пока не появятся дополнительные задачи, после чего работники (workers) снова заснут.
Это демонстрирует еще одну сильную сторону механизма уведомлений, который позволяет одновременно уведомлять любое количество слушателей данного канала и без предварительного распределения строк таблицы для данного узла. Благодаря многоверсионному управлению параллелизмом Postgres решение об этом распределении может быть принято путем выбора из базы данных, где каждый узел может получать блокировки строк для определения своих задач из таблицы, без необходимости отдельной реализации распределения в рамках возможного альтернативного механизма уведомлений. Все это делает Postgres хорошим выбором для использования базы данных в качестве очереди, особенно если Postgres уже является частью вашего технологического стека.