Хотя тема секционирования уже поднималась ранее, я хочу к ней вернуться, чтобы рассказать о своем опыте решения этой задачи, возникшей в связи с необходимостью аналитической обработкой больших объемов данных. Помимо секционирования, я рассмотрю предельно упрощенную реализацию «снимков» агрегированных запросов, автоматически обновляемых при изменении исходных данных..

Одним из главных требований, к разрабатываемой системе, было использование бесплатного ПО, в связи с чем, выбор пал на PostgreSQL. На момент начала работы над проектом, я довольно поверхностно знал PostgreSQL, но был неплохо знаком с возможностями Oracle Database. Поскольку речь шла об аналитической обработке, мне хотелось иметь аналоги таких опций Oracle как Partitioning и Materialized Views. После ознакомления с возможностями PostgreSQL, стало понятно, что этот функционал, так или иначе, придется писать вручную.

Разумеется, речь не шла о какой либо полноценной реализации Materialized Views, предусматривающей переписывание запросов. Для моих нужд вполне хватало возможности создания автоматически обновляемых агрегированных одно-табличных выборок (поддержка соединения таблиц, скорее всего, будет добавлена в ближайшем будущем). Для секционирования, я планировал использовать многократно описанный подход с использованием наследуемых таблиц, со вставкой данных, управляемой триггером. У меня была мысль использовать для управления секционированием Rules, но я от нее отказался, поскольку, в моем случае, преобладала вставка данных одиночными записями.

Начал я, разумеется, с таблиц для хранения метаданных:

ps_tables.sql
create sequence ps_table_seq;

create table    ps_table (
  id            bigint         default nextval('ps_table_seq') not null,
  name          varchar(50)    not null unique,
  primary key(id)
);

create sequence ps_column_seq;

create table    ps_column (
  id            bigint         default nextval('ps_column_seq') not null,
  table_id      bigint         not null references ps_table(id),
  name          varchar(50)    not null,
  parent_name   varchar(50),
  type_name     varchar(8)     not null check (type_name in ('date', 'key', 'nullable', 'sum', 'min', 'max', 'cnt')),
  unique (table_id, name),
  primary key(id)
);

create table    ps_range_partition (
  table_id      bigint         not null references ps_table(id),
  type_name     varchar(10)    not null check (type_name in ('day', 'week', 'month', 'year')),
  start_value   date           not null,
  end_value     date           not null,
  primary key(table_id, start_value)
);

create table    ps_snapshot (
  snapshot_id   bigint         not null references ps_table(id),
  table_id      bigint         not null references ps_table(id),
  type_name     varchar(10)    not null check (type_name in ('day', 'week', 'month', 'year')),
  primary key(snapshot_id)
);


Здесь все достаточно очевидно. Единственное, о чем стоит сказать, это типы столбцов:
Тип
Описание
date
Столбец, содержащий календарную дату, используемый при секционировании и агрегации данных (поддерживаются типы date и timestamp PostgreSQL)
key
Ключ, используемый в фразе group by, при агрегации данных (поддерживаются все целочисленные типы PostgreSQL)
nullable
Ключ, используемый при агрегации данных, возможно содержащий значение null
sum
Суммирование значений
min
Минимальное значени��
max
Максимальное значение
cnt
Подсчет количества не null-значений

Основой всего решения стала функция, выполняющая перестроение функций триггеров для таблицы, содержащей исходные данные:

ps_trigger_regenerate(bigint)
create or replace function ps_trigger_regenerate(in p_table bigint) returns void
as $$
declare
  l_sql         text;
  l_table_name  varchar(50);
  l_date_column varchar(50);
  l_flag        boolean;
  tabs          record;
  columns       record;
begin
  select name into l_table_name
  from   ps_table where id = p_table;

  l_sql := 
 'create or replace function ps_' || l_table_name || '_insert_trigger() returns trigger ' ||
 'as $'|| '$ ' ||
 'begin ';
  for tabs in
    select a.snapshot_id as id,
           b.name as table_name,
           a.type_name as snapshot_type
    from   ps_snapshot a, ps_table b
    where  a.table_id = p_table
    and    b.id = a.snapshot_id
    loop
      l_flag = FALSE;
      l_sql := l_sql ||
     'update ' || tabs.table_name || ' set ';
      for columns in
        select name, parent_name, type_name
        from   ps_column
        where  table_id = tabs.id
        and    not type_name in ('date', 'key', 'nullable')
        loop
          if l_flag then
             l_sql := l_sql || ', ';
          end if;
          l_flag := TRUE;
          if columns.type_name = 'sum' then
             l_sql := l_sql ||
             columns.name || ' = ' || columns.name || ' + coalesce(NEW.' || columns.parent_name || ', 0) ';
          end if;
          if columns.type_name = 'min' then
             l_sql := l_sql ||
             columns.name || ' = least(coalesce(' || columns.name || ', NEW.' || columns.parent_name || '), coalesce(NEW.' || columns.parent_name || ', ' || columns.name || ')) ';
          end if;
          if columns.type_name = 'max' then
             l_sql := l_sql ||
             columns.name || ' = greatest(coalesce(' || columns.name || ', NEW.' || columns.parent_name || '), coalesce(NEW.' || columns.parent_name || ', ' || columns.name || ')) ';
          end if;
          if columns.type_name = 'cnt' then
             l_sql := l_sql ||
             columns.name || ' = ' || columns.name || ' + case when NEW.' || columns.parent_name || ' is null then 0 else 1 end ';
          end if;
        end loop;
      l_flag = FALSE;
      l_sql := l_sql || 'where ';
      for columns in
        select name, parent_name, type_name
        from   ps_column
        where  table_id = tabs.id
        and    type_name in ('date', 'key', 'nullable')
        loop
          if l_flag then
             l_sql := l_sql || 'and ';
          end if;
          l_flag := TRUE;
          if columns.type_name = 'date' then
             l_sql := l_sql ||
             columns.name || ' = date_trunc(lower(''' || tabs.snapshot_type || '''), NEW.' || columns.parent_name || ') ';
          end if;
          if columns.type_name = 'key' then
             l_sql := l_sql ||
             columns.name || ' = NEW.' || columns.parent_name || ' ';
          end if;
          if columns.type_name = 'nullable' then
             l_sql := l_sql ||
             columns.name || ' = coalesce(NEW.' || columns.parent_name || ', 0)';
          end if;
        end loop;
      l_sql := l_sql || '; ' ||
     'if not FOUND then ' ||
     'insert into ' || tabs.table_name || '(';
      l_flag = FALSE;
      for columns in
        select name, type_name
        from   ps_column
        where  table_id = tabs.id
        loop
          if l_flag then
             l_sql := l_sql || ', ';
          end if;
          l_flag := TRUE;
          l_sql := l_sql || columns.name;
        end loop;
      l_sql := l_sql || ') values (';
      l_flag = FALSE;
      for columns in
        select name, parent_name, type_name
        from   ps_column
        where  table_id = tabs.id
        loop
          if l_flag then
             l_sql := l_sql || ', ';
          end if;
          l_flag := TRUE;
          if columns.type_name = 'date' then
             l_sql := l_sql || 'date_trunc(lower(''' || tabs.snapshot_type || '''), NEW.' || columns.parent_name || ')';
          elsif columns.type_name = 'cnt' then
             l_sql := l_sql || 'case when NEW.' || columns.parent_name || ' is null then 0 else 1 end';
          elsif columns.type_name in ('nullable', 'sum') then
             l_sql := l_sql || 'coalesce(NEW.' || columns.parent_name || ', 0)';
          else
             l_sql := l_sql || 'NEW.' || columns.parent_name;
          end if;
        end loop;
      l_sql := l_sql || '); ' ||
     'end if; ';
    end loop;
    select name into l_date_column
    from   ps_column
    where  table_id = p_table
    and    type_name = 'date';
    for tabs in
      select to_char(start_value, 'YYYYMMDD') as start_value,
             to_char(end_value, 'YYYYMMDD') as end_value,
             type_name
      from   ps_range_partition
      where  table_id = p_table
      order  by start_value desc
      loop
        l_sql := l_sql ||
       'if NEW.' || l_date_column || ' >= to_date(''' || tabs.start_value || ''', ''YYYYMMDD'') and NEW.' || l_date_column || ' < to_date(''' || tabs.end_value || ''', ''YYYYMMDD'') then ' ||
          'insert into ' || l_table_name || '_' || tabs.start_value || ' values (NEW.*); ' ||
          'return null; ' ||
       'end if; ';
      end loop;
  l_sql := l_sql ||
 'return NEW; '||
 'end; '||
 '$'||'$ language plpgsql';
  execute l_sql;

  l_sql := 
 'create or replace function ps_' || l_table_name || '_raise_trigger() returns trigger ' ||
 'as $'|| '$ ' ||
 'begin ' ||
   'raise EXCEPTION ''Can''''t support % on MIN or MAX aggregate'', TG_OP;' ||
 'end; '||
 '$'||'$ language plpgsql';
  execute l_sql;

  l_sql := 
 'create or replace function ps_' || l_table_name || '_delete_trigger() returns trigger ' ||
 'as $'|| '$ ' ||
 'begin ';
  for tabs in
    select a.snapshot_id as id,
           b.name as table_name,
           a.type_name as snapshot_type
    from   ps_snapshot a, ps_table b
    where  a.table_id = p_table
    and    b.id = a.snapshot_id
    loop
      l_flag = FALSE;
      l_sql := l_sql ||
     'update ' || tabs.table_name || ' set ';
      for columns in
        select name, parent_name, type_name
        from   ps_column
        where  table_id = tabs.id
        and    type_name in ('sum', 'cnt')
        loop
          if l_flag then
             l_sql := l_sql || ', ';
          end if;
          l_flag := TRUE;
          if columns.type_name = 'sum' then
             l_sql := l_sql ||
             columns.name || ' = ' || columns.name || ' - OLD.' || columns.parent_name || ' ';
          end if;
          if columns.type_name = 'cnt' then
             l_sql := l_sql ||
             columns.name || ' = ' || columns.name || ' - case when OLD.' || columns.parent_name || ' is null then 0 else 1 end ';
          end if;
        end loop;
      l_flag = FALSE;
      l_sql := l_sql || 'where ';
      for columns in
        select name, parent_name, type_name
        from   ps_column
        where  table_id = tabs.id
        and    type_name in ('date', 'key', 'nullable')
        loop
          if l_flag  then
             l_sql := l_sql || 'and ';
          end if;
          l_flag := TRUE;
          if columns.type_name = 'date' then
             l_sql := l_sql ||
             columns.name || ' = date_trunc(lower(''' || tabs.snapshot_type || '''), NEW.' || columns.parent_name || ') ';
          end if;
          if columns.type_name = 'key' then
             l_sql := l_sql ||
             columns.name || ' = NEW.' || columns.parent_name || ' ';
          end if;
          if columns.type_name = 'nullable' then
             l_sql := l_sql ||
             columns.name || ' = coalesce(NEW.' || columns.parent_name || ', 0)';
          end if;
        end loop;
      l_sql := l_sql || '; ';
    end loop;
  l_sql := l_sql ||
 'return null; '||
 'end; '||
 '$'||'$ language plpgsql';
  execute l_sql;

  l_sql := 
 'create or replace function ps_' || l_table_name || '_update_trigger() returns trigger ' ||
 'as $'|| '$ ' ||
 'begin ';
  for tabs in
    select a.snapshot_id as id,
           b.name as table_name,
           a.type_name as snapshot_type
    from   ps_snapshot a, ps_table b
    where  a.table_id = p_table
    and    b.id = a.snapshot_id
    loop
      l_flag = FALSE;
      l_sql := l_sql ||
     'update ' || tabs.table_name || ' set ';
      for columns in
        select name, parent_name, type_name
        from   ps_column
        where  table_id = tabs.id
        and    type_name in ('sum', 'cnt')
        loop
          if l_flag then
             l_sql := l_sql || ', ';
          end if;
          l_flag := TRUE;
          if columns.type_name = 'sum' then
             l_sql := l_sql ||
             columns.name || ' = ' || columns.name || ' - OLD.' || columns.parent_name || ' + NEW.' || columns.parent_name || ' ';
          end if;
          if columns.type_name = 'cnt' then
             l_sql := l_sql ||
             columns.name || ' = ' || columns.name ||
             ' - case when OLD.' || columns.parent_name || ' is null then 0 else 1 end ' ||
             ' + case when NEW.' || columns.parent_name || ' is null then 0 else 1 end ';
          end if;
        end loop;
      l_flag = FALSE;
      l_sql := l_sql || 'where ';
      for columns in
        select name, parent_name, type_name
        from   ps_column
        where  table_id = tabs.id
        and    type_name in ('date', 'key', 'nullable')
        loop
          if l_flag then
             l_sql := l_sql || 'and ';
          end if;
          l_flag := TRUE;
          if columns.type_name = 'date' then
             l_sql := l_sql ||
             columns.name || ' = date_trunc(lower(''' || tabs.snapshot_type || '''), NEW.' || columns.parent_name || ') ';
          end if;
          if columns.type_name = 'key' then
             l_sql := l_sql ||
             columns.name || ' = NEW.' || columns.parent_name || ' ';
          end if;
          if columns.type_name = 'nullable' then
             l_sql := l_sql ||
             columns.name || ' = coalesce(NEW.' || columns.parent_name || ', 0)';
          end if;
        end loop;
      l_sql := l_sql || '; ';
    end loop;
  l_sql := l_sql ||
 'return null; '||
 'end; '||
 '$'||'$ language plpgsql';
  execute l_sql;
end;
$$ language plpgsql;


Несмотря на свой устрашающий вид, эта функция довольно проста. Ее задача — сформировать (на основе имеющихся метаданных), четыре функции, используемых при построении триггеров:

  • ps_TABLE_insert_trigger() — Функция управляющая вставкой данных
  • ps_TABLE_update_trigger() — Функция управляющая обновлением данных
  • ps_TABLE_delete_trigger() — Функция управляющая удалением данных
  • ps_TABLE_raise_trigger() — Функция запрещающая обновление и удаление данных

Здесь, вместо TABLE подставляется имя таблицы, содержащей исходные данные. Типичное определение функции ps_TABLE_insert_trigger() будет выглядеть следующим образом:

create or replace function ps_data_insert_trigger() returns trigger
as $$
begin
  update data_month set
    sum_field = sum_field + NEW.sum_field
  , min_field = least(min_field, NEW.min_field)
  where date_field = date_trunc('month', NEW.date_field)
  and   key_field = NEW.key_field;
  if not FOUND then
     insert into data_month(date_field, key_field, sum_field, min_field)
     values (date_trunc('month', NEW.date_field), NEW.key_field, NEW.sum_field, NEW.min_field);
  end if;
  if NEW.date_field >= to_date('20130101', 'YYYYMMDD') and 
     NEW.date_field < to_date('20130201', 'YYYYMMDD') then
     insert into data_20130101 values (NEW.*);
     return null;
  end if;
  return NEW;
end;
$$ language plpgsql;

На самом деле, функция выглядит несколько сложнее, поскольку особым образом обрабатываются null-значения. Но, в качестве иллюстрации, приведенный выше пример вполне адекватен. Логика этого кода очевидна:

  • При вставке в исходную таблицу data, пытаемся обновить счетчики в агрегированном представлении data_month
  • Если это не удалось (запись в data_month не найдена), добавляем новую запись
  • Далее, проверяем попадание в интервал дат для каждой секции (в примере одна секция), и при успехе, вставляем запись в соответствующую секцию (поскольку секция наследуется от главной таблицы, можно смело использовать звездочку) и возращаем null, чтобы предотвратить вставку записи в главную таблицу
  • Если ни одна из секций не подходит, возвращаем NEW, позволяя выполнить вставку в главную таблицу


Последний пункт приводит к тому, что если подходящая секция не найдена, данные добавляются в главную таблицу. На практике это довольно удобно. Даже если мы не создадим секцию заранее или получим данные с некорректной датой, вставка данных пройдет успешно. Впоследствии можно проанализировать содержимое главной таблицы, выполнив запрос:

select * from only data

После чего, создать недостающие секции (как будет показано ниже, данные будут автоматически перенесены из главной таблицы в созданную секцию). В подобных случаях, количество записей, не попавших в свою секцию, как правило, не велико и издержки, на перенос данных, незначительны.

Теперь осталось сделать обвязку. Начнем с функции создания новой секции:

ps_add_range_partition(varchar, varchar, varchar, date)
create or replace function ps_add_range_partition(in p_table varchar, in p_column varchar, 
                in p_type varchar, in p_start date) returns void
as $$
declare
  l_sql       text;
  l_end       date;
  l_start_str varchar(10);
  l_end_str   varchar(10);
  l_table     bigint;
  l_flag      boolean;
  columns     record;
begin
  perform 1
  from   ps_table a, ps_column b
  where  a.id = b.table_id and lower(a.name) = lower(p_table)
  and    b.type_name = 'date' and lower(b.name) <> lower(p_column);
  if FOUND then
     raise EXCEPTION 'Conflict DATE columns';
  end if;

  l_end := p_start + ('1 ' || p_type)::INTERVAL;

  perform 1
  from   ps_table a, ps_range_partition b
  where  a.id = b.table_id and lower(a.name) = lower(p_table)
  and (( p_start >= b.start_value and p_start < b.end_value ) or
       ( b.start_value >= p_start and b.start_value < l_end ));
  if FOUND then
     raise EXCEPTION 'Range intervals intersects';
  end if;

  perform 1
  from   ps_table
  where  lower(name) = lower(p_table);
  if not FOUND then
     insert into ps_table(name) values (lower(p_table));
  end if;

  select id into l_table
  from   ps_table
  where  lower(name) = lower(p_table);

  perform 1
  from   ps_column
  where  table_id = l_table and type_name = 'date'
  and    lower(name) = lower(p_column);
  if not FOUND then
     insert into ps_column(table_id, name, type_name)
     values (l_table, lower(p_column), 'date');
  end if;

  insert into ps_range_partition(table_id, type_name, start_value, end_value)
  values (l_table, p_type, p_start, l_end);

  l_start_str = to_char(p_start, 'YYYYMMDD');
  l_end_str = to_char(l_end, 'YYYYMMDD');

  l_sql :=
 'create table ' || p_table || '_' || l_start_str || '(' ||
   'check (' || p_column || ' >= to_date(''' || l_start_str || ''', ''YYYYMMDD'') and ' ||
                p_column || ' < to_date(''' || l_end_str || ''', ''YYYYMMDD'')), ' ||
   'primary key (';

    l_flag := FALSE;
    for columns in
      select f.name as name
      from ( select ps_array_to_set(a.conkey) as nn
             from   pg_constraint a, pg_class b
             where  b.oid = a.conrelid
             and    a.contype = 'p'
             and    b.relname = p_table ) c, 
           ( select d.attname as name, d.attnum as nn
             from   pg_attribute d, pg_class e
             where  e.oid = d.attrelid
             and    e.relname = p_table ) f
      where  f.nn = c.nn
      order  by f.nn
      loop
        if l_flag then
           l_sql := l_sql || ', ';
        end if;
        l_flag := TRUE;
        l_sql := l_sql || columns.name;
      end loop;

  l_sql := l_sql ||
 ')) inherits (' || p_table || ')';
  execute l_sql;

  l_sql := 
 'create index ' || p_table || '_' || l_start_str || '_date on ' || p_table || '_' || l_start_str || '(' || p_column || ')';
  execute l_sql;

  perform ps_trigger_regenerate(l_table);

  execute 'drop trigger if exists ps_' || p_table || '_before_insert on ' || p_table;
  execute 'drop trigger if exists ps_' || p_table || '_after_update on '  || p_table;
  execute 'drop trigger if exists ps_' || p_table || '_after_delete on '  || p_table;

  l_sql := 
 'insert into ' || p_table || '_' || l_start_str || ' ' ||
 'select * from ' || p_table || ' where ' ||
  p_column || ' >= to_date(''' || l_start_str || ''', ''YYYYMMDD'') and ' ||
  p_column || ' < to_date(''' || l_end_str || ''', ''YYYYMMDD'')';
  execute l_sql;

  l_sql := 
 'delete from only ' || p_table || ' where ' ||
  p_column || ' >= to_date(''' || l_start_str || ''', ''YYYYMMDD'') and ' ||
  p_column || ' < to_date(''' || l_end_str || ''', ''YYYYMMDD'')';
  execute l_sql;

  l_sql := 
 'create trigger ps_' || p_table || '_before_insert ' ||
 'before insert on ' || p_table || ' for each row ' ||
 'execute procedure ps_' || p_table || '_insert_trigger()';
  execute l_sql;
  perform 1
  from   ps_snapshot a, ps_column b
  where  b.table_id = a.snapshot_id and a.table_id = l_table
  and    b.type_name in ('min', 'max');
  if FOUND then
     l_sql := 
    'create trigger ps_' || p_table || '_after_update ' ||
    'after update on ' || p_table || ' for each row ' ||
    'execute procedure ps_' || p_table || '_raise_trigger()';
     execute l_sql;
     l_sql := 
    'create trigger ps_' || p_table || '_after_delete ' ||
    'after delete on ' || p_table || ' for each row ' ||
    'execute procedure ps_' || p_table || '_raise_trigger()';
     execute l_sql;
     l_sql := 
    'create trigger ps_' || p_table || '_' || l_start_str || '_after_update ' ||
    'after update on ' || p_table || '_' || l_start_str || ' for each row ' ||
    'execute procedure ps_' || p_table || '_raise_trigger()';
     execute l_sql;
     l_sql := 
    'create trigger ps_' || p_table || '_' || l_start_str || '_after_delete ' ||
    'after delete on ' || p_table || '_' || l_start_str || ' for each row ' ||
    'execute procedure ps_' || p_table || '_raise_trigger()';
     execute l_sql;
  else
     l_sql := 
    'create trigger ps_' || p_table || '_after_update ' ||
    'after update on ' || p_table || ' for each row ' ||
    'execute procedure ps_' || p_table || '_update_trigger()';
     execute l_sql;
     l_sql := 
    'create trigger ps_' || p_table || '_after_delete ' ||
    'after delete on ' || p_table || ' for each row ' ||
    'execute procedure ps_' || p_table || '_delete_trigger()';
     execute l_sql;
     l_sql := 
    'create trigger ps_' || p_table || '_' || l_start_str || '_after_update ' ||
    'after update on ' || p_table || '_' || l_start_str || ' for each row ' ||
    'execute procedure ps_' || p_table || '_update_trigger()';
     execute l_sql;
     l_sql := 
    'create trigger ps_' || p_table || '_' || l_start_str || '_after_delete ' ||
    'after delete on ' || p_table || '_' || l_start_str || ' for each row ' ||
    'execute procedure ps_' || p_table || '_delete_trigger()';
     execute l_sql;
  end if;
end;
$$ language plpgsql;


Здесь, после проверки корректности входных данных, мы добавляем необходимые метаданные, после чего, создаем унаследованную таблицу. Затем, мы пересоздаем функции триггеров вызовом ps_trigger_regenerate, после чего переносим данные, подпадающие под условие секционирования в созданную секцию динамическим запросом и пересоздаем сами триггеры.

Сложности возникли с двумя моментами.

  1. Пришлось немного помучиться с прибавлением к стартовой дате месяца, дня или года (в зависимости от входного параметра p_type:

    l_end := p_start + ('1 ' || p_type)::INTERVAL;
    

  2. Поскольку первичный ключ не наследуется, пришлось сочинять запрос к System Catalogs, для получения списка колонок первичного ключа исходной таблицы (хранить в своих метаданных еще и описание первичного ключа я счел нецелесообразным):

          select f.name as name
          from ( select ps_array_to_set(a.conkey) as nn
                 from   pg_constraint a, pg_class b
                 where  b.oid = a.conrelid
                 and    a.contype = 'p'
                 and    b.relname = p_table ) c, 
               ( select d.attname as name, d.attnum as nn
                 from   pg_attribute d, pg_class e
                 where  e.oid = d.attrelid
                 and    e.relname = p_table ) f
          where  f.nn = c.nn
          order  by f.nn
    


Также, следует отметить, что перед созданием индекса, на ключ секционирования (для с��зданной секции), стоило бы предварительно проверить, не является ли он лидирующим столбцом первичного ключа (чтобы не создавать дублирующий индекс).

Функция удаления секции существенно проще и в особых комментариях не нуждается:

ps_del_range_partition(varchar, date)
create or replace function ps_del_range_partition(in p_table varchar, in p_start date) 
      returns void
as $$
declare
  l_sql       text;
  l_start_str varchar(10);
  l_table     bigint;
begin
  select id into l_table
  from   ps_table
  where  lower(name) = lower(p_table);

  l_start_str = to_char(p_start, 'YYYYMMDD');

  delete from ps_range_partition 
  where  table_id = l_table
  and    start_value = p_start;

  perform ps_trigger_regenerate(l_table);

  l_sql := 
 'insert into ' || p_table || ' ' ||
 'select * from ' || p_table || '_' || l_start_str;
  execute l_sql;

  perform 1
  from ( select 1
         from   ps_range_partition
         where  table_id = l_table
         union  all
         select 1
         from   ps_snapshot
         where  table_id = l_table ) a;
  if not FOUND then
     execute 'drop trigger if exists ps_' || p_table || '_before_insert on ' || p_table;
     execute 'drop trigger if exists ps_' || p_table || '_after_update on '  || p_table;
     execute 'drop trigger if exists ps_' || p_table || '_after_delete on '  || p_table;

     execute 'drop function ps_' || p_table || '_insert_trigger() cascade';
     execute 'drop function ps_' || p_table || '_raise_trigger()  cascade';
     execute 'drop function ps_' || p_table || '_update_trigger() cascade';
     execute 'drop function ps_' || p_table || '_delete_trigger() cascade';

     delete from ps_column where table_id = l_table;
     delete from ps_table where id = l_table;
  end if;

  perform 1
  from   ps_range_partition
  where  table_id = l_table;
  if not FOUND then
     delete from ps_column 
     where  table_id = l_table
     and    type_name = 'date';
  end if;

  execute 'drop table ' || p_table || '_' || l_start_str;
end;
$$ language plpgsql;


При удалении секции, данные, естественно, не теряются, а переносятся в главную таблицу (предварительно удаляются триггеры, поскольку, как выяснилось, ключевое слово only не работает в операторе insert).

Осталось добавить функции управления «живыми» снимками данных:

ps_add_snapshot_column(varchar, varchar, varchar, varchar)
create or replace function ps_add_snapshot_column(in p_snapshot varchar, 
     in p_column varchar, in p_parent varchar, in p_type varchar) returns void
as $$
declare
  l_table bigint;
begin
  perform 1
  from   ps_table
  where  lower(name) = lower(p_snapshot);
  if not FOUND then
     insert into ps_table(name) values (lower(p_snapshot));
  end if;

  select id into l_table
  from   ps_table
  where  lower(name) = lower(p_snapshot);

  insert into ps_column(table_id, name, parent_name, type_name)
  values (l_table, lower(p_column), lower(p_parent), p_type);
end;
$$ language plpgsql;


ps_add_snapshot(varchar, varchar, varchar)
create or replace function ps_add_snapshot(in p_table varchar, in p_snapshot varchar, 
     in p_type varchar) returns void
as $$
declare
  l_sql      text;
  l_table    bigint;
  l_snapshot bigint;
  l_flag     boolean;
  columns    record;
begin
  select id into l_snapshot
  from   ps_table
  where  lower(name) = lower(p_snapshot);

  perform 1
  from   ps_column
  where  table_id = l_snapshot
  and    type_name in ('date', 'key');
  if not FOUND then
     raise EXCEPTION 'Key columns not found';
  end if;

  perform 1
  from   ps_column
  where  table_id = l_snapshot
  and    not type_name in ('date', 'key', 'nullable');
  if not FOUND then
     raise EXCEPTION 'Aggregate columns not found';
  end if;

  perform 1
  from   ps_table
  where  lower(name) = lower(p_table);
  if not FOUND then
     insert into ps_table(name) values (lower(p_table));
  end if;

  select id into l_table
  from   ps_table
  where  lower(name) = lower(p_table);

  insert into ps_snapshot(table_id, snapshot_id, type_name)
  values (l_table, l_snapshot, p_type);

  perform ps_trigger_regenerate(l_table);

  l_sql := 'create table ' || p_snapshot || ' (';
  l_flag := FALSE;
  for columns in
    select name, type_name
    from   ps_column
    where  table_id = l_snapshot
    loop
      if l_flag then
         l_sql := l_sql || ', ';
      end if;
      l_flag := TRUE;
      if columns.type_name = 'date' then
         l_sql := l_sql || columns.name || ' date not null';
      else
         l_sql := l_sql || columns.name || ' bigint not null';
      end if;
    end loop;
  l_sql := l_sql || ', primary key (';
  l_flag := FALSE;
  for columns in
    select name
    from   ps_column
    where  table_id = l_snapshot
    and    type_name in ('date', 'key', 'nullable')
    loop
      if l_flag then
         l_sql := l_sql || ', ';
      end if;
      l_flag := TRUE;
      l_sql := l_sql || columns.name;
    end loop;
  l_sql := l_sql || '))';
  execute l_sql;

  execute 'drop trigger if exists ps_' || p_table || '_before_insert on ' || p_table;
  execute 'drop trigger if exists ps_' || p_table || '_after_update on '  || p_table;
  execute 'drop trigger if exists ps_' || p_table || '_after_delete on '  || p_table;

  l_sql := 
 'create trigger ps_' || p_table || '_before_insert ' ||
 'before insert on ' || p_table || ' for each row ' ||
 'execute procedure ps_' || p_table || '_insert_trigger()';
  execute l_sql;

  perform 1
  from   ps_snapshot a, ps_column b
  where  b.table_id = a.snapshot_id and a.table_id = l_table
  and    b.type_name in ('min', 'max');
  if FOUND then
     l_sql := 
    'create trigger ps_' || p_table || '_after_update ' ||
    'after update on ' || p_table || ' for each row ' ||
    'execute procedure ps_' || p_table || '_raise_trigger()';
     execute l_sql;
     l_sql := 
    'create trigger ps_' || p_table || '_after_delete ' ||
    'after delete on ' || p_table || ' for each row ' ||
    'execute procedure ps_' || p_table || '_raise_trigger()';
     execute l_sql;
  else
     l_sql := 
    'create trigger ps_' || p_table || '_after_update ' ||
    'after update on ' || p_table || ' for each row ' ||
    'execute procedure ps_' || p_table || '_update_trigger()';
     execute l_sql;
     l_sql := 
    'create trigger ps_' || p_table || '_after_delete ' ||
    'after delete on ' || p_table || ' for each row ' ||
    'execute procedure ps_' || p_table || '_delete_trigger()';
     execute l_sql;
  end if;

  l_sql := 'insert into ' || p_snapshot || '(';
  l_flag := FALSE;
  for columns in
    select name
    from   ps_column
    where  table_id = l_snapshot
    loop
      if l_flag then
         l_sql := l_sql || ', ';
      end if;
      l_flag := TRUE;
      l_sql := l_sql || columns.name;
    end loop;
  l_sql := l_sql || ') select ';
  l_flag := FALSE;
  for columns in
    select parent_name as name, type_name
    from   ps_column
    where  table_id = l_snapshot
    loop
      if l_flag then
         l_sql := l_sql || ', ';
      end if;
      l_flag := TRUE;
      if columns.type_name = 'date' then
         l_sql := l_sql || 'date_trunc(lower(''' || p_type || '''), ' || columns.name || ')';
      end if;
      if columns.type_name = 'key' then
         l_sql := l_sql || columns.name;
      end if;
      if columns.type_name = 'nullable' then
         l_sql := l_sql || 'coalesce(' || columns.name || ', 0)';
      end if;
      if columns.type_name = 'sum' then
         l_sql := l_sql || 'sum(' || columns.name || ')';
      end if;
      if columns.type_name = 'min' then
         l_sql := l_sql || 'min(' || columns.name || ')';
      end if;
      if columns.type_name = 'max' then
         l_sql := l_sql || 'max(' || columns.name || ')';
      end if;
      if columns.type_name = 'cnt' then
         l_sql := l_sql || 'count(' || columns.name || ')';
      end if;
    end loop;
  l_sql := l_sql || 'from ' || p_table || ' group by ';
  l_flag := FALSE;
  for columns in
    select parent_name as name, type_name
    from   ps_column
    where  table_id = l_snapshot
    and    type_name in ('date', 'key', 'nullable')
    loop
      if l_flag then
         l_sql := l_sql || ', ';
      end if;
      l_flag := TRUE;
      if columns.type_name = 'date' then
         l_sql := l_sql || 'date_trunc(lower(''' || p_type || '''), ' || columns.name || ')';
      else
         l_sql := l_sql || columns.name;
      end if;
    end loop;
  execute l_sql;
end;
$$ language plpgsql;


ps_del_snapshot(varchar)
create or replace function ps_del_snapshot(in p_snapshot varchar) returns void
as $$
declare
  l_sql      text;
  p_table    varchar(50);
  l_table    bigint;
  l_snapshot bigint;
begin
  select a.table_id, c.name into l_table, p_table
  from   ps_snapshot a, ps_table b, ps_table c
  where  b.id = a.snapshot_id and c.id = a.table_id
  and    lower(b.name) = lower(p_snapshot);

  select id into l_snapshot
  from   ps_table
  where  lower(name) = lower(p_snapshot);

  delete from ps_snapshot where snapshot_id = l_snapshot;
  delete from ps_column where table_id = l_snapshot;
  delete from ps_table where id = l_snapshot;

  execute 'drop trigger if exists ps_' || p_table || '_before_insert on ' || p_table;
  execute 'drop trigger if exists ps_' || p_table || '_after_update  on ' || p_table;
  execute 'drop trigger if exists ps_' || p_table || '_after_delete  on ' || p_table;
  
  perform 1
  from ( select 1
         from   ps_range_partition
         where  table_id = l_table
         union  all
         select 1
         from   ps_snapshot
         where  table_id = l_table ) a;
  if not FOUND then
     execute 'drop function if exists ps_' || p_table || '_insert_trigger() cascade';
     execute 'drop function if exists ps_' || p_table || '_raise_trigger()  cascade';
     execute 'drop function if exists ps_' || p_table || '_update_trigger() cascade';
     execute 'drop function if exists ps_' || p_table || '_delete_trigger() cascade';
  else
     perform ps_trigger_regenerate(l_table);

     l_sql := 
    'create trigger ps_' || p_table || '_before_insert ' ||
    'before insert on ' || p_table || ' for each row ' ||
    'execute procedure ps_' || p_table || '_insert_trigger()';
     execute l_sql;

     perform 1
     from   ps_snapshot a, ps_column b
     where  b.table_id = a.snapshot_id and a.table_id = l_table
     and    b.type_name in ('min', 'max');
     if FOUND then
        l_sql := 
       'create trigger ps_' || p_table || '_after_update ' ||
       'after update on ' || p_table || ' for each row ' ||
       'execute procedure ps_' || p_table || '_raise_trigger()';
        execute l_sql;
        l_sql := 
       'create trigger ps_' || p_table || '_after_delete ' ||
       'after delete on ' || p_table || ' for each row ' ||
       'execute procedure ps_' || p_table || '_raise_trigger()';
        execute l_sql;
     else
        l_sql := 
       'create trigger ps_' || p_table || '_after_update ' ||
       'after update on ' || p_table || ' for each row ' ||
       'execute procedure ps_' || p_table || '_update_trigger()';
        execute l_sql;
        l_sql := 
       'create trigger ps_' || p_table || '_after_delete ' ||
       'after delete on ' || p_table || ' for each row ' ||
       'execute procedure ps_' || p_table || '_delete_trigger()';
        execute l_sql;
     end if;
  end if;

  execute 'drop table if exists ' || p_snapshot;
end;
$$ language plpgsql;


Здесь тоже нет ничего принципиально нового и единственное, о чем хотелось бы заметить, это то, что, в случае использ��вания агрегатов 'min' или 'max', при создании триггеров, используется функция ps_TABLE_raise_trigger(), запрещающая удаления и изменения в таблице, по которой построен snapshot. Это сделано потому, что я не смог придумать адекватную по производительности реализацию обновления этих агрегатов при выполнении операторов update и delete в исходной таблице.

Посмотрим, как все это работает. Создадим тестовую таблицу:

create sequence test_seq;

create table test (
  id            bigint         default nextval('test_seq') not null,
  event_time    timestamp      not null,
  customer_id   bigint         not null,
  value         bigint         not null,
  primary key(id)
);

Теперь, для добавления секции, достаточно выполнить следующий запрос:

select ps_add_range_partition('test', 'event_time', 'month', to_date('20130501', 'YYYYMMDD'))

В результате, будет создана унаследованная таблица test_20130501, в которую будут автоматически попадать все записи за май месяц.

Для удаления секции, можно выполнить следующий запрос:

select ps_del_range_partition('test', to_date('20130501', 'YYYYMMDD'))


Создание snapshot несколько сложнее, поскольку предварительно требуется определить интересующие нас столбцы:

select ps_add_snapshot_column('test_month', 'customer_id', 'key')
select ps_add_snapshot_column('test_month', 'event_time', 'date')
select ps_add_snapshot_column('test_month', 'value_sum', 'value', 'sum')
select ps_add_snapshot_column('test_month', 'value_cnt', 'value', 'cnt')
select ps_add_snapshot_column('test_month', 'value_max', 'value', 'max')
select ps_add_snapshot('test', 'test_month', 'month')


В результате, будет создана автоматически обновляемая таблица, на основании следующего запроса:

select customer_id, date_trunc('month', event_time),
       sum(value) as value_sum,
       count(value) as value_cnt,
       max(value) as value_max
from   test
group by customer_id, date_trunc('month', event_time)

Удалить snapshot, можно выполнив следующий запрос:

select ps_del_snapshot('test_month')

На этом, на сегодня, все. Скрипты можно забрать на GitHub.