Как стать автором
Обновить

Секционирование и «живые снимки» данных в PostgreSQL

Время на прочтение19 мин
Количество просмотров9.3K
Хотя тема секционирования уже поднималась ранее, я хочу к ней вернуться, чтобы рассказать о своем опыте решения этой задачи, возникшей в связи с необходимостью аналитической обработкой больших объемов данных. Помимо секционирования, я рассмотрю предельно упрощенную реализацию «снимков» агрегированных запросов, автоматически обновляемых при изменении исходных данных..

Одним из главных требований, к разрабатываемой системе, было использование бесплатного ПО, в связи с чем, выбор пал на PostgreSQL. На момент начала работы над проектом, я довольно поверхностно знал PostgreSQL, но был неплохо знаком с возможностями Oracle Database. Поскольку речь шла об аналитической обработке, мне хотелось иметь аналоги таких опций Oracle как Partitioning и Materialized Views. После ознакомления с возможностями PostgreSQL, стало понятно, что этот функционал, так или иначе, придется писать вручную.

Разумеется, речь не шла о какой либо полноценной реализации Materialized Views, предусматривающей переписывание запросов. Для моих нужд вполне хватало возможности создания автоматически обновляемых агрегированных одно-табличных выборок (поддержка соединения таблиц, скорее всего, будет добавлена в ближайшем будущем). Для секционирования, я планировал использовать многократно описанный подход с использованием наследуемых таблиц, со вставкой данных, управляемой триггером. У меня была мысль использовать для управления секционированием Rules, но я от нее отказался, поскольку, в моем случае, преобладала вставка данных одиночными записями.

Начал я, разумеется, с таблиц для хранения метаданных:

ps_tables.sql
create sequence ps_table_seq;

create table    ps_table (
  id            bigint         default nextval('ps_table_seq') not null,
  name          varchar(50)    not null unique,
  primary key(id)
);

create sequence ps_column_seq;

create table    ps_column (
  id            bigint         default nextval('ps_column_seq') not null,
  table_id      bigint         not null references ps_table(id),
  name          varchar(50)    not null,
  parent_name   varchar(50),
  type_name     varchar(8)     not null check (type_name in ('date', 'key', 'nullable', 'sum', 'min', 'max', 'cnt')),
  unique (table_id, name),
  primary key(id)
);

create table    ps_range_partition (
  table_id      bigint         not null references ps_table(id),
  type_name     varchar(10)    not null check (type_name in ('day', 'week', 'month', 'year')),
  start_value   date           not null,
  end_value     date           not null,
  primary key(table_id, start_value)
);

create table    ps_snapshot (
  snapshot_id   bigint         not null references ps_table(id),
  table_id      bigint         not null references ps_table(id),
  type_name     varchar(10)    not null check (type_name in ('day', 'week', 'month', 'year')),
  primary key(snapshot_id)
);


Здесь все достаточно очевидно. Единственное, о чем стоит сказать, это типы столбцов:
Тип
Описание
date
Столбец, содержащий календарную дату, используемый при секционировании и агрегации данных (поддерживаются типы date и timestamp PostgreSQL)
key
Ключ, используемый в фразе group by, при агрегации данных (поддерживаются все целочисленные типы PostgreSQL)
nullable
Ключ, используемый при агрегации данных, возможно содержащий значение null
sum
Суммирование значений
min
Минимальное значение
max
Максимальное значение
cnt
Подсчет количества не null-значений

Основой всего решения стала функция, выполняющая перестроение функций триггеров для таблицы, содержащей исходные данные:

ps_trigger_regenerate(bigint)
create or replace function ps_trigger_regenerate(in p_table bigint) returns void
as $$
declare
  l_sql         text;
  l_table_name  varchar(50);
  l_date_column varchar(50);
  l_flag        boolean;
  tabs          record;
  columns       record;
begin
  select name into l_table_name
  from   ps_table where id = p_table;

  l_sql := 
 'create or replace function ps_' || l_table_name || '_insert_trigger() returns trigger ' ||
 'as $'|| '$ ' ||
 'begin ';
  for tabs in
    select a.snapshot_id as id,
           b.name as table_name,
           a.type_name as snapshot_type
    from   ps_snapshot a, ps_table b
    where  a.table_id = p_table
    and    b.id = a.snapshot_id
    loop
      l_flag = FALSE;
      l_sql := l_sql ||
     'update ' || tabs.table_name || ' set ';
      for columns in
        select name, parent_name, type_name
        from   ps_column
        where  table_id = tabs.id
        and    not type_name in ('date', 'key', 'nullable')
        loop
          if l_flag then
             l_sql := l_sql || ', ';
          end if;
          l_flag := TRUE;
          if columns.type_name = 'sum' then
             l_sql := l_sql ||
             columns.name || ' = ' || columns.name || ' + coalesce(NEW.' || columns.parent_name || ', 0) ';
          end if;
          if columns.type_name = 'min' then
             l_sql := l_sql ||
             columns.name || ' = least(coalesce(' || columns.name || ', NEW.' || columns.parent_name || '), coalesce(NEW.' || columns.parent_name || ', ' || columns.name || ')) ';
          end if;
          if columns.type_name = 'max' then
             l_sql := l_sql ||
             columns.name || ' = greatest(coalesce(' || columns.name || ', NEW.' || columns.parent_name || '), coalesce(NEW.' || columns.parent_name || ', ' || columns.name || ')) ';
          end if;
          if columns.type_name = 'cnt' then
             l_sql := l_sql ||
             columns.name || ' = ' || columns.name || ' + case when NEW.' || columns.parent_name || ' is null then 0 else 1 end ';
          end if;
        end loop;
      l_flag = FALSE;
      l_sql := l_sql || 'where ';
      for columns in
        select name, parent_name, type_name
        from   ps_column
        where  table_id = tabs.id
        and    type_name in ('date', 'key', 'nullable')
        loop
          if l_flag then
             l_sql := l_sql || 'and ';
          end if;
          l_flag := TRUE;
          if columns.type_name = 'date' then
             l_sql := l_sql ||
             columns.name || ' = date_trunc(lower(''' || tabs.snapshot_type || '''), NEW.' || columns.parent_name || ') ';
          end if;
          if columns.type_name = 'key' then
             l_sql := l_sql ||
             columns.name || ' = NEW.' || columns.parent_name || ' ';
          end if;
          if columns.type_name = 'nullable' then
             l_sql := l_sql ||
             columns.name || ' = coalesce(NEW.' || columns.parent_name || ', 0)';
          end if;
        end loop;
      l_sql := l_sql || '; ' ||
     'if not FOUND then ' ||
     'insert into ' || tabs.table_name || '(';
      l_flag = FALSE;
      for columns in
        select name, type_name
        from   ps_column
        where  table_id = tabs.id
        loop
          if l_flag then
             l_sql := l_sql || ', ';
          end if;
          l_flag := TRUE;
          l_sql := l_sql || columns.name;
        end loop;
      l_sql := l_sql || ') values (';
      l_flag = FALSE;
      for columns in
        select name, parent_name, type_name
        from   ps_column
        where  table_id = tabs.id
        loop
          if l_flag then
             l_sql := l_sql || ', ';
          end if;
          l_flag := TRUE;
          if columns.type_name = 'date' then
             l_sql := l_sql || 'date_trunc(lower(''' || tabs.snapshot_type || '''), NEW.' || columns.parent_name || ')';
          elsif columns.type_name = 'cnt' then
             l_sql := l_sql || 'case when NEW.' || columns.parent_name || ' is null then 0 else 1 end';
          elsif columns.type_name in ('nullable', 'sum') then
             l_sql := l_sql || 'coalesce(NEW.' || columns.parent_name || ', 0)';
          else
             l_sql := l_sql || 'NEW.' || columns.parent_name;
          end if;
        end loop;
      l_sql := l_sql || '); ' ||
     'end if; ';
    end loop;
    select name into l_date_column
    from   ps_column
    where  table_id = p_table
    and    type_name = 'date';
    for tabs in
      select to_char(start_value, 'YYYYMMDD') as start_value,
             to_char(end_value, 'YYYYMMDD') as end_value,
             type_name
      from   ps_range_partition
      where  table_id = p_table
      order  by start_value desc
      loop
        l_sql := l_sql ||
       'if NEW.' || l_date_column || ' >= to_date(''' || tabs.start_value || ''', ''YYYYMMDD'') and NEW.' || l_date_column || ' < to_date(''' || tabs.end_value || ''', ''YYYYMMDD'') then ' ||
          'insert into ' || l_table_name || '_' || tabs.start_value || ' values (NEW.*); ' ||
          'return null; ' ||
       'end if; ';
      end loop;
  l_sql := l_sql ||
 'return NEW; '||
 'end; '||
 '$'||'$ language plpgsql';
  execute l_sql;

  l_sql := 
 'create or replace function ps_' || l_table_name || '_raise_trigger() returns trigger ' ||
 'as $'|| '$ ' ||
 'begin ' ||
   'raise EXCEPTION ''Can''''t support % on MIN or MAX aggregate'', TG_OP;' ||
 'end; '||
 '$'||'$ language plpgsql';
  execute l_sql;

  l_sql := 
 'create or replace function ps_' || l_table_name || '_delete_trigger() returns trigger ' ||
 'as $'|| '$ ' ||
 'begin ';
  for tabs in
    select a.snapshot_id as id,
           b.name as table_name,
           a.type_name as snapshot_type
    from   ps_snapshot a, ps_table b
    where  a.table_id = p_table
    and    b.id = a.snapshot_id
    loop
      l_flag = FALSE;
      l_sql := l_sql ||
     'update ' || tabs.table_name || ' set ';
      for columns in
        select name, parent_name, type_name
        from   ps_column
        where  table_id = tabs.id
        and    type_name in ('sum', 'cnt')
        loop
          if l_flag then
             l_sql := l_sql || ', ';
          end if;
          l_flag := TRUE;
          if columns.type_name = 'sum' then
             l_sql := l_sql ||
             columns.name || ' = ' || columns.name || ' - OLD.' || columns.parent_name || ' ';
          end if;
          if columns.type_name = 'cnt' then
             l_sql := l_sql ||
             columns.name || ' = ' || columns.name || ' - case when OLD.' || columns.parent_name || ' is null then 0 else 1 end ';
          end if;
        end loop;
      l_flag = FALSE;
      l_sql := l_sql || 'where ';
      for columns in
        select name, parent_name, type_name
        from   ps_column
        where  table_id = tabs.id
        and    type_name in ('date', 'key', 'nullable')
        loop
          if l_flag  then
             l_sql := l_sql || 'and ';
          end if;
          l_flag := TRUE;
          if columns.type_name = 'date' then
             l_sql := l_sql ||
             columns.name || ' = date_trunc(lower(''' || tabs.snapshot_type || '''), NEW.' || columns.parent_name || ') ';
          end if;
          if columns.type_name = 'key' then
             l_sql := l_sql ||
             columns.name || ' = NEW.' || columns.parent_name || ' ';
          end if;
          if columns.type_name = 'nullable' then
             l_sql := l_sql ||
             columns.name || ' = coalesce(NEW.' || columns.parent_name || ', 0)';
          end if;
        end loop;
      l_sql := l_sql || '; ';
    end loop;
  l_sql := l_sql ||
 'return null; '||
 'end; '||
 '$'||'$ language plpgsql';
  execute l_sql;

  l_sql := 
 'create or replace function ps_' || l_table_name || '_update_trigger() returns trigger ' ||
 'as $'|| '$ ' ||
 'begin ';
  for tabs in
    select a.snapshot_id as id,
           b.name as table_name,
           a.type_name as snapshot_type
    from   ps_snapshot a, ps_table b
    where  a.table_id = p_table
    and    b.id = a.snapshot_id
    loop
      l_flag = FALSE;
      l_sql := l_sql ||
     'update ' || tabs.table_name || ' set ';
      for columns in
        select name, parent_name, type_name
        from   ps_column
        where  table_id = tabs.id
        and    type_name in ('sum', 'cnt')
        loop
          if l_flag then
             l_sql := l_sql || ', ';
          end if;
          l_flag := TRUE;
          if columns.type_name = 'sum' then
             l_sql := l_sql ||
             columns.name || ' = ' || columns.name || ' - OLD.' || columns.parent_name || ' + NEW.' || columns.parent_name || ' ';
          end if;
          if columns.type_name = 'cnt' then
             l_sql := l_sql ||
             columns.name || ' = ' || columns.name ||
             ' - case when OLD.' || columns.parent_name || ' is null then 0 else 1 end ' ||
             ' + case when NEW.' || columns.parent_name || ' is null then 0 else 1 end ';
          end if;
        end loop;
      l_flag = FALSE;
      l_sql := l_sql || 'where ';
      for columns in
        select name, parent_name, type_name
        from   ps_column
        where  table_id = tabs.id
        and    type_name in ('date', 'key', 'nullable')
        loop
          if l_flag then
             l_sql := l_sql || 'and ';
          end if;
          l_flag := TRUE;
          if columns.type_name = 'date' then
             l_sql := l_sql ||
             columns.name || ' = date_trunc(lower(''' || tabs.snapshot_type || '''), NEW.' || columns.parent_name || ') ';
          end if;
          if columns.type_name = 'key' then
             l_sql := l_sql ||
             columns.name || ' = NEW.' || columns.parent_name || ' ';
          end if;
          if columns.type_name = 'nullable' then
             l_sql := l_sql ||
             columns.name || ' = coalesce(NEW.' || columns.parent_name || ', 0)';
          end if;
        end loop;
      l_sql := l_sql || '; ';
    end loop;
  l_sql := l_sql ||
 'return null; '||
 'end; '||
 '$'||'$ language plpgsql';
  execute l_sql;
end;
$$ language plpgsql;


Несмотря на свой устрашающий вид, эта функция довольно проста. Ее задача — сформировать (на основе имеющихся метаданных), четыре функции, используемых при построении триггеров:

  • ps_TABLE_insert_trigger() — Функция управляющая вставкой данных
  • ps_TABLE_update_trigger() — Функция управляющая обновлением данных
  • ps_TABLE_delete_trigger() — Функция управляющая удалением данных
  • ps_TABLE_raise_trigger() — Функция запрещающая обновление и удаление данных

Здесь, вместо TABLE подставляется имя таблицы, содержащей исходные данные. Типичное определение функции ps_TABLE_insert_trigger() будет выглядеть следующим образом:

create or replace function ps_data_insert_trigger() returns trigger
as $$
begin
  update data_month set
    sum_field = sum_field + NEW.sum_field
  , min_field = least(min_field, NEW.min_field)
  where date_field = date_trunc('month', NEW.date_field)
  and   key_field = NEW.key_field;
  if not FOUND then
     insert into data_month(date_field, key_field, sum_field, min_field)
     values (date_trunc('month', NEW.date_field), NEW.key_field, NEW.sum_field, NEW.min_field);
  end if;
  if NEW.date_field >= to_date('20130101', 'YYYYMMDD') and 
     NEW.date_field < to_date('20130201', 'YYYYMMDD') then
     insert into data_20130101 values (NEW.*);
     return null;
  end if;
  return NEW;
end;
$$ language plpgsql;

На самом деле, функция выглядит несколько сложнее, поскольку особым образом обрабатываются null-значения. Но, в качестве иллюстрации, приведенный выше пример вполне адекватен. Логика этого кода очевидна:

  • При вставке в исходную таблицу data, пытаемся обновить счетчики в агрегированном представлении data_month
  • Если это не удалось (запись в data_month не найдена), добавляем новую запись
  • Далее, проверяем попадание в интервал дат для каждой секции (в примере одна секция), и при успехе, вставляем запись в соответствующую секцию (поскольку секция наследуется от главной таблицы, можно смело использовать звездочку) и возращаем null, чтобы предотвратить вставку записи в главную таблицу
  • Если ни одна из секций не подходит, возвращаем NEW, позволяя выполнить вставку в главную таблицу


Последний пункт приводит к тому, что если подходящая секция не найдена, данные добавляются в главную таблицу. На практике это довольно удобно. Даже если мы не создадим секцию заранее или получим данные с некорректной датой, вставка данных пройдет успешно. Впоследствии можно проанализировать содержимое главной таблицы, выполнив запрос:

select * from only data

После чего, создать недостающие секции (как будет показано ниже, данные будут автоматически перенесены из главной таблицы в созданную секцию). В подобных случаях, количество записей, не попавших в свою секцию, как правило, не велико и издержки, на перенос данных, незначительны.

Теперь осталось сделать обвязку. Начнем с функции создания новой секции:

ps_add_range_partition(varchar, varchar, varchar, date)
create or replace function ps_add_range_partition(in p_table varchar, in p_column varchar, 
                in p_type varchar, in p_start date) returns void
as $$
declare
  l_sql       text;
  l_end       date;
  l_start_str varchar(10);
  l_end_str   varchar(10);
  l_table     bigint;
  l_flag      boolean;
  columns     record;
begin
  perform 1
  from   ps_table a, ps_column b
  where  a.id = b.table_id and lower(a.name) = lower(p_table)
  and    b.type_name = 'date' and lower(b.name) <> lower(p_column);
  if FOUND then
     raise EXCEPTION 'Conflict DATE columns';
  end if;

  l_end := p_start + ('1 ' || p_type)::INTERVAL;

  perform 1
  from   ps_table a, ps_range_partition b
  where  a.id = b.table_id and lower(a.name) = lower(p_table)
  and (( p_start >= b.start_value and p_start < b.end_value ) or
       ( b.start_value >= p_start and b.start_value < l_end ));
  if FOUND then
     raise EXCEPTION 'Range intervals intersects';
  end if;

  perform 1
  from   ps_table
  where  lower(name) = lower(p_table);
  if not FOUND then
     insert into ps_table(name) values (lower(p_table));
  end if;

  select id into l_table
  from   ps_table
  where  lower(name) = lower(p_table);

  perform 1
  from   ps_column
  where  table_id = l_table and type_name = 'date'
  and    lower(name) = lower(p_column);
  if not FOUND then
     insert into ps_column(table_id, name, type_name)
     values (l_table, lower(p_column), 'date');
  end if;

  insert into ps_range_partition(table_id, type_name, start_value, end_value)
  values (l_table, p_type, p_start, l_end);

  l_start_str = to_char(p_start, 'YYYYMMDD');
  l_end_str = to_char(l_end, 'YYYYMMDD');

  l_sql :=
 'create table ' || p_table || '_' || l_start_str || '(' ||
   'check (' || p_column || ' >= to_date(''' || l_start_str || ''', ''YYYYMMDD'') and ' ||
                p_column || ' < to_date(''' || l_end_str || ''', ''YYYYMMDD'')), ' ||
   'primary key (';

    l_flag := FALSE;
    for columns in
      select f.name as name
      from ( select ps_array_to_set(a.conkey) as nn
             from   pg_constraint a, pg_class b
             where  b.oid = a.conrelid
             and    a.contype = 'p'
             and    b.relname = p_table ) c, 
           ( select d.attname as name, d.attnum as nn
             from   pg_attribute d, pg_class e
             where  e.oid = d.attrelid
             and    e.relname = p_table ) f
      where  f.nn = c.nn
      order  by f.nn
      loop
        if l_flag then
           l_sql := l_sql || ', ';
        end if;
        l_flag := TRUE;
        l_sql := l_sql || columns.name;
      end loop;

  l_sql := l_sql ||
 ')) inherits (' || p_table || ')';
  execute l_sql;

  l_sql := 
 'create index ' || p_table || '_' || l_start_str || '_date on ' || p_table || '_' || l_start_str || '(' || p_column || ')';
  execute l_sql;

  perform ps_trigger_regenerate(l_table);

  execute 'drop trigger if exists ps_' || p_table || '_before_insert on ' || p_table;
  execute 'drop trigger if exists ps_' || p_table || '_after_update on '  || p_table;
  execute 'drop trigger if exists ps_' || p_table || '_after_delete on '  || p_table;

  l_sql := 
 'insert into ' || p_table || '_' || l_start_str || ' ' ||
 'select * from ' || p_table || ' where ' ||
  p_column || ' >= to_date(''' || l_start_str || ''', ''YYYYMMDD'') and ' ||
  p_column || ' < to_date(''' || l_end_str || ''', ''YYYYMMDD'')';
  execute l_sql;

  l_sql := 
 'delete from only ' || p_table || ' where ' ||
  p_column || ' >= to_date(''' || l_start_str || ''', ''YYYYMMDD'') and ' ||
  p_column || ' < to_date(''' || l_end_str || ''', ''YYYYMMDD'')';
  execute l_sql;

  l_sql := 
 'create trigger ps_' || p_table || '_before_insert ' ||
 'before insert on ' || p_table || ' for each row ' ||
 'execute procedure ps_' || p_table || '_insert_trigger()';
  execute l_sql;
  perform 1
  from   ps_snapshot a, ps_column b
  where  b.table_id = a.snapshot_id and a.table_id = l_table
  and    b.type_name in ('min', 'max');
  if FOUND then
     l_sql := 
    'create trigger ps_' || p_table || '_after_update ' ||
    'after update on ' || p_table || ' for each row ' ||
    'execute procedure ps_' || p_table || '_raise_trigger()';
     execute l_sql;
     l_sql := 
    'create trigger ps_' || p_table || '_after_delete ' ||
    'after delete on ' || p_table || ' for each row ' ||
    'execute procedure ps_' || p_table || '_raise_trigger()';
     execute l_sql;
     l_sql := 
    'create trigger ps_' || p_table || '_' || l_start_str || '_after_update ' ||
    'after update on ' || p_table || '_' || l_start_str || ' for each row ' ||
    'execute procedure ps_' || p_table || '_raise_trigger()';
     execute l_sql;
     l_sql := 
    'create trigger ps_' || p_table || '_' || l_start_str || '_after_delete ' ||
    'after delete on ' || p_table || '_' || l_start_str || ' for each row ' ||
    'execute procedure ps_' || p_table || '_raise_trigger()';
     execute l_sql;
  else
     l_sql := 
    'create trigger ps_' || p_table || '_after_update ' ||
    'after update on ' || p_table || ' for each row ' ||
    'execute procedure ps_' || p_table || '_update_trigger()';
     execute l_sql;
     l_sql := 
    'create trigger ps_' || p_table || '_after_delete ' ||
    'after delete on ' || p_table || ' for each row ' ||
    'execute procedure ps_' || p_table || '_delete_trigger()';
     execute l_sql;
     l_sql := 
    'create trigger ps_' || p_table || '_' || l_start_str || '_after_update ' ||
    'after update on ' || p_table || '_' || l_start_str || ' for each row ' ||
    'execute procedure ps_' || p_table || '_update_trigger()';
     execute l_sql;
     l_sql := 
    'create trigger ps_' || p_table || '_' || l_start_str || '_after_delete ' ||
    'after delete on ' || p_table || '_' || l_start_str || ' for each row ' ||
    'execute procedure ps_' || p_table || '_delete_trigger()';
     execute l_sql;
  end if;
end;
$$ language plpgsql;


Здесь, после проверки корректности входных данных, мы добавляем необходимые метаданные, после чего, создаем унаследованную таблицу. Затем, мы пересоздаем функции триггеров вызовом ps_trigger_regenerate, после чего переносим данные, подпадающие под условие секционирования в созданную секцию динамическим запросом и пересоздаем сами триггеры.

Сложности возникли с двумя моментами.

  1. Пришлось немного помучиться с прибавлением к стартовой дате месяца, дня или года (в зависимости от входного параметра p_type:

    l_end := p_start + ('1 ' || p_type)::INTERVAL;
    

  2. Поскольку первичный ключ не наследуется, пришлось сочинять запрос к System Catalogs, для получения списка колонок первичного ключа исходной таблицы (хранить в своих метаданных еще и описание первичного ключа я счел нецелесообразным):

          select f.name as name
          from ( select ps_array_to_set(a.conkey) as nn
                 from   pg_constraint a, pg_class b
                 where  b.oid = a.conrelid
                 and    a.contype = 'p'
                 and    b.relname = p_table ) c, 
               ( select d.attname as name, d.attnum as nn
                 from   pg_attribute d, pg_class e
                 where  e.oid = d.attrelid
                 and    e.relname = p_table ) f
          where  f.nn = c.nn
          order  by f.nn
    


Также, следует отметить, что перед созданием индекса, на ключ секционирования (для созданной секции), стоило бы предварительно проверить, не является ли он лидирующим столбцом первичного ключа (чтобы не создавать дублирующий индекс).

Функция удаления секции существенно проще и в особых комментариях не нуждается:

ps_del_range_partition(varchar, date)
create or replace function ps_del_range_partition(in p_table varchar, in p_start date) 
      returns void
as $$
declare
  l_sql       text;
  l_start_str varchar(10);
  l_table     bigint;
begin
  select id into l_table
  from   ps_table
  where  lower(name) = lower(p_table);

  l_start_str = to_char(p_start, 'YYYYMMDD');

  delete from ps_range_partition 
  where  table_id = l_table
  and    start_value = p_start;

  perform ps_trigger_regenerate(l_table);

  l_sql := 
 'insert into ' || p_table || ' ' ||
 'select * from ' || p_table || '_' || l_start_str;
  execute l_sql;

  perform 1
  from ( select 1
         from   ps_range_partition
         where  table_id = l_table
         union  all
         select 1
         from   ps_snapshot
         where  table_id = l_table ) a;
  if not FOUND then
     execute 'drop trigger if exists ps_' || p_table || '_before_insert on ' || p_table;
     execute 'drop trigger if exists ps_' || p_table || '_after_update on '  || p_table;
     execute 'drop trigger if exists ps_' || p_table || '_after_delete on '  || p_table;

     execute 'drop function ps_' || p_table || '_insert_trigger() cascade';
     execute 'drop function ps_' || p_table || '_raise_trigger()  cascade';
     execute 'drop function ps_' || p_table || '_update_trigger() cascade';
     execute 'drop function ps_' || p_table || '_delete_trigger() cascade';

     delete from ps_column where table_id = l_table;
     delete from ps_table where id = l_table;
  end if;

  perform 1
  from   ps_range_partition
  where  table_id = l_table;
  if not FOUND then
     delete from ps_column 
     where  table_id = l_table
     and    type_name = 'date';
  end if;

  execute 'drop table ' || p_table || '_' || l_start_str;
end;
$$ language plpgsql;


При удалении секции, данные, естественно, не теряются, а переносятся в главную таблицу (предварительно удаляются триггеры, поскольку, как выяснилось, ключевое слово only не работает в операторе insert).

Осталось добавить функции управления «живыми» снимками данных:

ps_add_snapshot_column(varchar, varchar, varchar, varchar)
create or replace function ps_add_snapshot_column(in p_snapshot varchar, 
     in p_column varchar, in p_parent varchar, in p_type varchar) returns void
as $$
declare
  l_table bigint;
begin
  perform 1
  from   ps_table
  where  lower(name) = lower(p_snapshot);
  if not FOUND then
     insert into ps_table(name) values (lower(p_snapshot));
  end if;

  select id into l_table
  from   ps_table
  where  lower(name) = lower(p_snapshot);

  insert into ps_column(table_id, name, parent_name, type_name)
  values (l_table, lower(p_column), lower(p_parent), p_type);
end;
$$ language plpgsql;


ps_add_snapshot(varchar, varchar, varchar)
create or replace function ps_add_snapshot(in p_table varchar, in p_snapshot varchar, 
     in p_type varchar) returns void
as $$
declare
  l_sql      text;
  l_table    bigint;
  l_snapshot bigint;
  l_flag     boolean;
  columns    record;
begin
  select id into l_snapshot
  from   ps_table
  where  lower(name) = lower(p_snapshot);

  perform 1
  from   ps_column
  where  table_id = l_snapshot
  and    type_name in ('date', 'key');
  if not FOUND then
     raise EXCEPTION 'Key columns not found';
  end if;

  perform 1
  from   ps_column
  where  table_id = l_snapshot
  and    not type_name in ('date', 'key', 'nullable');
  if not FOUND then
     raise EXCEPTION 'Aggregate columns not found';
  end if;

  perform 1
  from   ps_table
  where  lower(name) = lower(p_table);
  if not FOUND then
     insert into ps_table(name) values (lower(p_table));
  end if;

  select id into l_table
  from   ps_table
  where  lower(name) = lower(p_table);

  insert into ps_snapshot(table_id, snapshot_id, type_name)
  values (l_table, l_snapshot, p_type);

  perform ps_trigger_regenerate(l_table);

  l_sql := 'create table ' || p_snapshot || ' (';
  l_flag := FALSE;
  for columns in
    select name, type_name
    from   ps_column
    where  table_id = l_snapshot
    loop
      if l_flag then
         l_sql := l_sql || ', ';
      end if;
      l_flag := TRUE;
      if columns.type_name = 'date' then
         l_sql := l_sql || columns.name || ' date not null';
      else
         l_sql := l_sql || columns.name || ' bigint not null';
      end if;
    end loop;
  l_sql := l_sql || ', primary key (';
  l_flag := FALSE;
  for columns in
    select name
    from   ps_column
    where  table_id = l_snapshot
    and    type_name in ('date', 'key', 'nullable')
    loop
      if l_flag then
         l_sql := l_sql || ', ';
      end if;
      l_flag := TRUE;
      l_sql := l_sql || columns.name;
    end loop;
  l_sql := l_sql || '))';
  execute l_sql;

  execute 'drop trigger if exists ps_' || p_table || '_before_insert on ' || p_table;
  execute 'drop trigger if exists ps_' || p_table || '_after_update on '  || p_table;
  execute 'drop trigger if exists ps_' || p_table || '_after_delete on '  || p_table;

  l_sql := 
 'create trigger ps_' || p_table || '_before_insert ' ||
 'before insert on ' || p_table || ' for each row ' ||
 'execute procedure ps_' || p_table || '_insert_trigger()';
  execute l_sql;

  perform 1
  from   ps_snapshot a, ps_column b
  where  b.table_id = a.snapshot_id and a.table_id = l_table
  and    b.type_name in ('min', 'max');
  if FOUND then
     l_sql := 
    'create trigger ps_' || p_table || '_after_update ' ||
    'after update on ' || p_table || ' for each row ' ||
    'execute procedure ps_' || p_table || '_raise_trigger()';
     execute l_sql;
     l_sql := 
    'create trigger ps_' || p_table || '_after_delete ' ||
    'after delete on ' || p_table || ' for each row ' ||
    'execute procedure ps_' || p_table || '_raise_trigger()';
     execute l_sql;
  else
     l_sql := 
    'create trigger ps_' || p_table || '_after_update ' ||
    'after update on ' || p_table || ' for each row ' ||
    'execute procedure ps_' || p_table || '_update_trigger()';
     execute l_sql;
     l_sql := 
    'create trigger ps_' || p_table || '_after_delete ' ||
    'after delete on ' || p_table || ' for each row ' ||
    'execute procedure ps_' || p_table || '_delete_trigger()';
     execute l_sql;
  end if;

  l_sql := 'insert into ' || p_snapshot || '(';
  l_flag := FALSE;
  for columns in
    select name
    from   ps_column
    where  table_id = l_snapshot
    loop
      if l_flag then
         l_sql := l_sql || ', ';
      end if;
      l_flag := TRUE;
      l_sql := l_sql || columns.name;
    end loop;
  l_sql := l_sql || ') select ';
  l_flag := FALSE;
  for columns in
    select parent_name as name, type_name
    from   ps_column
    where  table_id = l_snapshot
    loop
      if l_flag then
         l_sql := l_sql || ', ';
      end if;
      l_flag := TRUE;
      if columns.type_name = 'date' then
         l_sql := l_sql || 'date_trunc(lower(''' || p_type || '''), ' || columns.name || ')';
      end if;
      if columns.type_name = 'key' then
         l_sql := l_sql || columns.name;
      end if;
      if columns.type_name = 'nullable' then
         l_sql := l_sql || 'coalesce(' || columns.name || ', 0)';
      end if;
      if columns.type_name = 'sum' then
         l_sql := l_sql || 'sum(' || columns.name || ')';
      end if;
      if columns.type_name = 'min' then
         l_sql := l_sql || 'min(' || columns.name || ')';
      end if;
      if columns.type_name = 'max' then
         l_sql := l_sql || 'max(' || columns.name || ')';
      end if;
      if columns.type_name = 'cnt' then
         l_sql := l_sql || 'count(' || columns.name || ')';
      end if;
    end loop;
  l_sql := l_sql || 'from ' || p_table || ' group by ';
  l_flag := FALSE;
  for columns in
    select parent_name as name, type_name
    from   ps_column
    where  table_id = l_snapshot
    and    type_name in ('date', 'key', 'nullable')
    loop
      if l_flag then
         l_sql := l_sql || ', ';
      end if;
      l_flag := TRUE;
      if columns.type_name = 'date' then
         l_sql := l_sql || 'date_trunc(lower(''' || p_type || '''), ' || columns.name || ')';
      else
         l_sql := l_sql || columns.name;
      end if;
    end loop;
  execute l_sql;
end;
$$ language plpgsql;


ps_del_snapshot(varchar)
create or replace function ps_del_snapshot(in p_snapshot varchar) returns void
as $$
declare
  l_sql      text;
  p_table    varchar(50);
  l_table    bigint;
  l_snapshot bigint;
begin
  select a.table_id, c.name into l_table, p_table
  from   ps_snapshot a, ps_table b, ps_table c
  where  b.id = a.snapshot_id and c.id = a.table_id
  and    lower(b.name) = lower(p_snapshot);

  select id into l_snapshot
  from   ps_table
  where  lower(name) = lower(p_snapshot);

  delete from ps_snapshot where snapshot_id = l_snapshot;
  delete from ps_column where table_id = l_snapshot;
  delete from ps_table where id = l_snapshot;

  execute 'drop trigger if exists ps_' || p_table || '_before_insert on ' || p_table;
  execute 'drop trigger if exists ps_' || p_table || '_after_update  on ' || p_table;
  execute 'drop trigger if exists ps_' || p_table || '_after_delete  on ' || p_table;
  
  perform 1
  from ( select 1
         from   ps_range_partition
         where  table_id = l_table
         union  all
         select 1
         from   ps_snapshot
         where  table_id = l_table ) a;
  if not FOUND then
     execute 'drop function if exists ps_' || p_table || '_insert_trigger() cascade';
     execute 'drop function if exists ps_' || p_table || '_raise_trigger()  cascade';
     execute 'drop function if exists ps_' || p_table || '_update_trigger() cascade';
     execute 'drop function if exists ps_' || p_table || '_delete_trigger() cascade';
  else
     perform ps_trigger_regenerate(l_table);

     l_sql := 
    'create trigger ps_' || p_table || '_before_insert ' ||
    'before insert on ' || p_table || ' for each row ' ||
    'execute procedure ps_' || p_table || '_insert_trigger()';
     execute l_sql;

     perform 1
     from   ps_snapshot a, ps_column b
     where  b.table_id = a.snapshot_id and a.table_id = l_table
     and    b.type_name in ('min', 'max');
     if FOUND then
        l_sql := 
       'create trigger ps_' || p_table || '_after_update ' ||
       'after update on ' || p_table || ' for each row ' ||
       'execute procedure ps_' || p_table || '_raise_trigger()';
        execute l_sql;
        l_sql := 
       'create trigger ps_' || p_table || '_after_delete ' ||
       'after delete on ' || p_table || ' for each row ' ||
       'execute procedure ps_' || p_table || '_raise_trigger()';
        execute l_sql;
     else
        l_sql := 
       'create trigger ps_' || p_table || '_after_update ' ||
       'after update on ' || p_table || ' for each row ' ||
       'execute procedure ps_' || p_table || '_update_trigger()';
        execute l_sql;
        l_sql := 
       'create trigger ps_' || p_table || '_after_delete ' ||
       'after delete on ' || p_table || ' for each row ' ||
       'execute procedure ps_' || p_table || '_delete_trigger()';
        execute l_sql;
     end if;
  end if;

  execute 'drop table if exists ' || p_snapshot;
end;
$$ language plpgsql;


Здесь тоже нет ничего принципиально нового и единственное, о чем хотелось бы заметить, это то, что, в случае использования агрегатов 'min' или 'max', при создании триггеров, используется функция ps_TABLE_raise_trigger(), запрещающая удаления и изменения в таблице, по которой построен snapshot. Это сделано потому, что я не смог придумать адекватную по производительности реализацию обновления этих агрегатов при выполнении операторов update и delete в исходной таблице.

Посмотрим, как все это работает. Создадим тестовую таблицу:

create sequence test_seq;

create table test (
  id            bigint         default nextval('test_seq') not null,
  event_time    timestamp      not null,
  customer_id   bigint         not null,
  value         bigint         not null,
  primary key(id)
);

Теперь, для добавления секции, достаточно выполнить следующий запрос:

select ps_add_range_partition('test', 'event_time', 'month', to_date('20130501', 'YYYYMMDD'))

В результате, будет создана унаследованная таблица test_20130501, в которую будут автоматически попадать все записи за май месяц.

Для удаления секции, можно выполнить следующий запрос:

select ps_del_range_partition('test', to_date('20130501', 'YYYYMMDD'))


Создание snapshot несколько сложнее, поскольку предварительно требуется определить интересующие нас столбцы:

select ps_add_snapshot_column('test_month', 'customer_id', 'key')
select ps_add_snapshot_column('test_month', 'event_time', 'date')
select ps_add_snapshot_column('test_month', 'value_sum', 'value', 'sum')
select ps_add_snapshot_column('test_month', 'value_cnt', 'value', 'cnt')
select ps_add_snapshot_column('test_month', 'value_max', 'value', 'max')
select ps_add_snapshot('test', 'test_month', 'month')


В результате, будет создана автоматически обновляемая таблица, на основании следующего запроса:

select customer_id, date_trunc('month', event_time),
       sum(value) as value_sum,
       count(value) as value_cnt,
       max(value) as value_max
from   test
group by customer_id, date_trunc('month', event_time)

Удалить snapshot, можно выполнив следующий запрос:

select ps_del_snapshot('test_month')

На этом, на сегодня, все. Скрипты можно забрать на GitHub.
Теги:
Хабы:
Всего голосов 8: ↑7 и ↓1+6
Комментарии16

Публикации

Истории

Ближайшие события

15 – 16 ноября
IT-конференция Merge Skolkovo
Москва
22 – 24 ноября
Хакатон «AgroCode Hack Genetics'24»
Онлайн
28 ноября
Конференция «TechRec: ITHR CAMPUS»
МоскваОнлайн
25 – 26 апреля
IT-конференция Merge Tatarstan 2025
Казань