PubSub почти бесплатно: особенности NOTIFY в PostgreSQL

  • Tutorial
Если ваши микросервисы уже используют общую базу PostgreSQL для хранения данных, или ей пользуются несколько экземпляров одного сервиса на разных серверах, можно относительно «дешево» получить возможность обмена сообщениями (PubSub) между ними без интеграции в архитектуру Redis, RabbitMQ-кластера или встройки в код приложения другой MQ-системы.

Для этого мы не будем писать сообщения в таблицы БД, поскольку это вызывает слишком большие накладные расходы сначала на запись передаваемого, а потом еще и на зачистку от уже прочитанного.

Передавать и получать данные мы станем с помощью механизма NOTIFY/LISTEN, а модельную реализацию соберем для Node.js.



Но на этом пути лежат грабли, которые придется аккуратно обойти.

Особенности протокола


LISTEN


LISTEN канал
Приложение, использующее библиотеку libpq, выполняет команду LISTEN как обычную команду SQL, а затем оно должно периодически вызывать функцию PQnotifies, чтобы проверить, не поступили ли новые уведомления.
Если вы пишете не библиотеку для работы с PG, а уже конкретное приложение, в большинстве случаев, вы не будете иметь доступа к вызову этой функции.

Но если такую библиотеку уже написали для вас в соответствии с рекомендациями по обработке асинхронных запросов и уведомлений, вы автоматически получите сообщение в прикладном коде. Если нет — можно просто периодически выполнять SELECT 1 на соединении, тогда уведомление придет вместе с результатом запроса:
В очень старых выпусках libpq обеспечить своевременное получения сообщений от команды NOTIFY можно было только одним способом — постоянно отправлять команды, пусть даже пустые, а затем проверять PQnotifies после каждого вызова PQexec. Хотя этот метод всё ещё работает, он считается устаревшим ввиду неэффективного использования процессора.
С точки зрения, например, psql это выглядит вот так:

_tmp=# LISTEN test;
LISTEN
_tmp=# SELECT 1;
 ?column?
----------
        1
(1 row)

Asynchronous notification "test" with payload "abc123" received from server process with PID 63991.

Если для прикладной задачи мы можем согласиться на максимальную задержку доставки сообщения в пределах 1 сек — с таким интервалом и выполняем запрос. Заодно, этот способ помогает мониторить «живость» соединения, убеждаясь, что никто случайно не оборвал его со стороны сервера через pg_terminate_backend, или не произошел вообще внезапный «крэш» PG без всяких уведомлений клиентов.

NOTIFY


NOTIFY канал [ , сообщение ]

Команда NOTIFY отправляет событие уведомления вместе с дополнительной строкой «сообщения» всем клиентским приложениям, которые до этого выполнили в текущей базе данных LISTEN канал с указанным именем канала.

Строка «сообщения», которая будет передана вместе с уведомлением,… должна задаваться простой текстовой константой. В стандартной конфигурации её длина должна быть меньше 8000 байт.
То есть если наше «сообщение» внезапно содержит что-то сильно отличное от ASCII, то нам его придется экранировать, а если превысит размер в 8000 байт (не символов!) — то резать на блоки и потом склеивать. При этом нам стоит поберечь как пропускную способность канала, так и ресурсы сервера на обработку передачи таких блоков — то есть добавить к полезному контенту как можно меньше служебной «обвязки», но при этом и не «задушить» клиентское приложение, заставляя его заниматься упаковкой с gzip -9.

Из дополнительных плюсов механизма можно так же отметить привязку к «источнику» сообщения…
… дополнительной работы можно избежать, если проверить, не совпадает ли PID сигнализирующего процесса (указанный в данных события) с собственным PID сеанса (его можно узнать, обратившись к libpq). Если они совпадают, значит сеанс получил уведомление о собственных действиях, так что его можно игнорировать.
… и гарантированность порядка доставки:
Не считая фильтрации последующих экземпляров дублирующихся уведомлений, NOTIFY гарантирует, что уведомления от одной транзакции всегда поступают в том же порядке, в каком были отправлены. Также гарантируется, что сообщения от разных транзакций поступают в порядке фиксации этих транзакций.
Мы не станем ничего специально объединять, поэтому каждый наш запрос как раз будет соответствовать отдельной транзакции.

Но помните, что если на используемом для обмена соединении есть еще и прикладная активность, наш NOTIFY может оказаться внутри транзакции не по своей воле, поэтому могут возникнуть сайд-эффекты:
Транзакции оказывают значительное влияние на работу NOTIFY. Во-первых, если NOTIFY выполняется внутри транзакции, уведомления доставляются получателям после фиксирования транзакции и только в этом случае. Это разумно, так как в случае прерывания транзакции действие всех команд в ней аннулируется, включая NOTIFY.
Поэтому лучше использовать соединение, где заведомо не будет транзакций и длинных запросов.

AccessExclusiveLock on object 0 of class 1262 of database 0


Если внезапно ваши NOTIFY начали подтупливать и выдавать в лог ожидание такой блокировки, значит, вы все-таки уже «выросли из коротких штанишек», и пора задуматься о «взрослой» MQ.

Ведь очередь уведомлений хоть и достаточно велика (8GB в стандартных сборках), но все-таки конечна. Согласно ответу Tom Lane:
This lock is held while inserting the transaction's notify message(s), after which the transaction commits and releases the lock.
То есть вариантов обхода останется не слишком много:

  • отправлять, но реже
    То есть агрегировать отправляемые показатели, если это какие-то счетчики, на более длинном интервале.
  • отправлять меньший объем
    Например, удалять из передаваемых JSON «дефолтные» с точки зрения приложения значения ключей.
  • отправлять только сигнал, вообще без контента
    Как вариант — завести несколько каналов, имя каждого уже будет нести само по себе какой-то прикладной смысл.
  • все-таки вынести отправку из БД

Передача «сложных» сообщений


Кодирование «тела»


В общем случае, мы можем захотеть передавать в сообщении не только разрешенные символы, но и русские буквы, и «всякую бинарщину» — поэтому было бы удобно использовать конвертацию в hex-представление для формирования передаваемой строки. И да, такой способ вполне работает:

NOTIFY test, E'\x20\x21'

Asynchronous notification "test" with payload " !" received from server process with PID 63991.

Но обратимся снова к документации:
Вы должны позаботиться, чтобы байтовые последовательности, которые вы создаёте таким образом, особенно в восьмеричной и шестнадцатеричной записи, образовывали допустимые символы в серверной кодировке. Когда сервер работает с кодировкой UTF-8, вместо такой записи байт следует использовать спецпоследовательности Unicode или альтернативный синтаксис Unicode, описанный в Подразделе 4.1.2.3. (В противном случае придётся кодировать символы UTF-8 вручную и выписывать их по байтам, что очень неудобно.)
Поэтому даже с банальным символом кавычки-лапки из win1251 мы хлебнем горя:

NOTIFY test, E'\x98'
-- ERROR:  invalid byte sequence for encoding "UTF8": 0x98

Поскольку "кодировать символы UTF-8 вручную и выписывать их по байтам" мы не хотим, сразу договоримся передавать тело сообщения упакованным в base64 при наличии в нем любых символов за пределами диапазона \x20-\x7E или при необходимости разбиения на сегменты. С одной стороны, такой метод упаковки не слишком сильно увеличивает избыточность (коэффициент 4:3), с другой — в любом языке реализован на уровне системных библиотек, и обеспечит минимальную дополнительную нагрузку.

Но даже если у нас нет «странных» символов, и сообщение умещается в один сегмент, все равно остается одна особенность — экранирование апострофа:
Чтобы включить апостроф в строку, напишите в ней два апострофа рядом, например: 'Жанна д''Арк'. Заметьте, это не то же самое, что двойная кавычка (").

Идентификация сегментов


Следующая задача — корректно «порезать» сообщение на разрешенные к передаче блоки по 7999 байт, если его размер вдруг превысил это значение. Причем так, чтобы на получателе можно было собрать его без нарушения порядка или попадания в цепочку «чужих» сегментов. Для этого каждый из них надо как-то проидентифицировать.

Собственно, две «координаты» нам уже известны — это PID процесса-отправителя и имя канала, приходящие в каждом уведомлении. А порядок поступления сегментов нам гарантирует сам протокол.

Соседи-писатели
Мы не будем рассматривать случай, когда на одном соединении с БД (то есть заведомо в рамках одного прикладного процесса) активны одновременно несколько писателей в адрес одного и того же канала. Технически, это можно поддержать передачей дополнительного идентификатора в заголовке сегмента — но лучше «расшарить» единственный PubSub-объект внутри своего приложения.

Ограничение контейнера


Чтобы собрать цельный контейнер из нескольких сегментов, нам надо знать момент его окончания. Для этого есть два типовых способа:

  • передача целевого размера (в байтах или сегментах) в первом из них
  • передача признака [не]последнего сегмента в каждом из них

Поскольку мы пишем все-таки PubSub, большинство сообщений у нас будут короткими и резервировать много байт под передачу размера невыгодно. Поэтому воспользуемся вторым способом, зарезервировав первый символ данных сегмента в качестве флага продолжения/окончания контейнера.

Передача объектов


Чтобы передавать в качестве «сообщения» как обычные текстовые строки, так и JSON-объекты, добавим еще один символ-признак для обратного преобразования на стороне получателя.

Поскольку мы приняли решение кодировать «неформат» в base64, для флагов можно взять любые разрешенные символы, не входящие в этот набор.

Итого, у нас получились следующие варианты передаваемых сегментов:

-- "короткая" текстовая строка
!simple string
-- "короткий и простой" объект
@{"a":1}
-- нефинальный сегмент в base64
#<segment>
-- завершающий сегмент в base64
$<segment>

Как видно, достаточно проанализировать при получении сегмента всего лишь первый символ, чтобы понять, что нужно с ним дальше делать.

Пишем реализацию PubSub


Наше приложение будет на Node.js, поэтому для работы с PostgreSQL воспользуемся модулем node-postgres.

Пишем стартовый каркас
Для начала создадим PubSub как наследника EventEmitter, чтобы иметь возможность генерировать события в адрес тех, кто подписался на конкретные каналы:

const util = require('util');
const EventEmitter = require('events').EventEmitter;

const PubSub = function(connection, interval, skipSelf) {
  // используем уже существующее соединение
  this.connection = connection;

  // подписываемся своей обработкой на получение всех уведомлений
  this.connection.on('notification', p._onmessage.bind(this));

  // не принимать уведомления от собственного соединения с БД
  this.skipSelf = skipSelf;

  // запускаем "антифриз"
  setInterval(() => {
    this.connection.query('SELECT 1');
  }, interval);

  // тут будем хранить сегменты "недополученных" сообщений
  this.slices = {};
};

util.inherits(PubSub, EventEmitter);

const p = PubSub.prototype;

Работаем с каналами
Поскольку LISTEN/UNLISTEN никак не ругаются при повторной подписке на канал или отписке от того, на что мы не были подписаны, то и усложнять ничего не будем.

// если в имени канала какой-то "неформат", его надо заключить в кавычки
// сам символ двойных кавычек - и так не допускается в имени канала
const quot = str => /^[_a-z][0-9a-z_\$]*$/.test(str) ? str : `"${str}"`;

p.subscribe = function(channel) {
  this.connection.query(`LISTEN ${quot(channel)}`);
  return this;
};

p.unsubscribe = function(channel) {
  this.connection.query(`UNLISTEN ${quot(channel)}`);
  return this;
};

Передача и прием сообщения
const PAYLOAD_LIMIT  = 8000 - 1;
const PAYLOAD_FL_STR = '!';
const PAYLOAD_FL_OBJ = '@';
const PAYLOAD_FL_SEQ = '#';
const PAYLOAD_FL_FIN = '$';
const PAYLOAD_SZ_HEAD = 1;
const PAYLOAD_SZ_DATA = PAYLOAD_LIMIT - PAYLOAD_SZ_HEAD;

// только "простые" символы
const reASCII = /^[\x20-\x7E]*$/;

// отправка
p.publish = function(channel, payload) {
  let query = `NOTIFY ${quot(channel)}`;
  if (payload !== null && payload !== undefined) {
    // кодируем тип сообщения - строка или объект
    let str = typeof payload == 'string'
      ? PAYLOAD_FL_STR + payload
      : PAYLOAD_FL_OBJ + JSON.stringify(payload);
    if (str.length > PAYLOAD_LIMIT || !reASCII.test(str)) {
      // отправляем сегменты base64-представления
      const b64 = Buffer.from(str).toString('base64');
      for (let pos = 0, len = b64.length; pos < len; pos += PAYLOAD_SZ_DATA) {
        let fin = pos + PAYLOAD_SZ_DATA;
        let seg = fin >= len
          ? PAYLOAD_FL_FIN + b64.slice(pos)
          : PAYLOAD_FL_SEQ + b64.slice(pos, fin);
        this.connection.query(`${query}, '${seg}'`);
      }
    }
    else {
      // все уместилось в один сегмент с допустимыми символами?
      // не забываем экранировать апостроф
      str = str.replace(/'/g, "''");
      this.connection.query(`${query}, '${str}'`);
    }
  }
  else {
    // простой сигнал в канал без сообщения
    this.connection.query(query);
  }
  return this;
};

// прием и сборка
p._onmessage = function(msg) {
  const {processId, channel, payload} = msg;

  // пропускаем "свои"
  if (processId == this.connection.processID && this.skipSelf) {
    return;
  }

  // "координаты" источника
  const id = `${processId}:${channel}`;

  let rv;
  // тип сегмента
  let fl = payload.charAt(0);

  if (fl == PAYLOAD_FL_SEQ || fl == PAYLOAD_FL_FIN) {
    // base64
    const str = payload.slice(PAYLOAD_SZ_HEAD);
    const slices = this.slices;

    let b64;
    if (fl == PAYLOAD_FL_FIN) {
      // собираем контейнер
      if (slices[id]) {
        slices[id].push(str);
        b64 = slices[id].join('');
        delete slices[id];
      }
      else {
        b64 = str;
      }
    }
    else {
      // дописываем сегмент в массив
      if (slices[id]) {
        slices[id].push(str);
      }
      else {
        slices[id] = [str];
      }
    }

    if (b64) {
      rv = Buffer.from(b64, 'base64').toString();
      fl = rv.charAt(0);
    }
  }
  else {
    // простая строка/объект
    rv = payload;
  }

  if (rv !== undefined) { // может быть ''
    let res = {
      processId
    , channel
    };
    if (rv) {
      // распаковка сообщения в соответствии с типом
      let data = rv.slice(1);
      res.payload = fl == PAYLOAD_FL_OBJ ? JSON.parse(data) : data;
    }

    this.emit(channel, res);
  }
};

Немного тестов
const pg = require('pg');

const pgsql = new pg.Client({
  host : 'example-db'
, port : 5432
, user : 'postgres'
, password : 'postgres'
, database : '_tmp'
});

pgsql.connect(err => {
  let psA = new PubSub(pgsql, 1000);
  let psB = new PubSub(pgsql, 1000);

  let chA = 'channel:A';
  let chB = 'channel:B';
  psA.subscribe(chA);
  psB.subscribe(chB);

  psA.on(chA, (msg) => {
    console.log('A:rcv', msg);
  });
  psB.on(chB, (msg) => {
    console.log('B:rcv', msg);
  });

  psB.publish(chA);
  psB.publish(chA, 'simple string');
  psB.publish(chA, 'мама мыла раму');
  psB.publish(chA, {a : 1});
  psA.publish(chB, 'мама мыла раму 100 раз '.repeat(100));
});

Все достаточно просто, поэтому можно легко реализовать на любом другом ЯП, используемом в вашем проекте, взяв за основу примеры работы с асинхронными уведомлениями:

Тензор
Разработчик системы СБИС

Комментарии 25

    +6
    Если думаете про использование listen/notify — обязательно прочтите это: github.com/postgres/postgres/blob/REL_11_STABLE/src/backend/commands/async.c#L16
    И подумайте, подходят ли эти ограничения для вашей задачи.

    Ну и это очевидно не будет работать через transaction pool mode pgbouncer'а.
      +3
      Если ваши микросервисы уже используют общую базу PostgreSQL

      Бегите, глупцы.


      А так каждая подписка жрет коннект к postgres, а они там не очень дешевые, разве нет?

        0
        Бегите, глупцы.
        Ну ведь не все же, а только некоторые. :)
        каждая подписка жрет коннект к postgres
        Нет, все подписки на разные каналы спокойно живут на одном коннекте со стороны клиента.
        +1
        NOTIFY канал [, сообщение ]
        а почему не pg_notify?
          +1
          А зачем использовать SELECT из функции, который явно больших усилий требует со стороны PG, если можно без этого? Вот изнутри сложного запроса или хранимки звать — так тут просто другого варианта нет.
            0

            :) Это одно и то же)))
            Просто вызывая pg_notify можно использовать переменные в хранимках и прочие plsql штуки для динамического формирования имён каналов или сообщений, а в NOTIFY — только литералы.

            +1
            вот мнение почему так делать не стоит dvps.blog/postgresql-missusage
              0
              Всегда надо стараться соблюсти баланс — с одной стороны не городить себе же SPoF, с другой — не умножать сущностей без надобности.

              Был у нас один кейс, когда отказ от «внешнего» (относительно бизнес-логик) RabbitMQ-кластера в пользу такой «наколеночной» реализации через PG, с которым «все равно уже работали» свел задержки доставки событий практически до нуля — с секунд.
              +5

              Спасибо, добавил себе в коллекцию анти-паттернов.

                0
                Если ваши микросервисы уже используют общую базу PostgreSQL для хранения данных

                выносите, дробите, делите базу на разные…
                  0
                  Вы предлагаете под каждый сервис выделять, в пределе, изолированную реплику базы? То есть если один сервис работает с таблицами [A, B, C, X], а другой с [A, B, C, Y] — все, нельзя жить вместе?
                  Как бы чуть-чуть слишком затратно.
                    0
                    да все так: 1 сервис — 1 база, у которой может быть несколько таблиц — но только из одной домменой модели.
                    Ключевое что доступ к определенной базе, у которой может быть несколько таблиц — только у 1го сервиса
                    Ничего трудозатратного — если уметь готовить.
                      0
                      А куда делись остальные таблицы, с которыми, сервис работает? Или вы предлагаете все свести к межсервисным взаимодействиям с джойнами в прикладном коде?..
                        0
                        если это сущности этого же сервиса — то конечно несколько таблиц. Но если ему нужна информацию по user'у будь добр сделать запрос в другой сервис — ответственный за это.
                        Ключевое что доступ к определенной базе, у которой может быть несколько таблиц — только у 1го сервиса
                          +1
                          Как и любое «абсолютное» решение, его производительность сильно проигрывает тому же JOIN с реплицированной с базы мастер-сервиса таблицей.
                            0
                            это выигрывает сейчас, но потом нет.
                            Если шарится база — прощай независимый деплой, дублирование моделей и т.д. — шанс сломать что то…
                            Микросервисы — это для горизонтального масштабирования, что будет когда у нас будет 2000+ инстансов? Мы упремся в базу!
                              0
                              Я не говорю что надо делать разные сервисы для товара в ИМ, для его свойств и фото.
                              Это все один сервис items — но сервисы orders и basket не должны шарить базу с ним — только межсервисные взаимодействия
                                0
                                Такое «не должны» не является абсолютной истиной. Вы просто размениваете иллюзию независимости сервисов на сложность их разработки (custom join) и поддержки.

                                Разнесли вы items, orders и basket на разные сервисы — а потом приходит менеджмент с необходимостью регулярно делать сводный отчет по товарам, отложенным в корзину, но невыкупленным — и начинаются костыли, потому что нормально работать он не сможет при проблемах на _любом_ из этих сервисов.

                                То есть на одном конце линейки у нас хардкорный монолит, на другом — «россыпь» сервисов 1-в-1. Любое из этих решений в чем-то плохо, в чем-то хорошо — и надо уметь соблюсти баланс, а не строго следовать какой-то догме.
                                  0
                                  И я про это — для приложение в 1000 человек в день — микросервисы не нужны, монолита хватит. Но при большой нагрузке не получится горизонтально масштабироваться — а ведь это ключевое зачем используют микросервисы.
                                  Мы делали сервис, где только одного сервиса было поднято 100+ инстансов

                                  Но городить огород: «микросервисы с общими базами» — ответ простой — или монолит или разносить — микросервисы тут не нужны.
                                    0
                                    Если у вас 1% времени уходит на работу с базой и 99% на прикладной код сервиса, что, кроме идеологических соображений, должно помешать посадить на эту базу 10 инстансов или 10 сервисов?
                                      0
                                      Я не понимаю что вы хотите доказать)
                                      Рано или поздно мы упремся в базу, вариант один горизонтальное масштабирование.
                                      Только вопрос в том когда упремся:
                                      Скоро? Разносить
                                      Никогда? Тогда вообще забыть о микросервисах
                                        +1
                                        «Преждевременная оптимизация — корень всех (или большинства) проблем в программировании.» © Дональд Кнут

                                        Если мы начинаем упираться в базу — ее стоит как-то масштабировать. Если в сервисы — масштабировать их.

                                        Когда это потребуется и как это потребуется. Но подходить аксиоматически «таблицей может пользоваться только один сервис» — плохой подход с точки зрения бизнеса.
                  0

                  Если что, для ноды уже давно есть https://github.com/andywer/pg-listen

                    0
                    Там ведь нет нарезки/сборки payload > 7999 байт на несколько сегментов, или я плохо посмотрел?
                      0

                      ой, да, нарезки и сборки длинных сообщений там нет.
                      Но кмк, это не так уж часто и нужно. А в остальном — уже готовое и работающее решение.

                  Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

                  Самое читаемое