PostgreSQL: PipelineDB — агрегирующие запросы в режиме реального времени

  • Tutorial
Вас когда-либо просили посчитать количество чего-то на основании данных в бд за последний месяц, сгруппировав результат по каким-то значениям и разбив всё это ещё по дням/часам?
Если да — то вы уже представляете, что вам придётся написать что-то вроде такого, только хуже

SELECT hour(datetime), somename, count(*), sum(somemetric)
from table
where datetime > :monthAgo
group by 1, 2
order by 1 desc, 2

Время от времени самые разнообразные подобные запросы начинают появляться, и если один раз стерпишь и поможешь — увы, обращения будут поступать и в будущем.

А плохи такие запросы тем, что хорошо отнимают ресурсы системы на время выполнения, да и данных может быть так много, что даже реплику для таких запросов будет жаль (и своего времени).

А что если я скажу, что прямо в PostgreSQL можно создать вьюху, которая на лету будет учитывать только новые поступающие данные в прямо подобном запросе, как выше?

Так вот — это умеет делать расширение PipelineDB

Демо с их сайта, как это работает


Ранее PipelineDB был отдельным проектом, но теперь доступен как расширение для PG 10.1 и выше.

И хотя предоставляемые возможности уже давно существуют в иных продуктах, специально созданных для сбора метрик в режиме реального времени — у PipelineDB есть существенный плюс: меньший порог вхождения для разработчиков, которые уже умеют SQL).

Возможно для кого-то это несущественно. Лично я не ленюсь пробовать всё, что кажется подходящим для решения той или иной задачи, но и не двину тут же юзать одно новое решение на все случаи. Потому в этой заметке я не призываю тут же бросать всё и устанавливать PipelineDB, это лишь обзор основного функционала, т.к. штука мне показалась любопытной.

И так, в целом документация у них хорошая, но я хочу поделиться опытом, как это дело попробовать на практике и вывести результаты в Grafana.

Чтобы не захламлять локальную машину всё разворачиваю в докере.
Используемые образы: postgres:latest, grafana/grafana

Установка PipelineDB на Postgres


На машине с postgres выполнить последовательно:

  1. apt update
  2. apt install curl
  3. curl -s http://download.pipelinedb.com/apt.sh | bash
  4. apt install pipelinedb-postgresql-11
  5. cd /var/lib/postgresql/data
  6. Открыть в любом редакторе файл postgresql.conf
  7. Найти ключ shared_preload_libraries, расскоментить и установить значение pipelinedb
  8. Ключ max_worker_processes установить в значение 128 (рекомендация доки)
  9. Ребутнуть сервер

Создание потока и вьюх в PipelineDB


После ребута пг - наблюдай логи, чтобы там было такое


  1. Бд в которой будем работать: CREATE DATABASE testpipe;
  2. Создание расширения: CREATE EXTENSION pipelinedb;
  3. Теперь самое интересное — создание стрима. Именно в него необходимо добавлять данные для дальнейшей обработки:

    CREATE FOREIGN TABLE flow_stream (
        dtmsk timestamp without time zone,
        action text,
        duration smallint
    ) SERVER pipelinedb;

    По сути очень похоже на создание обыкновенной таблицы, только получить данные из этого стрима простым select нельзя — нужна вьюха
  4. собственно как её создать:

    CREATE VIEW viewflow WITH (ttl = '3 month', ttl_column = 'm') AS
    select minute(dtmsk) m,
           action, 
           count(*), 
           avg(duration)::smallint,
           min(duration),
           max(duration)
    from flow_stream
    group by 1, 2;

    Называются они Continuous Views и по дефолту materialize, т.е. с сохранением состояния.

    В выражении WITH передаются дополнительные параметры.

    В моём случае ttl = '3 month' говорит о том, что хранить нужно данные только за последние 3 месяца, а брать дату/время из колонки M. Фоновый процесс reaper ищет устаревшие данные и удаляет их.

    Для тех, кто не в курсе — функция minute возвращает дату/время без секунд. Таким образом все события, произошедшие в одну минуту будут иметь одно и то же время в результате агрегации.
  5. Такая вьюха — практически таблица, потому индекс по дате для выборки будет полезным, если данных будет храниться много

    create index on viewflow (m desc, action);

Использование PipelineDB


Помни: вставлять данные в стрим, а читать — из подписавшихся на него вьюх

insert into flow_stream VALUES (now(), 'act1', 21);
insert into flow_stream VALUES (now(), 'act2', 33);
select * from viewflow order by m desc, action limit 4;
select now()

Выполняю запрос вручную

Сначала наблюдаю, как меняются данные в 46-ю минуту
Как только наступает 47-я — предыдущая прекращает обновляться и начинает тикать текущая минута.

Если обратить внимание на план запроса, то можно увидеть оригинальную таблицу с данными



Рекомендую сходить в неё и узнать, как на самом деле хранятся ваши данные

Генератор событий на C#
using Npgsql;
using System;
using System.Threading;

namespace PipelineDbLogGenerator
{
    class Program
    {
        private static Random _rnd = new Random();
        private static string[] _actions = new string[] { "foo", "bar", "yep", "goal", "ano" };

        static void Main(string[] args)
        {
            var connString = "Host=localhost;port=5432;Username=postgres;Database=testpipe";

            using (var conn = new NpgsqlConnection(connString))
            {
                conn.Open();

                while (true)
                {
                    var dt = DateTime.UtcNow;

                    using (var cmd = new NpgsqlCommand())
                    {
                        var act = GetAction();

                        cmd.Connection = conn;
                        cmd.CommandText = "INSERT INTO flow_stream VALUES (@dtmsk, @action, @duration)";
                        cmd.Parameters.AddWithValue("dtmsk", dt);
                        cmd.Parameters.AddWithValue("action", act);
                        cmd.Parameters.AddWithValue("duration", GetDuration(act));

                        var res = cmd.ExecuteNonQuery();
                        Console.WriteLine($"{res} {dt}");
                    }

                    Thread.Sleep(_rnd.Next(50, 230));
                }
            }
        }

        private static int GetDuration(string act)
        {
            var c = 0;

            for (int i = 0; i < act.Length; i++)
            {
                c += act[i];
            }

            return _rnd.Next(c);
        }

        private static string GetAction()
        {
            return _actions[_rnd.Next(_actions.Length)];
        }
    }
}


Вывод в Grafana


Для получения данных из postgres нужно добавить соответствующий источник данных:



Создать новую дашборду и добавить на неё панель типа Graph, а после нужно перейти в редактирование панели:



Далее — выбрать источник данных, переключиться в режим написания sql-запроса и ввести такое:

select 
  m as time, -- Grafana требует колонку time
  count, action
from viewflow
where $__timeFilter(m) -- макрос графаны, принимает на вход имя колонки, на выходе col between :startdate and :enddate
order by m desc, action;

И тут же получается нормальный график, конечно, если вы запустили генератор событий



FYI: наличие индекса может оказаться очень важным. Хотя его использование зависит от объёма получившейся таблицы. Если вы планируете хранить небольшое количество строк за небольшое количество времени, то очень легко может оказаться, что seq scan будет дешевле, а индекс лишь добавит доп. нагрузку при обновлении значений

На один стрим может быть подписано несколько вьюх.

Допустим я хочу видеть сколько выполняются методы апи в разрезе по процентилям

CREATE VIEW viewflow_per WITH (ttl = '3 d', ttl_column = 'm') AS
select minute(dtmsk) m,
      action, 
      percentile_cont(0.50) WITHIN GROUP (ORDER BY duration)::smallint p50,
      percentile_cont(0.95) WITHIN GROUP (ORDER BY duration)::smallint p95,
      percentile_cont(0.99) WITHIN GROUP (ORDER BY duration)::smallint p99
from flow_stream
group by 1, 2;

create index on viewflow_per (m desc);

Проделываю тот же трюк с графаной и получаю:


Итого


В целом штука рабочая, вела себя хорошо, без нареканий. Хотя под докером загрузка их демо базы в архиве (2.3 гб) оказалась малость долгим делом.

Хочу заметить — я не проводил нагрузочные тесты.

Официальная документация

Может быть интересным


Поделиться публикацией

Похожие публикации

Комментарии 24

    0

    Спасибо за статью, буквально в пятницу узнал об этом расширении для pg. Подскажите, а для 9.6 postgresql версии как быть? Расширения нет под 9.6?

      0
      Кажется расширения под 9.6 нет, но я не сильно искал
      PipelineDB currently supports PostgreSQL versions 10.1, 10.2, 10.3, 10.4, 10.5, and 11.0
      Как быть — заюзать уже пг 10 )
      Даже 11-й зарелизился
      Ну или если решили, что нужно использовать PipelineDB, а хотите продолжать сидеть на 9.6, то всегда можно поднять ещё один свежий Postgres и там развлекаться
      0
      del
        0
        Интересно: а это расширение может помочь, если данные будут в основном в JSONB поле?
          0
          Типа того?
          select minute(eventtime) as time, 
                 sum((data->>'value')::integer) as sum
          from table1
          

          Не вижу проблем с таким вариантом, хотя не пробовал

          Тут же суть в том, что расширение организует поточную обработку данных
          т.е.: приходят данные, возможно накапливаются в каком-то буфере, в фоне выполняется запрос из вьюхи над новыми данными, далее — результат вставляется в результирующую таблицу, либо — дополяет старые данные

          И чисто теоретически не сильно важно, какого типа данные гоняются от потока до вьюхи, потому что вы сами же с ними и работаете весьма прозрачным образом

          Например если у вас в jsonb хранится массив объектов и хотите именно его обрабатывать «в потоке», то можно сделать трансформацию разложив массив на строки и… работать как с обычной таблицей
            0
            А можно JSONB разложить в представление (view)?
              0

              Определённо да, вечером скину пример

                0

                Один JSON-объект можно разложить в запись с помощью jsonb_to_record, а массив — с помощью jsonb_to_recordset. См. официальную документацию и мой пример.

                  0
                  Как подсказали выше, вот так

                  select * 
                  from jsonb_to_recordset(
                          '[{"a":1,"b":"foo"},{"a":"2","c":"bar", "v":"2018-04-01"}]'::jsonb
                  ) as x(a int, b text, v date, not_exists time);

                    0
                    Это простенький JSON, а если сложный иерархический, но структура более менее постоянна.
                      +1

                      Можно вытаскивать нужный кусок JSONB и скармливать его это же функции, на ходу подправляя, если требуется.
                      Вот, например, если у нас есть таблица listings, у которой есть jsonb-поле raw, где-то в недрах которого лежит нужный нам массив (а иногда не массив):


                      SELECT id, v."SKU"
                      FROM 
                        listings, 
                        jsonb_to_recordset(
                          CASE jsonb_typeof(raw->'Variations'->'Variation')
                          WHEN 'array' THEN raw->'Variations'->'Variation'
                          ELSE jsonb_build_array(raw->'Variations'->'Variation')
                          END
                        ) AS v("SKU" varchar)
                      WHERE raw IS NOT NULL 
                        AND raw->'Variations'->'Variation' IS NOT NULL
                  0
                  Т.е. эта система потокобезопасная и лишена дедлоков при наличии вставок через несколько подключений к БД?
                    0
                    Не знаю
                    Уверен, что при простом использовании стримов и вьюх — всё будет ок
                    Но если использовать вызовы различных кастомных методов, то внутри можно написать любуую логику и да — монжо словить классический дедлок, всё как всегда
                    У них в репе issues имеют метку deadlock, но они скорее не к экстеншену для postgres
                0
                А где в статье связь между запросом
                SELECT hour(datetime), somename, count(*), sum(somemetric)
                from table
                where datetime > :monthAgo
                group by 1, 2
                order by 1 desc, 2

                и стримом
                
                CREATE FOREIGN TABLE flow_stream (
                    dtmsk timestamp without time zone,
                    action text,
                    duration smallint
                ) SERVER pipelinedb;

                Я так понимаю, в «table» нужно триггер или правило добавлять, чтобы заливать данные в «flow_stream»?
                  0
                  PipelineDB можно рассматривать как таблицы с интегрированными триггерами.
                    0
                    Мне очень понравился первый абзац:
                    Вас когда-либо просили посчитать количество чего-то на основании данных в бд за последний месяц, сгруппировав результат по каким-то...

                    Что если у меня уже есть Postgres DB со своими таблицами, к которым требуется писать множество агрегатных запросов. Я так понимаю, чтобы использовать PipelineDB, я должен переместить таблицы в PipelineDB или написать некие правила для отправки данных из своих таблиц в PipelineDB? — этих правил в статье я не увидел.
                      +1
                      Достаточно детальный пример с примечаниями: Streaming Databases & PipelineDB: Likes Use Case

                      В статье много чего не описано по сравнению с официальной документацией, в том числе и про докер, т.к. готовый образ уже собран.
                        0
                        Почитал документацию, для себя определил PipelineDB, как механизм агрегирующих счетчиков в режиме реального времени. Счетчик от потока отключил, агрегированные данные потерял.
                        Идея интересная, но не для случая уже хранимых данных. Удобно для данных, которые только планируется сохранять, с учетом, что счетчики должны быть включены заранее.
                  0
                  Вечный tradeoff — тормозить чтение, или тормозить запись. Аггрегация, как я понимаю, синхронная, классическая?
                    0
                    curl -s http://download.pipelinedb.com/apt.sh | bash

                    Очень сильно удивился увидев такую конструкцию. Открыл документацию pipelinedb и увидел там ровно такую же строчку. Выглядит очень грустно.
                      0
                      Угу. В нем безапеляционно мелькают строки «apt-get install -y ...» без всяких вопросов. За такое обычно по рукам бьют.
                        0
                        Это проблемы разного уровня, если я ничего не путаю.
                        apt-get install -y в худшем случае сломает вам систему, если у вас нет косячных репозиториев.
                        curl -s http | bash — в худшем случае установит вирус, который спросит права на рута и вы ему их сами предоставите
                          0
                          Неважно какого уровня: это — проблема. И считаю, что да, за такое надо бить по рукам.
                      +1
                      Поздно postgresql, поздно. OLAP+SQL=ClickHouse.
                      У нас у самих до сир пор осталось пяток самописных агрегаций в постгресе. Характеристика одним словом — неудобно.

                      Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

                      Самое читаемое