При реализации некоторых прикладных задач в рамках экосистемы СБИС случается сталкиваться с неочевидными возможностями PostgreSQL, которые позволяют вместо сложной логики создать решение "в один ход".
Сегодня на примере вполне реальной задачи рассмотрим такие возможности оператора INSERT ... ON CONFLICT
.
Задача: мониторинг
Пусть у нас есть несколько датчиков, каждый из которых периодически заносит в базу текущую температуру: id
датчика, момент измерения ts
и значение temp_val
.
Датчики стоят далеко, каналы слабые, помех много, поэтому от момента измерения до момента попадания этих данных в БД может возникать задержка, но не больше часа.
А еще у нас есть интерфейс, который любит рисовать графики по этим данным. Но ему с каждого датчика достаточно одного отсчета в час: общего количества измерений qty
и средней температуры.
Структура данных
Долго ли, коротко ли, решили разработчики, что агрегаты считать "на лету" - не особо производительно, ведь большинство из них со временем не будут меняться никак, поэтому лучше их сразу сохранять, периодически пересчитывая лишь несколько последних.
Для этих целей будет достаточно одной общей таблицы:
CREATE TABLE metrics(
id
integer
, ts
timestamp
, temp_val
real
, qty
integer
);
Договорились, что если в записи qty IS NULL
, то это первичный "факт" от датчика, а qty IS NOT NULL
- это уже сводный агрегат по нему за конкретный час.
Worker-агрегатор
Чтобы получать свежие агрегаты в базе, запустим процесс, который периодически будет вычитывать все первичные данные от начала предыдущего часа и обновлять в базе:
INSERT INTO metrics(id, ts, temp_val, qty)
SELECT
id
, date_trunc('hour', ts) ts
, avg(temp_val) temp_val
, count(*) qty
FROM
metrics
WHERE
qty IS NULL AND -- только "первичка"
ts >= date_trunc('hour', now() - '1 hour'::interval)
GROUP BY
1, 2;
Подробнее - в статье "Агрегаты в БД — зачем, как, а стоит ли?".
Понятно, что для такого запроса нам и индекс потребуется соответствующий:
CREATE INDEX ON metrics(ts)
WHERE qty IS NULL; -- только для фактов
Решаем конфликты
Но ведь когда мы запускали процедуру агрегации в прошлый раз, весь предыдущий час уже мог быть посчитан, и записи задублируются, а мы этого не хотим! Поэтому добавим ограничение, которое позволит нам гарантированно иметь не более одной агрегированной записи по датчику в час:
CREATE UNIQUE INDEX ON metrics(id, ts)
WHERE qty IS NOT NULL; -- только для агрегатов
Но теперь наш INSERT
просто валится с исключением... Непорядок! Нам то ведь надо, чтобы данные обновлялись:
INSERT INTO metrics(id, ts, temp_val, qty)
SELECT
id
, date_trunc('hour', ts)
, avg(temp_val)
, count(*)
FROM
metrics
WHERE
qty IS NULL AND
ts >= date_trunc('hour', now() - '1 hour'::interval)
GROUP BY
1, 2
ON CONFLICT -- перехватываем конфликт
(id, ts) WHERE qty IS NOT NULL
DO UPDATE SET -- обновляем запись в новое состояние
(qty, temp_val) = (EXCLUDED.qty, EXCLUDED.temp_val);
Боремся с "мертвецами"
Но при таком подходе каждая запись прошлого часа многократно UPDATE
'ится, что приводит к "раздуванию" таблицы! Это можно легко увидеть, добавив к нашему запросу RETURNING *
.
Подробнее - в статье "PostgreSQL Antipatterns: сражаемся с ордами «мертвецов»".
Поэтому давайте обновлять только те записи, которые реально изменились:
INSERT INTO metrics AS m(id, ts, temp_val, qty)
SELECT
id
, date_trunc('hour', ts)
, avg(temp_val)
, count(*)
FROM
metrics
WHERE -- #1 : условие отбора "первички"
qty IS NULL AND
ts >= date_trunc('hour', now() - '1 hour'::interval)
GROUP BY
1, 2
ON CONFLICT
(id, ts) WHERE qty IS NOT NULL -- #2 : условие UNIQUE-индекса "агрегатов"
DO UPDATE SET
(temp_val, qty) = (EXCLUDED.temp_val, EXCLUDED.qty)
WHERE -- #3 : условие обновления записи
(m.temp_val, m.qty) IS DISTINCT FROM (EXCLUDED.temp_val, EXCLUDED.qty);
Обратите внимание, что нам пришлось дать нашей таблице алиас AS m
, чтобы в дальнейшем обратиться к ней в условии по короткому имени.
Собственно, вот и все - наша задача решена "в один запрос" - правда, в нем получилось целых три WHERE
.