Хотя тема секционирования уже поднималась ранее, я хочу к ней вернуться, чтобы рассказать о своем опыте решения этой задачи, возникшей в связи с необходимостью аналитической обработкой больших объемов данных. Помимо секционирования, я рассмотрю предельно упрощенную реализацию «снимков» агрегированных запросов, автоматически обновляемых при изменении исходных данных..
Одним из главных требований, к разрабатываемой системе, было использование бесплатного ПО, в связи с чем, выбор пал на PostgreSQL. На момент начала работы над проектом, я довольно поверхностно знал PostgreSQL, но был неплохо знаком с возможностями Oracle Database. Поскольку речь шла об аналитической обработке, мне хотелось иметь аналоги таких опций Oracle как Partitioning и Materialized Views. После ознакомления с возможностями PostgreSQL, стало понятно, что этот функционал, так или иначе, придется писать вручную.
Разумеется, речь не шла о какой либо полноценной реализации Materialized Views, предусматривающей переписывание запросов. Для моих нужд вполне хватало возможности создания автоматически обновляемых агрегированных одно-табличных выборок (поддержка соединения таблиц, скорее всего, будет добавлена в ближайшем будущем). Для секционирования, я планировал использовать многократно описанный подход с использованием наследуемых таблиц, со вставкой данных, управляемой триггером. У меня была мысль использовать для управления секционированием Rules, но я от нее отказался, поскольку, в моем случае, преобладала вставка данных одиночными записями.
Начал я, разумеется, с таблиц для хранения метаданных:
Здесь все достаточно очевидно. Единственное, о чем стоит сказать, это типы столбцов:
Основой всего решения стала функция, выполняющая перестроение функций триггеров для таблицы, содержащей исходные данные:
Несмотря на свой устрашающий вид, эта функция довольно проста. Ее задача — сформировать (на основе имеющихся метаданных), четыре функции, используемых при построении триггеров:
Здесь, вместо TABLE подставляется имя таблицы, содержащей исходные данные. Типичное определение функции ps_TABLE_insert_trigger() будет выглядеть следующим образом:
На самом деле, функция выглядит несколько сложнее, поскольку особым образом обрабатываются null-значения. Но, в качестве иллюстрации, приведенный выше пример вполне адекватен. Логика этого кода очевидна:
Последний пункт приводит к тому, что если подходящая секция не найдена, данные добавляются в главную таблицу. На практике это довольно удобно. Даже если мы не создадим секцию заранее или получим данные с некорректной датой, вставка данных пройдет успешно. Впоследствии можно проанализировать содержимое главной таблицы, выполнив запрос:
После чего, создать недостающие секции (как будет показано ниже, данные будут автоматически перенесены из главной таблицы в созданную секцию). В подобных случаях, количество записей, не попавших в свою секцию, как правило, не велико и издержки, на перенос данных, незначительны.
Теперь осталось сделать обвязку. Начнем с функции создания новой секции:
Здесь, после проверки корректности входных данных, мы добавляем необходимые метаданные, после чего, создаем унаследованную таблицу. Затем, мы пересоздаем функции триггеров вызовом ps_trigger_regenerate, после чего переносим данные, подпадающие под условие секционирования в созданную секцию динамическим запросом и пересоздаем сами триггеры.
Сложности возникли с двумя моментами.
Также, следует отметить, что перед созданием индекса, на ключ секционирования (для с��зданной секции), стоило бы предварительно проверить, не является ли он лидирующим столбцом первичного ключа (чтобы не создавать дублирующий индекс).
Функция удаления секции существенно проще и в особых комментариях не нуждается:
При удалении секции, данные, естественно, не теряются, а переносятся в главную таблицу (предварительно удаляются триггеры, поскольку, как выяснилось, ключевое слово only не работает в операторе insert).
Осталось добавить функции управления «живыми» снимками данных:
Здесь тоже нет ничего принципиально нового и единственное, о чем хотелось бы заметить, это то, что, в случае использ��вания агрегатов 'min' или 'max', при создании триггеров, используется функция ps_TABLE_raise_trigger(), запрещающая удаления и изменения в таблице, по которой построен snapshot. Это сделано потому, что я не смог придумать адекватную по производительности реализацию обновления этих агрегатов при выполнении операторов update и delete в исходной таблице.
Посмотрим, как все это работает. Создадим тестовую таблицу:
Теперь, для добавления секции, достаточно выполнить следующий запрос:
В результате, будет создана унаследованная таблица test_20130501, в которую будут автоматически попадать все записи за май месяц.
Для удаления секции, можно выполнить следующий запрос:
Создание snapshot несколько сложнее, поскольку предварительно требуется определить интересующие нас столбцы:
В результате, будет создана автоматически обновляемая таблица, на основании следующего запроса:
Удалить snapshot, можно выполнив следующий запрос:
На этом, на сегодня, все. Скрипты можно забрать на GitHub.
Одним из главных требований, к разрабатываемой системе, было использование бесплатного ПО, в связи с чем, выбор пал на PostgreSQL. На момент начала работы над проектом, я довольно поверхностно знал PostgreSQL, но был неплохо знаком с возможностями Oracle Database. Поскольку речь шла об аналитической обработке, мне хотелось иметь аналоги таких опций Oracle как Partitioning и Materialized Views. После ознакомления с возможностями PostgreSQL, стало понятно, что этот функционал, так или иначе, придется писать вручную.
Разумеется, речь не шла о какой либо полноценной реализации Materialized Views, предусматривающей переписывание запросов. Для моих нужд вполне хватало возможности создания автоматически обновляемых агрегированных одно-табличных выборок (поддержка соединения таблиц, скорее всего, будет добавлена в ближайшем будущем). Для секционирования, я планировал использовать многократно описанный подход с использованием наследуемых таблиц, со вставкой данных, управляемой триггером. У меня была мысль использовать для управления секционированием Rules, но я от нее отказался, поскольку, в моем случае, преобладала вставка данных одиночными записями.
Начал я, разумеется, с таблиц для хранения метаданных:
ps_tables.sql
create sequence ps_table_seq;
create table ps_table (
id bigint default nextval('ps_table_seq') not null,
name varchar(50) not null unique,
primary key(id)
);
create sequence ps_column_seq;
create table ps_column (
id bigint default nextval('ps_column_seq') not null,
table_id bigint not null references ps_table(id),
name varchar(50) not null,
parent_name varchar(50),
type_name varchar(8) not null check (type_name in ('date', 'key', 'nullable', 'sum', 'min', 'max', 'cnt')),
unique (table_id, name),
primary key(id)
);
create table ps_range_partition (
table_id bigint not null references ps_table(id),
type_name varchar(10) not null check (type_name in ('day', 'week', 'month', 'year')),
start_value date not null,
end_value date not null,
primary key(table_id, start_value)
);
create table ps_snapshot (
snapshot_id bigint not null references ps_table(id),
table_id bigint not null references ps_table(id),
type_name varchar(10) not null check (type_name in ('day', 'week', 'month', 'year')),
primary key(snapshot_id)
);
Здесь все достаточно очевидно. Единственное, о чем стоит сказать, это типы столбцов:
| Тип |
Описание |
| date |
Столбец, содержащий календарную дату, используемый при секционировании и агрегации данных (поддерживаются типы date и timestamp PostgreSQL) |
| key |
Ключ, используемый в фразе group by, при агрегации данных (поддерживаются все целочисленные типы PostgreSQL) |
| nullable |
Ключ, используемый при агрегации данных, возможно содержащий значение null |
| sum |
Суммирование значений |
| min |
Минимальное значени�� |
| max |
Максимальное значение |
| cnt |
Подсчет количества не null-значений |
Основой всего решения стала функция, выполняющая перестроение функций триггеров для таблицы, содержащей исходные данные:
ps_trigger_regenerate(bigint)
create or replace function ps_trigger_regenerate(in p_table bigint) returns void
as $$
declare
l_sql text;
l_table_name varchar(50);
l_date_column varchar(50);
l_flag boolean;
tabs record;
columns record;
begin
select name into l_table_name
from ps_table where id = p_table;
l_sql :=
'create or replace function ps_' || l_table_name || '_insert_trigger() returns trigger ' ||
'as $'|| '$ ' ||
'begin ';
for tabs in
select a.snapshot_id as id,
b.name as table_name,
a.type_name as snapshot_type
from ps_snapshot a, ps_table b
where a.table_id = p_table
and b.id = a.snapshot_id
loop
l_flag = FALSE;
l_sql := l_sql ||
'update ' || tabs.table_name || ' set ';
for columns in
select name, parent_name, type_name
from ps_column
where table_id = tabs.id
and not type_name in ('date', 'key', 'nullable')
loop
if l_flag then
l_sql := l_sql || ', ';
end if;
l_flag := TRUE;
if columns.type_name = 'sum' then
l_sql := l_sql ||
columns.name || ' = ' || columns.name || ' + coalesce(NEW.' || columns.parent_name || ', 0) ';
end if;
if columns.type_name = 'min' then
l_sql := l_sql ||
columns.name || ' = least(coalesce(' || columns.name || ', NEW.' || columns.parent_name || '), coalesce(NEW.' || columns.parent_name || ', ' || columns.name || ')) ';
end if;
if columns.type_name = 'max' then
l_sql := l_sql ||
columns.name || ' = greatest(coalesce(' || columns.name || ', NEW.' || columns.parent_name || '), coalesce(NEW.' || columns.parent_name || ', ' || columns.name || ')) ';
end if;
if columns.type_name = 'cnt' then
l_sql := l_sql ||
columns.name || ' = ' || columns.name || ' + case when NEW.' || columns.parent_name || ' is null then 0 else 1 end ';
end if;
end loop;
l_flag = FALSE;
l_sql := l_sql || 'where ';
for columns in
select name, parent_name, type_name
from ps_column
where table_id = tabs.id
and type_name in ('date', 'key', 'nullable')
loop
if l_flag then
l_sql := l_sql || 'and ';
end if;
l_flag := TRUE;
if columns.type_name = 'date' then
l_sql := l_sql ||
columns.name || ' = date_trunc(lower(''' || tabs.snapshot_type || '''), NEW.' || columns.parent_name || ') ';
end if;
if columns.type_name = 'key' then
l_sql := l_sql ||
columns.name || ' = NEW.' || columns.parent_name || ' ';
end if;
if columns.type_name = 'nullable' then
l_sql := l_sql ||
columns.name || ' = coalesce(NEW.' || columns.parent_name || ', 0)';
end if;
end loop;
l_sql := l_sql || '; ' ||
'if not FOUND then ' ||
'insert into ' || tabs.table_name || '(';
l_flag = FALSE;
for columns in
select name, type_name
from ps_column
where table_id = tabs.id
loop
if l_flag then
l_sql := l_sql || ', ';
end if;
l_flag := TRUE;
l_sql := l_sql || columns.name;
end loop;
l_sql := l_sql || ') values (';
l_flag = FALSE;
for columns in
select name, parent_name, type_name
from ps_column
where table_id = tabs.id
loop
if l_flag then
l_sql := l_sql || ', ';
end if;
l_flag := TRUE;
if columns.type_name = 'date' then
l_sql := l_sql || 'date_trunc(lower(''' || tabs.snapshot_type || '''), NEW.' || columns.parent_name || ')';
elsif columns.type_name = 'cnt' then
l_sql := l_sql || 'case when NEW.' || columns.parent_name || ' is null then 0 else 1 end';
elsif columns.type_name in ('nullable', 'sum') then
l_sql := l_sql || 'coalesce(NEW.' || columns.parent_name || ', 0)';
else
l_sql := l_sql || 'NEW.' || columns.parent_name;
end if;
end loop;
l_sql := l_sql || '); ' ||
'end if; ';
end loop;
select name into l_date_column
from ps_column
where table_id = p_table
and type_name = 'date';
for tabs in
select to_char(start_value, 'YYYYMMDD') as start_value,
to_char(end_value, 'YYYYMMDD') as end_value,
type_name
from ps_range_partition
where table_id = p_table
order by start_value desc
loop
l_sql := l_sql ||
'if NEW.' || l_date_column || ' >= to_date(''' || tabs.start_value || ''', ''YYYYMMDD'') and NEW.' || l_date_column || ' < to_date(''' || tabs.end_value || ''', ''YYYYMMDD'') then ' ||
'insert into ' || l_table_name || '_' || tabs.start_value || ' values (NEW.*); ' ||
'return null; ' ||
'end if; ';
end loop;
l_sql := l_sql ||
'return NEW; '||
'end; '||
'$'||'$ language plpgsql';
execute l_sql;
l_sql :=
'create or replace function ps_' || l_table_name || '_raise_trigger() returns trigger ' ||
'as $'|| '$ ' ||
'begin ' ||
'raise EXCEPTION ''Can''''t support % on MIN or MAX aggregate'', TG_OP;' ||
'end; '||
'$'||'$ language plpgsql';
execute l_sql;
l_sql :=
'create or replace function ps_' || l_table_name || '_delete_trigger() returns trigger ' ||
'as $'|| '$ ' ||
'begin ';
for tabs in
select a.snapshot_id as id,
b.name as table_name,
a.type_name as snapshot_type
from ps_snapshot a, ps_table b
where a.table_id = p_table
and b.id = a.snapshot_id
loop
l_flag = FALSE;
l_sql := l_sql ||
'update ' || tabs.table_name || ' set ';
for columns in
select name, parent_name, type_name
from ps_column
where table_id = tabs.id
and type_name in ('sum', 'cnt')
loop
if l_flag then
l_sql := l_sql || ', ';
end if;
l_flag := TRUE;
if columns.type_name = 'sum' then
l_sql := l_sql ||
columns.name || ' = ' || columns.name || ' - OLD.' || columns.parent_name || ' ';
end if;
if columns.type_name = 'cnt' then
l_sql := l_sql ||
columns.name || ' = ' || columns.name || ' - case when OLD.' || columns.parent_name || ' is null then 0 else 1 end ';
end if;
end loop;
l_flag = FALSE;
l_sql := l_sql || 'where ';
for columns in
select name, parent_name, type_name
from ps_column
where table_id = tabs.id
and type_name in ('date', 'key', 'nullable')
loop
if l_flag then
l_sql := l_sql || 'and ';
end if;
l_flag := TRUE;
if columns.type_name = 'date' then
l_sql := l_sql ||
columns.name || ' = date_trunc(lower(''' || tabs.snapshot_type || '''), NEW.' || columns.parent_name || ') ';
end if;
if columns.type_name = 'key' then
l_sql := l_sql ||
columns.name || ' = NEW.' || columns.parent_name || ' ';
end if;
if columns.type_name = 'nullable' then
l_sql := l_sql ||
columns.name || ' = coalesce(NEW.' || columns.parent_name || ', 0)';
end if;
end loop;
l_sql := l_sql || '; ';
end loop;
l_sql := l_sql ||
'return null; '||
'end; '||
'$'||'$ language plpgsql';
execute l_sql;
l_sql :=
'create or replace function ps_' || l_table_name || '_update_trigger() returns trigger ' ||
'as $'|| '$ ' ||
'begin ';
for tabs in
select a.snapshot_id as id,
b.name as table_name,
a.type_name as snapshot_type
from ps_snapshot a, ps_table b
where a.table_id = p_table
and b.id = a.snapshot_id
loop
l_flag = FALSE;
l_sql := l_sql ||
'update ' || tabs.table_name || ' set ';
for columns in
select name, parent_name, type_name
from ps_column
where table_id = tabs.id
and type_name in ('sum', 'cnt')
loop
if l_flag then
l_sql := l_sql || ', ';
end if;
l_flag := TRUE;
if columns.type_name = 'sum' then
l_sql := l_sql ||
columns.name || ' = ' || columns.name || ' - OLD.' || columns.parent_name || ' + NEW.' || columns.parent_name || ' ';
end if;
if columns.type_name = 'cnt' then
l_sql := l_sql ||
columns.name || ' = ' || columns.name ||
' - case when OLD.' || columns.parent_name || ' is null then 0 else 1 end ' ||
' + case when NEW.' || columns.parent_name || ' is null then 0 else 1 end ';
end if;
end loop;
l_flag = FALSE;
l_sql := l_sql || 'where ';
for columns in
select name, parent_name, type_name
from ps_column
where table_id = tabs.id
and type_name in ('date', 'key', 'nullable')
loop
if l_flag then
l_sql := l_sql || 'and ';
end if;
l_flag := TRUE;
if columns.type_name = 'date' then
l_sql := l_sql ||
columns.name || ' = date_trunc(lower(''' || tabs.snapshot_type || '''), NEW.' || columns.parent_name || ') ';
end if;
if columns.type_name = 'key' then
l_sql := l_sql ||
columns.name || ' = NEW.' || columns.parent_name || ' ';
end if;
if columns.type_name = 'nullable' then
l_sql := l_sql ||
columns.name || ' = coalesce(NEW.' || columns.parent_name || ', 0)';
end if;
end loop;
l_sql := l_sql || '; ';
end loop;
l_sql := l_sql ||
'return null; '||
'end; '||
'$'||'$ language plpgsql';
execute l_sql;
end;
$$ language plpgsql;
Несмотря на свой устрашающий вид, эта функция довольно проста. Ее задача — сформировать (на основе имеющихся метаданных), четыре функции, используемых при построении триггеров:
- ps_TABLE_insert_trigger() — Функция управляющая вставкой данных
- ps_TABLE_update_trigger() — Функция управляющая обновлением данных
- ps_TABLE_delete_trigger() — Функция управляющая удалением данных
- ps_TABLE_raise_trigger() — Функция запрещающая обновление и удаление данных
Здесь, вместо TABLE подставляется имя таблицы, содержащей исходные данные. Типичное определение функции ps_TABLE_insert_trigger() будет выглядеть следующим образом:
create or replace function ps_data_insert_trigger() returns trigger
as $$
begin
update data_month set
sum_field = sum_field + NEW.sum_field
, min_field = least(min_field, NEW.min_field)
where date_field = date_trunc('month', NEW.date_field)
and key_field = NEW.key_field;
if not FOUND then
insert into data_month(date_field, key_field, sum_field, min_field)
values (date_trunc('month', NEW.date_field), NEW.key_field, NEW.sum_field, NEW.min_field);
end if;
if NEW.date_field >= to_date('20130101', 'YYYYMMDD') and
NEW.date_field < to_date('20130201', 'YYYYMMDD') then
insert into data_20130101 values (NEW.*);
return null;
end if;
return NEW;
end;
$$ language plpgsql;
На самом деле, функция выглядит несколько сложнее, поскольку особым образом обрабатываются null-значения. Но, в качестве иллюстрации, приведенный выше пример вполне адекватен. Логика этого кода очевидна:
- При вставке в исходную таблицу data, пытаемся обновить счетчики в агрегированном представлении data_month
- Если это не удалось (запись в data_month не найдена), добавляем новую запись
- Далее, проверяем попадание в интервал дат для каждой секции (в примере одна секция), и при успехе, вставляем запись в соответствующую секцию (поскольку секция наследуется от главной таблицы, можно смело использовать звездочку) и возращаем null, чтобы предотвратить вставку записи в главную таблицу
- Если ни одна из секций не подходит, возвращаем NEW, позволяя выполнить вставку в главную таблицу
Последний пункт приводит к тому, что если подходящая секция не найдена, данные добавляются в главную таблицу. На практике это довольно удобно. Даже если мы не создадим секцию заранее или получим данные с некорректной датой, вставка данных пройдет успешно. Впоследствии можно проанализировать содержимое главной таблицы, выполнив запрос:
select * from only data
После чего, создать недостающие секции (как будет показано ниже, данные будут автоматически перенесены из главной таблицы в созданную секцию). В подобных случаях, количество записей, не попавших в свою секцию, как правило, не велико и издержки, на перенос данных, незначительны.
Теперь осталось сделать обвязку. Начнем с функции создания новой секции:
ps_add_range_partition(varchar, varchar, varchar, date)
create or replace function ps_add_range_partition(in p_table varchar, in p_column varchar,
in p_type varchar, in p_start date) returns void
as $$
declare
l_sql text;
l_end date;
l_start_str varchar(10);
l_end_str varchar(10);
l_table bigint;
l_flag boolean;
columns record;
begin
perform 1
from ps_table a, ps_column b
where a.id = b.table_id and lower(a.name) = lower(p_table)
and b.type_name = 'date' and lower(b.name) <> lower(p_column);
if FOUND then
raise EXCEPTION 'Conflict DATE columns';
end if;
l_end := p_start + ('1 ' || p_type)::INTERVAL;
perform 1
from ps_table a, ps_range_partition b
where a.id = b.table_id and lower(a.name) = lower(p_table)
and (( p_start >= b.start_value and p_start < b.end_value ) or
( b.start_value >= p_start and b.start_value < l_end ));
if FOUND then
raise EXCEPTION 'Range intervals intersects';
end if;
perform 1
from ps_table
where lower(name) = lower(p_table);
if not FOUND then
insert into ps_table(name) values (lower(p_table));
end if;
select id into l_table
from ps_table
where lower(name) = lower(p_table);
perform 1
from ps_column
where table_id = l_table and type_name = 'date'
and lower(name) = lower(p_column);
if not FOUND then
insert into ps_column(table_id, name, type_name)
values (l_table, lower(p_column), 'date');
end if;
insert into ps_range_partition(table_id, type_name, start_value, end_value)
values (l_table, p_type, p_start, l_end);
l_start_str = to_char(p_start, 'YYYYMMDD');
l_end_str = to_char(l_end, 'YYYYMMDD');
l_sql :=
'create table ' || p_table || '_' || l_start_str || '(' ||
'check (' || p_column || ' >= to_date(''' || l_start_str || ''', ''YYYYMMDD'') and ' ||
p_column || ' < to_date(''' || l_end_str || ''', ''YYYYMMDD'')), ' ||
'primary key (';
l_flag := FALSE;
for columns in
select f.name as name
from ( select ps_array_to_set(a.conkey) as nn
from pg_constraint a, pg_class b
where b.oid = a.conrelid
and a.contype = 'p'
and b.relname = p_table ) c,
( select d.attname as name, d.attnum as nn
from pg_attribute d, pg_class e
where e.oid = d.attrelid
and e.relname = p_table ) f
where f.nn = c.nn
order by f.nn
loop
if l_flag then
l_sql := l_sql || ', ';
end if;
l_flag := TRUE;
l_sql := l_sql || columns.name;
end loop;
l_sql := l_sql ||
')) inherits (' || p_table || ')';
execute l_sql;
l_sql :=
'create index ' || p_table || '_' || l_start_str || '_date on ' || p_table || '_' || l_start_str || '(' || p_column || ')';
execute l_sql;
perform ps_trigger_regenerate(l_table);
execute 'drop trigger if exists ps_' || p_table || '_before_insert on ' || p_table;
execute 'drop trigger if exists ps_' || p_table || '_after_update on ' || p_table;
execute 'drop trigger if exists ps_' || p_table || '_after_delete on ' || p_table;
l_sql :=
'insert into ' || p_table || '_' || l_start_str || ' ' ||
'select * from ' || p_table || ' where ' ||
p_column || ' >= to_date(''' || l_start_str || ''', ''YYYYMMDD'') and ' ||
p_column || ' < to_date(''' || l_end_str || ''', ''YYYYMMDD'')';
execute l_sql;
l_sql :=
'delete from only ' || p_table || ' where ' ||
p_column || ' >= to_date(''' || l_start_str || ''', ''YYYYMMDD'') and ' ||
p_column || ' < to_date(''' || l_end_str || ''', ''YYYYMMDD'')';
execute l_sql;
l_sql :=
'create trigger ps_' || p_table || '_before_insert ' ||
'before insert on ' || p_table || ' for each row ' ||
'execute procedure ps_' || p_table || '_insert_trigger()';
execute l_sql;
perform 1
from ps_snapshot a, ps_column b
where b.table_id = a.snapshot_id and a.table_id = l_table
and b.type_name in ('min', 'max');
if FOUND then
l_sql :=
'create trigger ps_' || p_table || '_after_update ' ||
'after update on ' || p_table || ' for each row ' ||
'execute procedure ps_' || p_table || '_raise_trigger()';
execute l_sql;
l_sql :=
'create trigger ps_' || p_table || '_after_delete ' ||
'after delete on ' || p_table || ' for each row ' ||
'execute procedure ps_' || p_table || '_raise_trigger()';
execute l_sql;
l_sql :=
'create trigger ps_' || p_table || '_' || l_start_str || '_after_update ' ||
'after update on ' || p_table || '_' || l_start_str || ' for each row ' ||
'execute procedure ps_' || p_table || '_raise_trigger()';
execute l_sql;
l_sql :=
'create trigger ps_' || p_table || '_' || l_start_str || '_after_delete ' ||
'after delete on ' || p_table || '_' || l_start_str || ' for each row ' ||
'execute procedure ps_' || p_table || '_raise_trigger()';
execute l_sql;
else
l_sql :=
'create trigger ps_' || p_table || '_after_update ' ||
'after update on ' || p_table || ' for each row ' ||
'execute procedure ps_' || p_table || '_update_trigger()';
execute l_sql;
l_sql :=
'create trigger ps_' || p_table || '_after_delete ' ||
'after delete on ' || p_table || ' for each row ' ||
'execute procedure ps_' || p_table || '_delete_trigger()';
execute l_sql;
l_sql :=
'create trigger ps_' || p_table || '_' || l_start_str || '_after_update ' ||
'after update on ' || p_table || '_' || l_start_str || ' for each row ' ||
'execute procedure ps_' || p_table || '_update_trigger()';
execute l_sql;
l_sql :=
'create trigger ps_' || p_table || '_' || l_start_str || '_after_delete ' ||
'after delete on ' || p_table || '_' || l_start_str || ' for each row ' ||
'execute procedure ps_' || p_table || '_delete_trigger()';
execute l_sql;
end if;
end;
$$ language plpgsql;
Здесь, после проверки корректности входных данных, мы добавляем необходимые метаданные, после чего, создаем унаследованную таблицу. Затем, мы пересоздаем функции триггеров вызовом ps_trigger_regenerate, после чего переносим данные, подпадающие под условие секционирования в созданную секцию динамическим запросом и пересоздаем сами триггеры.
Сложности возникли с двумя моментами.
- Пришлось немного помучиться с прибавлением к стартовой дате месяца, дня или года (в зависимости от входного параметра p_type:
l_end := p_start + ('1 ' || p_type)::INTERVAL;
- Поскольку первичный ключ не наследуется, пришлось сочинять запрос к System Catalogs, для получения списка колонок первичного ключа исходной таблицы (хранить в своих метаданных еще и описание первичного ключа я счел нецелесообразным):
select f.name as name from ( select ps_array_to_set(a.conkey) as nn from pg_constraint a, pg_class b where b.oid = a.conrelid and a.contype = 'p' and b.relname = p_table ) c, ( select d.attname as name, d.attnum as nn from pg_attribute d, pg_class e where e.oid = d.attrelid and e.relname = p_table ) f where f.nn = c.nn order by f.nn
Также, следует отметить, что перед созданием индекса, на ключ секционирования (для с��зданной секции), стоило бы предварительно проверить, не является ли он лидирующим столбцом первичного ключа (чтобы не создавать дублирующий индекс).
Функция удаления секции существенно проще и в особых комментариях не нуждается:
ps_del_range_partition(varchar, date)
create or replace function ps_del_range_partition(in p_table varchar, in p_start date)
returns void
as $$
declare
l_sql text;
l_start_str varchar(10);
l_table bigint;
begin
select id into l_table
from ps_table
where lower(name) = lower(p_table);
l_start_str = to_char(p_start, 'YYYYMMDD');
delete from ps_range_partition
where table_id = l_table
and start_value = p_start;
perform ps_trigger_regenerate(l_table);
l_sql :=
'insert into ' || p_table || ' ' ||
'select * from ' || p_table || '_' || l_start_str;
execute l_sql;
perform 1
from ( select 1
from ps_range_partition
where table_id = l_table
union all
select 1
from ps_snapshot
where table_id = l_table ) a;
if not FOUND then
execute 'drop trigger if exists ps_' || p_table || '_before_insert on ' || p_table;
execute 'drop trigger if exists ps_' || p_table || '_after_update on ' || p_table;
execute 'drop trigger if exists ps_' || p_table || '_after_delete on ' || p_table;
execute 'drop function ps_' || p_table || '_insert_trigger() cascade';
execute 'drop function ps_' || p_table || '_raise_trigger() cascade';
execute 'drop function ps_' || p_table || '_update_trigger() cascade';
execute 'drop function ps_' || p_table || '_delete_trigger() cascade';
delete from ps_column where table_id = l_table;
delete from ps_table where id = l_table;
end if;
perform 1
from ps_range_partition
where table_id = l_table;
if not FOUND then
delete from ps_column
where table_id = l_table
and type_name = 'date';
end if;
execute 'drop table ' || p_table || '_' || l_start_str;
end;
$$ language plpgsql;
При удалении секции, данные, естественно, не теряются, а переносятся в главную таблицу (предварительно удаляются триггеры, поскольку, как выяснилось, ключевое слово only не работает в операторе insert).
Осталось добавить функции управления «живыми» снимками данных:
ps_add_snapshot_column(varchar, varchar, varchar, varchar)
create or replace function ps_add_snapshot_column(in p_snapshot varchar,
in p_column varchar, in p_parent varchar, in p_type varchar) returns void
as $$
declare
l_table bigint;
begin
perform 1
from ps_table
where lower(name) = lower(p_snapshot);
if not FOUND then
insert into ps_table(name) values (lower(p_snapshot));
end if;
select id into l_table
from ps_table
where lower(name) = lower(p_snapshot);
insert into ps_column(table_id, name, parent_name, type_name)
values (l_table, lower(p_column), lower(p_parent), p_type);
end;
$$ language plpgsql;
ps_add_snapshot(varchar, varchar, varchar)
create or replace function ps_add_snapshot(in p_table varchar, in p_snapshot varchar,
in p_type varchar) returns void
as $$
declare
l_sql text;
l_table bigint;
l_snapshot bigint;
l_flag boolean;
columns record;
begin
select id into l_snapshot
from ps_table
where lower(name) = lower(p_snapshot);
perform 1
from ps_column
where table_id = l_snapshot
and type_name in ('date', 'key');
if not FOUND then
raise EXCEPTION 'Key columns not found';
end if;
perform 1
from ps_column
where table_id = l_snapshot
and not type_name in ('date', 'key', 'nullable');
if not FOUND then
raise EXCEPTION 'Aggregate columns not found';
end if;
perform 1
from ps_table
where lower(name) = lower(p_table);
if not FOUND then
insert into ps_table(name) values (lower(p_table));
end if;
select id into l_table
from ps_table
where lower(name) = lower(p_table);
insert into ps_snapshot(table_id, snapshot_id, type_name)
values (l_table, l_snapshot, p_type);
perform ps_trigger_regenerate(l_table);
l_sql := 'create table ' || p_snapshot || ' (';
l_flag := FALSE;
for columns in
select name, type_name
from ps_column
where table_id = l_snapshot
loop
if l_flag then
l_sql := l_sql || ', ';
end if;
l_flag := TRUE;
if columns.type_name = 'date' then
l_sql := l_sql || columns.name || ' date not null';
else
l_sql := l_sql || columns.name || ' bigint not null';
end if;
end loop;
l_sql := l_sql || ', primary key (';
l_flag := FALSE;
for columns in
select name
from ps_column
where table_id = l_snapshot
and type_name in ('date', 'key', 'nullable')
loop
if l_flag then
l_sql := l_sql || ', ';
end if;
l_flag := TRUE;
l_sql := l_sql || columns.name;
end loop;
l_sql := l_sql || '))';
execute l_sql;
execute 'drop trigger if exists ps_' || p_table || '_before_insert on ' || p_table;
execute 'drop trigger if exists ps_' || p_table || '_after_update on ' || p_table;
execute 'drop trigger if exists ps_' || p_table || '_after_delete on ' || p_table;
l_sql :=
'create trigger ps_' || p_table || '_before_insert ' ||
'before insert on ' || p_table || ' for each row ' ||
'execute procedure ps_' || p_table || '_insert_trigger()';
execute l_sql;
perform 1
from ps_snapshot a, ps_column b
where b.table_id = a.snapshot_id and a.table_id = l_table
and b.type_name in ('min', 'max');
if FOUND then
l_sql :=
'create trigger ps_' || p_table || '_after_update ' ||
'after update on ' || p_table || ' for each row ' ||
'execute procedure ps_' || p_table || '_raise_trigger()';
execute l_sql;
l_sql :=
'create trigger ps_' || p_table || '_after_delete ' ||
'after delete on ' || p_table || ' for each row ' ||
'execute procedure ps_' || p_table || '_raise_trigger()';
execute l_sql;
else
l_sql :=
'create trigger ps_' || p_table || '_after_update ' ||
'after update on ' || p_table || ' for each row ' ||
'execute procedure ps_' || p_table || '_update_trigger()';
execute l_sql;
l_sql :=
'create trigger ps_' || p_table || '_after_delete ' ||
'after delete on ' || p_table || ' for each row ' ||
'execute procedure ps_' || p_table || '_delete_trigger()';
execute l_sql;
end if;
l_sql := 'insert into ' || p_snapshot || '(';
l_flag := FALSE;
for columns in
select name
from ps_column
where table_id = l_snapshot
loop
if l_flag then
l_sql := l_sql || ', ';
end if;
l_flag := TRUE;
l_sql := l_sql || columns.name;
end loop;
l_sql := l_sql || ') select ';
l_flag := FALSE;
for columns in
select parent_name as name, type_name
from ps_column
where table_id = l_snapshot
loop
if l_flag then
l_sql := l_sql || ', ';
end if;
l_flag := TRUE;
if columns.type_name = 'date' then
l_sql := l_sql || 'date_trunc(lower(''' || p_type || '''), ' || columns.name || ')';
end if;
if columns.type_name = 'key' then
l_sql := l_sql || columns.name;
end if;
if columns.type_name = 'nullable' then
l_sql := l_sql || 'coalesce(' || columns.name || ', 0)';
end if;
if columns.type_name = 'sum' then
l_sql := l_sql || 'sum(' || columns.name || ')';
end if;
if columns.type_name = 'min' then
l_sql := l_sql || 'min(' || columns.name || ')';
end if;
if columns.type_name = 'max' then
l_sql := l_sql || 'max(' || columns.name || ')';
end if;
if columns.type_name = 'cnt' then
l_sql := l_sql || 'count(' || columns.name || ')';
end if;
end loop;
l_sql := l_sql || 'from ' || p_table || ' group by ';
l_flag := FALSE;
for columns in
select parent_name as name, type_name
from ps_column
where table_id = l_snapshot
and type_name in ('date', 'key', 'nullable')
loop
if l_flag then
l_sql := l_sql || ', ';
end if;
l_flag := TRUE;
if columns.type_name = 'date' then
l_sql := l_sql || 'date_trunc(lower(''' || p_type || '''), ' || columns.name || ')';
else
l_sql := l_sql || columns.name;
end if;
end loop;
execute l_sql;
end;
$$ language plpgsql;
ps_del_snapshot(varchar)
create or replace function ps_del_snapshot(in p_snapshot varchar) returns void
as $$
declare
l_sql text;
p_table varchar(50);
l_table bigint;
l_snapshot bigint;
begin
select a.table_id, c.name into l_table, p_table
from ps_snapshot a, ps_table b, ps_table c
where b.id = a.snapshot_id and c.id = a.table_id
and lower(b.name) = lower(p_snapshot);
select id into l_snapshot
from ps_table
where lower(name) = lower(p_snapshot);
delete from ps_snapshot where snapshot_id = l_snapshot;
delete from ps_column where table_id = l_snapshot;
delete from ps_table where id = l_snapshot;
execute 'drop trigger if exists ps_' || p_table || '_before_insert on ' || p_table;
execute 'drop trigger if exists ps_' || p_table || '_after_update on ' || p_table;
execute 'drop trigger if exists ps_' || p_table || '_after_delete on ' || p_table;
perform 1
from ( select 1
from ps_range_partition
where table_id = l_table
union all
select 1
from ps_snapshot
where table_id = l_table ) a;
if not FOUND then
execute 'drop function if exists ps_' || p_table || '_insert_trigger() cascade';
execute 'drop function if exists ps_' || p_table || '_raise_trigger() cascade';
execute 'drop function if exists ps_' || p_table || '_update_trigger() cascade';
execute 'drop function if exists ps_' || p_table || '_delete_trigger() cascade';
else
perform ps_trigger_regenerate(l_table);
l_sql :=
'create trigger ps_' || p_table || '_before_insert ' ||
'before insert on ' || p_table || ' for each row ' ||
'execute procedure ps_' || p_table || '_insert_trigger()';
execute l_sql;
perform 1
from ps_snapshot a, ps_column b
where b.table_id = a.snapshot_id and a.table_id = l_table
and b.type_name in ('min', 'max');
if FOUND then
l_sql :=
'create trigger ps_' || p_table || '_after_update ' ||
'after update on ' || p_table || ' for each row ' ||
'execute procedure ps_' || p_table || '_raise_trigger()';
execute l_sql;
l_sql :=
'create trigger ps_' || p_table || '_after_delete ' ||
'after delete on ' || p_table || ' for each row ' ||
'execute procedure ps_' || p_table || '_raise_trigger()';
execute l_sql;
else
l_sql :=
'create trigger ps_' || p_table || '_after_update ' ||
'after update on ' || p_table || ' for each row ' ||
'execute procedure ps_' || p_table || '_update_trigger()';
execute l_sql;
l_sql :=
'create trigger ps_' || p_table || '_after_delete ' ||
'after delete on ' || p_table || ' for each row ' ||
'execute procedure ps_' || p_table || '_delete_trigger()';
execute l_sql;
end if;
end if;
execute 'drop table if exists ' || p_snapshot;
end;
$$ language plpgsql;
Здесь тоже нет ничего принципиально нового и единственное, о чем хотелось бы заметить, это то, что, в случае использ��вания агрегатов 'min' или 'max', при создании триггеров, используется функция ps_TABLE_raise_trigger(), запрещающая удаления и изменения в таблице, по которой построен snapshot. Это сделано потому, что я не смог придумать адекватную по производительности реализацию обновления этих агрегатов при выполнении операторов update и delete в исходной таблице.
Посмотрим, как все это работает. Создадим тестовую таблицу:
create sequence test_seq;
create table test (
id bigint default nextval('test_seq') not null,
event_time timestamp not null,
customer_id bigint not null,
value bigint not null,
primary key(id)
);
Теперь, для добавления секции, достаточно выполнить следующий запрос:
select ps_add_range_partition('test', 'event_time', 'month', to_date('20130501', 'YYYYMMDD'))
В результате, будет создана унаследованная таблица test_20130501, в которую будут автоматически попадать все записи за май месяц.
Для удаления секции, можно выполнить следующий запрос:
select ps_del_range_partition('test', to_date('20130501', 'YYYYMMDD'))
Создание snapshot несколько сложнее, поскольку предварительно требуется определить интересующие нас столбцы:
select ps_add_snapshot_column('test_month', 'customer_id', 'key')
select ps_add_snapshot_column('test_month', 'event_time', 'date')
select ps_add_snapshot_column('test_month', 'value_sum', 'value', 'sum')
select ps_add_snapshot_column('test_month', 'value_cnt', 'value', 'cnt')
select ps_add_snapshot_column('test_month', 'value_max', 'value', 'max')
select ps_add_snapshot('test', 'test_month', 'month')
В результате, будет создана автоматически обновляемая таблица, на основании следующего запроса:
select customer_id, date_trunc('month', event_time),
sum(value) as value_sum,
count(value) as value_cnt,
max(value) as value_max
from test
group by customer_id, date_trunc('month', event_time)
Удалить snapshot, можно выполнив следующий запрос:
select ps_del_snapshot('test_month')
На этом, на сегодня, все. Скрипты можно забрать на GitHub.
