PostgreSQL предлагает метод логического декодирования и делает возможным сбор данных об изменениях на основе логирования. Вы сможете настроить и запустить CDC в несколько шагов.
Архитектура современных веб-приложений состоит из нескольких программных компонентов, таких как информационные панели (дашборды), аналитические системы, базы данных, озёра данных (Data Lakes), кэшевые хранилища, функции поиска и т.д.
База данных обычно является основной частью любого приложения. Обновление данных в режиме реального времени позволяет поддерживать разрозненные системы данных в непрерывной синхронизации и быстро реагировать на появление новой информации. Как же поддерживать экосистему приложений в синхронном состоянии? Как эти компоненты получают информацию об изменениях в базе данных? Термин отслеживание изменённых данных, или сокращённо CDC, — относится к любому решению, которое идентифицирует новые или изменённые данные.
Статья посвящена отслеживанию изменённых данных (CDC) в PostgreSQL и способам достижения этой цели.
Отслеживание изменённых данных (CDC) — это метод интеграции данных для обнаружения, захвата и передачи изменений, внесённых в источники данных базы данных.Как правило, интеграция данных на основе CDC состоит из следующих шагов:
- Захват изменённых данных в исходной базе данных.
- Преобразование изменённых данных в формат, который могут принять ваши потребители (консьюмеры).
- Публикация данных для консьюмеров или целевой базы данных.
PostgreSQL предлагает два встроенных способа сделать CDC возможным:
- Из журналов транзакций, PostgreSQL WALs (они же Write Ahead Logs).
- С помощью триггеров базы данных.
Давайте кратко обсудим плюсы и минусы использования журналов транзакций (WALs) и триггеров для отслеживания изменения данных.
Триггеры
Методы на основе триггеров предполагают создание триггеров аудита в базе данных для регистрации всех событий, связанных с методами INSERT, UPDATE и DELETE.
Триггеры могут быть привязаны к таблицам (разделённым и нет) или представлениям (views).
Они также могут срабатывать для операторов TRUNCATE. При возникновении события триггера — функция вызывается в соответствующее время для обработки события.
- ? Главное преимущество этого метода заключается в том, что всё это можно сделать на уровне SQL, в отличие от журналов транзакций.
- ? Однако использование триггеров оказывает значительное влияние на производительность исходной базы данных, поскольку эти триггеры должны запускаться в базе данных приложения при внесении изменений в данные.
Журналы транзакций
В большинстве современных СУБД журналы транзакций (WAL для PostgreSQL) обычно используются для логирования и дублирования (репликации) транзакций.
В PostgreSQL все транзакции, такие как INSERT, UPDATE, DELETE, записываются в WAL до того, как клиент получает результат транзакции.
- Преимущество этого подхода в том, что он никак не влияет на производительность базы данных.
- Он также не требует модификации таблиц БД или приложения. Нет необходимости создавать дополнительные таблицы в исходной базе данных.
- CDC на основе журнала обычно считается лучшим подходом к отслеживанию изменённых данных, применимым ко всем возможным сценариям, включая системы с чрезвычайно высокими объёмами транзакций.
Учтите, что в настоящее время большинство операторов DDL, таких как CREATE, DROP, ALTER, не отслеживаются. Однако команда TRUNCATE в потоке логической репликации присутствует.Если вам нужна потоковая передача изменений данных Postgres по мере их возникновения, вам понадобится функция логического декодирования или логической репликации Postgres.
Применение логического декодирования Postgres
Логическое декодирование — это официальное название основанной на логировании PostgreSQL CDC логической репликации.
Логическое декодирование использует содержимое журнала PostgreSQL Write-Ahead Log для хранения всех действий, происходящих в базе данных. Журнал Write Ahead Log — это внутренний журнал, который описывает изменения базы данных на уровне хранилища.
1. Первым шагом в использовании логического декодирования является установка следующих параметров в конфигурации Postgres
postgresql.conf
:wal_level = logical
max_replication_slots = 5
max_wal_senders = 10
- Установка
wal_level
в значениеlogical
позволяет WAL записывать информацию, необходимую для логического декодирования. - Убедитесь, что значение параметра
max_replication_slots
равно или больше количества коннекторов PostgreSQL, использующих WAL, и прибавьте к этому количество других слотов репликации, используемых вашей базой данных. - Убедитесь, что параметр
max_wal_senders
, определяющий максимальное количество одновременных соединений с WAL, как минимум вдвое превышает количество логических слотов репликации. Например, если ваша база данных использует в общей сложности 5 слотов репликации, значение параметраmax_wal_senders
должно быть 10 или больше.
Перезапустите ваш сервер Postgres, чтобы применить изменения.
2. Второй шаг заключается в настройке логической репликации с помощью подключаемого модуля
test_decoding
Создайте слот логической репликации для базы данных, которую вы хотите синхронизировать, выполнив следующую команду:
SELECT pg_create_logical_replication_slot('replication_slot', 'test_decoding');
Примечание: Каждый слот репликации имеет имя, которое может содержать строчные буквы, цифры и символ подчёркивания.
Чтобы убедиться в том, что слот был успешно создан, выполните следующую команду:
SELECT slot_name, plugin, slot_type, database, active, restart_lsn, confirmed_flush_lsn FROM pg_replication_slots;
3. В следующем шаге создайте публикацию для всех ваших таблиц или только для тех, что вам нужны. Если вы зададите конкретные таблицы, вы сможете добавить или удалить их из публикации позже.
CREATE PUBLICATION pub FOR ALL TABLES;
или
CREATE PUBLICATION pub FOR TABLE table1, table2, table3;
По желанию вы можете выбрать, какие операции включить в публикацию. Например, следующая публикация включает для первой таблицы (
table1
) только операции INSERT и UPDATE.CREATE PUBLICATION insert_update_only_pub FOR TABLE table1 WITH (publish = 'INSERT, UPDATE');
4. Убедитесь, что выбранные вами таблицы есть в публикации.
psql-stream=# SELECT * FROM pg_publication_tables WHERE pubname='pub';
Output
pubname | schemaname | tablename
---------+------------+-----------
pub | public | table1
pub | public | table2
pub | public | table3
(3 rows)
С этого момента наша публикация
pub
будет отслеживать изменения всех таблиц в базе данных psql-stream
.5. Давайте создадим условную таблицу
t
и заполним её несколькими записями:create table t (id int, name text);
INSERT INTO t(id, name) SELECT g.id, k.name FROM generate_series(1, 10) as g(id), substr(md5(random()::text), 0, 25) as k(name);
В результате мы получим 10 записей в таблице
t
.psql-stream=# SELECT count(*) FROM t;
count
-------
10
(1 row)
6. Наконец, пришло время проверить, работает ли наша логическая репликация.
Выполните следующую команду в консоли PostgreSQL, чтобы увидеть записи Postgres WAL:
SELECT * FROM pg_logical_slot_get_changes('replication_slot', NULL, NULL);
В результате вы получите что-то вроде:
lsn | xid | data
-----------+------+--------------------------------------------------------
0/19EA2C0 | 1045 | BEGIN 1045
0/19EA2C0 | 1045 | table public.t: INSERT: id[integer]:1 name[text]:51459cbc211647e7b31c8720
0/19EA300 | 1045 | table public.t: INSERT: id[integer]:2 name[text]:51459cbc211647e7b31c8720
0/19EA340 | 1045 | table public.t: INSERT: id[integer]:3 name[text]:51459cbc211647e7b31c8720
0/19EA380 | 1045 | table public.t: INSERT: id[integer]:4 name[text]:51459cbc211647e7b31c8720
0/19EA3C0 | 1045 | table public.t: INSERT: id[integer]:5 name[text]:51459cbc211647e7b31c8720
0/19EA400 | 1045 | table public.t: INSERT: id[integer]:6 name[text]:51459cbc211647e7b31c8720
0/19EA440 | 1045 | table public.t: INSERT: id[integer]:7 name[text]:51459cbc211647e7b31c8720
0/19EA480 | 1045 | table public.t: INSERT: id[integer]:8 name[text]:51459cbc211647e7b31c8720
0/19EA4C0 | 1045 | table public.t: INSERT: id[integer]:9 name[text]:51459cbc211647e7b31c8720
0/19EA500 | 1045 | table public.t: INSERT: id[integer]:10 name[text]:51459cbc211647e7b31c8720
0/19EA5B0 | 1045 | COMMIT 1045
(13 rows)
pg_logical_slot_peek_changes
— это ещё одна команда PostgreSQL для просмотра изменений из записей WAL без их поглощения. Поэтому многократный вызов команды pg_logical_slot_peek_changes
будет возвращать один и тот же результат.В свою очередь,
pg_logical_slot_get_changes
возвращает результаты только при первом вызове. Последующие вызовы pg_logical_slot_get_changes
возвращают пустые наборы результатов. Это означает, что при выполнении команды get
результаты обрабатываются и удаляются, что значительно расширяет наши возможности по написанию логики использования этих событий для создания реплики таблицы.7. Не забудьте избавиться от слота, который вам больше не нужен, чтобы остановить его поглощение.
SELECT pg_drop_replication_slot('replication_slot');
Подключаемые модули
Мы уже говорили о подключаемом модуле
test_decoding
, доступном на Postgres 9.4+. Хотя он и был создан как пример подключаемого модуля, он всё ещё полезен, если ваш консьюмер поддерживает его.Наряду с плагином
test_decoding
, с PostgreSQL поставляется ещё один плагин — pgoutput
. pgoutput
доступен начиная с Postgres 10. Некоторые потребители поддерживают его для декодирования (например, Debezium).Выполните следующую команду, чтобы создать плагин на основе
pgoutput
, как в шаге 2 выше.SELECT * FROM pg_create_logical_replication_slot('replication_slot', 'pgoutput');
Следующая команда поглощает измененённые данные, аналогичные описанным в шаге 6.
psql-stream=# SELECT * FROM pg_logical_slot_peek_binary_changes('replication_slot', null, null, 'proto_version', '1', 'publication_names', 'pub');
lsn | xid | data
-----------+------+------------------------------------------------------------------------------------------
0/19A15F8 | 1038 | \x4200000000019a1d9000027de20a91a0ea0000040e
0/19A15F8 | 1038 | \x52000080387075626c69630074006400020169640000000017ffffffff006e616d650000000019ffffffff
0/19A15F8 | 1038 | \x49000080384e0002740000000234306e
0/19A1890 | 1038 | \x49000080384e0002740000000234316e
0/19A1910 | 1038 | \x49000080384e0002740000000234326e
0/19A1990 | 1038 | \x49000080384e0002740000000234336e
0/19A1A10 | 1038 | \x49000080384e0002740000000234346e
0/19A1A90 | 1038 | \x49000080384e0002740000000234356e
0/19A1B10 | 1038 | \x49000080384e0002740000000234366e
0/19A1B90 | 1038 | \x49000080384e0002740000000234376e
0/19A1C10 | 1038 | \x49000080384e0002740000000234386e
0/19A1C90 | 1038 | \x49000080384e0002740000000234396e
0/19A1DC0 | 1038 | \x430000000000019a1d9000000000019a1dc000027de20a91a0ea
(13 rows)
Здесь можно заметить, что результаты возвращаются в двоичном формате. Плагин
pgoutput
производит двоичный вывод.wal2json
— ещё один популярный плагин вывода для логического декодирования.Вот пример вывода данных из плагина
wal2json
:{
"change":[
{
"kind":"insert",
"schema":"public",
"table":"t",
"columnnames":[
"id",
"name"
],
"columntypes":[
"integer",
"character varying(255)"
],
"columnvalues":[
1,
""
]
}
]
}
{
"change":[
{
"kind":"update",
"schema":"public",
"table":"t",
"columnnames":[
"id",
"name"
],
"columntypes":[
"integer",
"character varying(255)"
],
"columnvalues":[
1,
"New Value"
],
"oldkeys":{
"keynames":[
"id"
],
"keytypes":[
"integer"
],
"keyvalues":[
1
]
}
}
]
}
{
"change":[
{
"kind":"delete",
"schema":"public",
"table":"t",
"oldkeys":{
"keynames":[
"id"
],
"keytypes":[
"integer"
],
"keyvalues":[
1
]
}
}
]
}
Важные рекомендации по работе со слотами
При работе со слотами помните о следующем:
- Каждый слот может иметь только один подключаемый модуль (вы сами выбираете, какой именно).
- Каждый слот предоставляет изменения только из одной базы данных.
- Одна база данных может иметь несколько слотов.
- Каждое изменение данных обычно выдаётся один раз для каждого слота.
- Однако слот может повторно выдать изменения при перезапуске инстанса Postgres (консьюмер должен справиться с этой задачей).
- Непоглощенный слот — это угроза доступности инстанса Postgres. Postgres будет сохранять все WAL-файлы для этих непоглощенных изменений. Это может привести к переполнению хранилища.
Консьюмеры PostgreSQL WAL
Консьюмер — это любое приложение, которое может получить поток логического декодирования Postgres. pg_recvlogical — это приложение PostgreSQL, которое может управлять слотами и поглощать из них поток. Оно включено в дистрибутив Postgres, поэтому, скорее всего, оно уже будет установлено у вас вместе с PostgreSQL.
Фото: Markus Spiske / Unsplash
Образец программы на Golang
Следующий образец программы на Golang показывает, как приступить к созданию собственного консьюмера Postgress WAL. Он использует логическую репликацию PostgreSQL-10.x для потоковой передачи изменений базы данных (декодированных сообщений WAL) из исходной базы данных.
package main
import (
"context"
"fmt"
"os"
"os/signal"
"strings"
"time"
"github.com/jackc/pgconn"
"github.com/jackc/pglogrepl"
"github.com/jackc/pgproto3/v2"
)
// Обратите внимание, что параметр выполнения "replication=database" в строке подключения является обязательным
// слот репликации не будет создан, если опустить значение replication=database
const CONN = "postgres://postgres:postgres@localhost/psql-streamer?replication=database"
const SLOT_NAME = "replication_slot"
const OUTPUT_PLUGIN = "pgoutput"
const INSERT_TEMPLATE = "create table t (id int, name text);"
var Event = struct {
Relation string
Columns []string
}{}
func main() {
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
defer cancel()
conn, err := pgconn.Connect(ctx, CONN)
if err != nil {
panic(err)
}
defer conn.Close(ctx)
// 1. Создайте таблицу
if _, err := conn.Exec(ctx, INSERT_TEMPLATE).ReadAll(); err != nil {
fmt.Errorf("failed to create table: %v", err)
}
// 2. Убедитесь, что публикация существует
if _, err := conn.Exec(ctx, "DROP PUBLICATION IF EXISTS pub;").ReadAll(); err != nil {
fmt.Errorf("failed to drop publication: %v", err)
}
if _, err := conn.Exec(ctx, "CREATE PUBLICATION pub FOR ALL TABLES;").ReadAll(); err != nil {
fmt.Errorf("failed to create publication: %v", err)
}
// 3. Создайте временный сервер репликации слотов
if _, err = pglogrepl.CreateReplicationSlot(ctx, conn, SLOT_NAME, OUTPUT_PLUGIN, pglogrepl.CreateReplicationSlotOptions{Temporary: true}); err != nil {
fmt.Errorf("failed to create a replication slot: %v", err)
}
var msgPointer pglogrepl.LSN
pluginArguments := []string{"proto_version '1'", "publication_names 'pub'"}
// 4. Установите связь
err = pglogrepl.StartReplication(ctx, conn, SLOT_NAME, msgPointer, pglogrepl.StartReplicationOptions{PluginArgs: pluginArguments})
if err != nil {
fmt.Errorf("failed to establish start replication: %v", err)
}
var pingTime time.Time
for ctx.Err() != context.Canceled {
if time.Now().After(pingTime) {
if err = pglogrepl.SendStandbyStatusUpdate(ctx, conn, pglogrepl.StandbyStatusUpdate{WALWritePosition: msgPointer}); err != nil {
fmt.Errorf("failed to send standby update: %v", err)
}
pingTime = time.Now().Add(10 * time.Second)
//fmt.Println("client: please standby")
}
ctx, cancel := context.WithTimeout(ctx, time.Second*10)
defer cancel()
msg, err := conn.ReceiveMessage(ctx)
if pgconn.Timeout(err) {
continue
}
if err != nil {
fmt.Errorf("something went wrong while listening for message: %v", err)
}
switch msg := msg.(type) {
case *pgproto3.CopyData:
switch msg.Data[0] {
case pglogrepl.PrimaryKeepaliveMessageByteID:
// fmt.Println("server: confirmed standby")
case pglogrepl.XLogDataByteID:
walLog, err := pglogrepl.ParseXLogData(msg.Data[1:])
if err != nil {
fmt.Errorf("failed to parse logical WAL log: %v", err)
}
var msg pglogrepl.Message
if msg, err = pglogrepl.Parse(walLog.WALData); err != nil {
fmt.Errorf("failed to parse logical replication message: %v", err)
}
switch m := msg.(type) {
case *pglogrepl.RelationMessage:
Event.Columns = []string{}
for _, col := range m.Columns {
Event.Columns = append(Event.Columns, col.Name)
}
Event.Relation = m.RelationName
case *pglogrepl.InsertMessage:
var sb strings.Builder
sb.WriteString(fmt.Sprintf("INSERT %s(", Event.Relation))
for i := 0; i < len(Event.Columns); i++ {
sb.WriteString(fmt.Sprintf("%s: %s ", Event.Columns[i], string(m.Tuple.Columns[i].Data)))
}
sb.WriteString(")")
fmt.Println(sb.String())
case *pglogrepl.UpdateMessage:
var sb strings.Builder
sb.WriteString(fmt.Sprintf("UPDATE %s(", Event.Relation))
for i := 0; i < len(Event.Columns); i++ {
sb.WriteString(fmt.Sprintf("%s: %s ", Event.Columns[i], string(m.NewTuple.Columns[i].Data)))
}
sb.WriteString(")")
fmt.Println(sb.String())
case *pglogrepl.DeleteMessage:
var sb strings.Builder
sb.WriteString(fmt.Sprintf("DELETE %s(", Event.Relation))
for i := 0; i < len(Event.Columns); i++ {
sb.WriteString(fmt.Sprintf("%s: %s ", Event.Columns[i], string(m.OldTuple.Columns[i].Data)))
}
sb.WriteString(")")
fmt.Println(sb.String())
case *pglogrepl.TruncateMessage:
fmt.Println("ALL GONE (TRUNCATE)")
}
}
default:
fmt.Printf("received unexpected message: %T", msg)
}
}
}
Эта программа просто регистрирует входящие события, но в производственной среде вы можете легко отправить их в очередь сообщений или в целевую базу данных.
Заключение
Логическое декодирование в PostgreSQL обеспечивает эффективный способ для других компонентов приложения быть в курсе изменений данных в вашей базе данных Postgres.
Традиционно используется модель уведомлений pull, при которой каждый компонент приложения запрашивает Postgres через определённый интервал времени. Логическое кодирование использует модель push-уведомления, при которой Postgres уведомляет другие части приложения о каждом изменении, как только оно происходит.В настоящее время события изменения данных могут быть отправлены консьюмерам за миллисекунды без запроса к базе данных. Благодаря логическому декодированию, база данных PostgreSQL становится центральной частью вашего современного динамического приложения реального времени (RTA).
НЛО прилетело и оставило здесь промокод для читателей нашего блога:
— 15% на все тарифы VDS (кроме тарифа Прогрев) — HABRFIRSTVDS.