Pull to refresh

PostgreSQL: Уникальные ключи для распределенной базы. Практика

Reading time5 min
Views9.8K
По следам статьи «Уникальный ключ в условиях распределенной БД».

У нас есть база которую мы хотим разделить. В идеальном случае хочется сделать master-master. Один из самых сложных моментов, это обеспечение уникальности ключей на всех серверах. И хорошо если база изначально проектировалась с учетом масштабирования… Опять же, это что-то из области идеала, который встречается, скажем так — не часто.

Итак у нас есть база которую нужно подготовить к синхронизации master-master — сделаем все ключи в нашей базе уникальными в пределах проекта.

В упомянутой статье рассматривались несколько вариантов, но мы остановимся на одном предложенным Instagram

Шаг 1 — Перевод всех ключей в bigint


Здесь подразумевается, что все наши primary ключи называются id, и соответственно поля которые ссылаются на эти ключи названы подобным образом: order_id, client_id, table_id…

Создадим функцию которая переводит поле integer в bigint
DROP FUNCTION IF EXISTS "field_int2big" (field text, tablename text, table_schema text);
CREATE OR REPLACE FUNCTION "field_int2big" (field text, tablename text, table_schema text) RETURNS bigint AS 
$body$  
  DECLARE 
  BEGIN 
  EXECUTE 'ALTER TABLE '|| table_schema || '."'|| tablename || '" ALTER COLUMN "'|| field || '" TYPE bigint;' ;
  return 1;
  END;  
$body$  LANGUAGE 'plpgsql';


Дальше выбираем все integer поля и конвертирум их:
select *, field_int2big(column_name, table_name, table_schema)  from 
(select table_catalog, table_schema, table_name, column_name, data_type
 from information_schema.columns where table_schema in ('public', 'myscheme') 
 and data_type in ('integer', 'oid') and (position('id' in column_name)>0 OR column_name in ('key', 'myfield'))
order by table_catalog, table_schema, table_name, column_name 
limit 10 offset 0) c

Несколько вещей на которые нужно обратить внимание:
  1. Вы можете/должны добавить свои схемы: table_schema in ('public', 'myscheme')
  2. Также можете добавлять свои поля именованные не «стандартно»: column_name in ('key', 'myfield')
  3. Обратите внимание на limit 10 для больших баз таблиц нужно его уменьшать вплоть до 1 — смена типа требует времени и не малого
  4. Запрос нужно запускать несколько раз, каждый раз он будет находить оставшиеся не переведенные поля


Шаг 2 — Перевод функциональных индексов в которых есть прямое указание типа


Вообще если этого не сделать — принесет проблем в будущем, их крайне трудно обнаружить: выдает ошибку которую не видно в исполняемом запросе.

DROP FUNCTION IF EXISTS "index_int2big" (idx text, declare_idx text);
CREATE OR REPLACE FUNCTION "index_int2big" (idx text, declare_idx text) RETURNS text AS 
$body$  
  DECLARE 
  new_idx text;
  BEGIN 
  EXECUTE 'DROP INDEX IF EXISTS ' || idx;
  SELECT replace(declare_idx, 'integer', 'bigint') INTO new_idx;
  EXECUTE new_idx ;
  return new_idx;
  END;  
$body$  LANGUAGE 'plpgsql';

select *, index_int2big(indname, inddef) from 
(SELECT n.nspname as table_schema, c.relname as table_name, c2.relname AS indname, i.indisprimary, 
i.indisunique, i.indisclustered, pg_catalog.pg_get_indexdef(i.indexrelid, 0, true) AS inddef
FROM pg_catalog.pg_class c, pg_catalog.pg_class c2, pg_catalog.pg_index i, pg_namespace n
WHERE n.oid=c.relnamespace and c.oid = i.indrelid AND i.indexrelid = c2.oid
and n.nspname in ('bucardo', 'public')
and position('integer' in pg_catalog.pg_get_indexdef(i.indexrelid, 0, true))>0
limit 10 offset 0) c

Вы можете заметить что я проверяю в схеме bucardo. Если у вас уже работает синхронизация на основе этой технологии, тогда этот шаг стает крайне важным. Также смотрите замечание с прошлого шага.

Шаг 3 — Создание новых последовательностей для всех ключевых полей


В предложенном Instagram варианте используется уникальное число для каждой схемы/сервера.
т.е. необходимо иметь в каждой схеме свою функцию с своим уникальным номером.
Я немного изменил функцию и генерирую уникальный ключ по IP сервера.

CREATE OR REPLACE FUNCTION inet2num(inet) RETURNS numeric AS $$ 
DECLARE 
    a  text[] := string_to_array(host($1), '.'); 
BEGIN 
    RETURN a[1]::numeric * 16777216 + 
           a[2]::numeric * 65536 + 
           a[3]::numeric * 256 + 
           a[4]::numeric; 
END; 
$$ LANGUAGE plpgsql IMMUTABLE STRICT; 


DROP FUNCTION IF EXISTS next_id(tbl text, tableschema text);
CREATE OR REPLACE FUNCTION next_id(tbl text, tableschema text = 'public') returns bigint AS $$
DECLARE
    our_epoch bigint := 1314220021721;
    seq_id bigint;
    now_millis bigint;
    shard_id bigint;
    result bigint;
BEGIN
    SELECT nextval(tableschema||'."' || tbl || '_id_seq"') % 1024 INTO seq_id;
    /* select substring(regexp_replace(md5(current_database()||inet_server_addr()||version()), '[^\\\d]+', '', 'g')::text from 1 for 6)::int into shard_id;*/
    SELECT inet2num(inet_server_addr()) into shard_id;
    SELECT FLOOR(EXTRACT(EPOCH FROM clock_timestamp()) * 1000) INTO now_millis;
    result := (now_millis - our_epoch) << 23;
    result := result | (shard_id << 10);
    result := result | (seq_id);
    RETURN result;
END;
$$ LANGUAGE PLPGSQL;


Теперь нам осталось вызвать ее на всех наших идентификаторах:
DROP FUNCTION IF EXISTS "reset_nextid" (tablename text, tableschema text);
CREATE OR REPLACE FUNCTION "reset_nextid" (tablename text, tableschema text) RETURNS bigint AS 
$body$  
  DECLARE 
    id_type text;
  BEGIN 
  SELECT data_type from information_schema.columns c where "table_schema"=tableschema and "table_name"=tablename and column_name='id' INTO id_type;
  IF id_type <> 'bigint' THEN
    EXECUTE 'ALTER TABLE '|| tableschema || '."'|| tablename || '" ALTER COLUMN id TYPE bigint;' ;
  END IF;

  EXECUTE 'ALTER TABLE '|| tableschema || '."'|| tablename || '" ALTER COLUMN id SET DEFAULT next_id('''|| tablename || ''', '''|| tableschema || ''');';
  return next_id(tablename, tableschema);
  END;  
$body$  LANGUAGE 'plpgsql';

select t.*, reset_nextid(table_name, table_schema) from (
  select t.table_schema, t.table_name, sequence_name, c.column_name from information_schema.sequences s
  left join information_schema.tables t on split_part(sequence_name, '_id_seq',1)=table_name
  left join information_schema.columns c on (t.table_schema=c.table_schema and t.table_name=c.table_name and c.column_name='id')
  where c.column_name is not null and position('next_id' in c.column_default)<>1 and s.sequence_schema=t.table_schema 
  and t.table_schema in ('public', 'acc') 
  order by t.table_schema, t.table_name limit 50 offset 0
) as t


Т.к. мы уже сменили тип идентификатора на bigint — запрос должен отработать быстро.

На этом подготовку закончили, наша база работает и готова работать параллельно.

Бонус.

Если что то пошло не так с next_id, можете вернутся на стандартные последовательности:
CREATE OR REPLACE FUNCTION "restore_nextval" (tablename text, tableschema text = 'public') RETURNS bigint AS 
$body$  
  DECLARE 
  BEGIN 
  EXECUTE 'ALTER TABLE '|| tableschema || '."'|| tablename || '" ALTER COLUMN id SET DEFAULT nextval('''|| tablename || '_id_seq''::regclass);';
  return nextval((tableschema || '."'|| tablename || '_id_seq"')::regclass);
  END;  
$body$  LANGUAGE 'plpgsql';

select t.*, restore_nextval(table_name, table_schema) from (
  select t.table_schema, t.table_name, sequence_name, c.column_name from information_schema.sequences s
  left join information_schema.tables t on split_part(sequence_name, '_id_seq',1)=table_name
  left join information_schema.columns c on (t.table_schema=c.table_schema and t.table_name=c.table_name and c.column_name='id')
  where c.column_name is not null and position('next_id' in c.column_default)>0 and s.sequence_schema=t.table_schema 
  and t.table_schema in ('public', 'acc') 


Спасибо. Надеюсь кому-то было полезно.

P.S. Не пытайтесь это делать на 32 битном сервере. Вначале обновите сервер. И сервер приложений тоже.
Tags:
Hubs:
+20
Comments13

Articles