Pull to refresh

Как получить все сообщения через логическую репликацию Postgres

Reading time7 min
Views3.2K
Original author: OSKAR DUDYCZ

В одной из предыдущих статей я описал Push-based Outbox Pattern (шаблон исходящих сообщений на основе push с логической репликацией Postgres). Идея заключается в том, чтобы сохранить исходящее сообщение (например, событие) в той же транзакции базы данных вместе с изменением состояния. Благодаря этому мы гарантируем, что сообщение не будет потеряно, а наш бизнес-процесс будет продолжаться и станет согласованным.

Postgres может помочь и проинформировать нас, когда добавляется новое сообщение. Мы можем использовать встроенный механизм журнала упреждающей записи (WAL, Write-Ahead Log) вместе с логической репликацией.

Журнал упреждающей записи — центральный элемент Postgres. Каждая вставка, обновление и удаление регистрируются в журнале в порядке появления, а затем применяются к таблицам при фиксации транзакции. Логическая репликация выводит традиционный подход на новый уровень. Вместо отправки необработанного двоичного потока резервных копий файлов базы данных мы отправляем поток изменений, которые были записаны в журнале упреждающей записи.

Журнал упреждающей записи — это эфемерная структура. Если мы не скажем базе данных хранить его дольше, записи могут быть удалены после успешной фиксации транзакции. 

Это также сделано для оптимизации дискового пространства. Когда мы создаем публикацию логической репликации, мы указываем Postgres сохранять записи WAL, так как мы хотели бы получать их через уведомления.

async Task CreatePublication(
    EventsSubscriptionOptions options,
    CancellationToken ct
)
{
    var (connectionString, _, publicationName, tableName) = options;
    await using var dataSource = NpgsqlDataSource.Create(connectionString);
    await dataSource.Execute(
      $"CREATE PUBLICATION {publicationName} FOR TABLE {tableName};", ct
    );
}

Но если мы создадим подписку:

async Task<CreateReplicationSlotResult> CreateSubscription(
    LogicalReplicationConnection connection,
    EventsSubscriptionOptions options,
    CancellationToken ct
)
{
    var result = await connection.CreatePgOutputReplicationSlot(
        options.SlotName,
        slotSnapshotInitMode: LogicalSlotSnapshotInitMode.Export,
        cancellationToken: ct
    );

    return new Created(options.TableName, result.SnapshotName!);
}

И подпишемся на уведомления:

public async IAsyncEnumerable<object> Subscribe(
    EventsSubscriptionOptions options,
    [EnumeratorCancellation] CancellationToken ct
)
{
    var (connectionString, slotName, publicationName, _) = options;
    await using var conn = new LogicalReplicationConnection(connectionString);
    await conn.Open(ct);

    var slot = new PgOutputReplicationSlot(slotName);

    await foreach (var message in conn.StartReplication(slot,
      new PgOutputReplicationOptions(publicationName, 1), ct)
    )
    {
        if (message is InsertMessage insertMessage)
        {
            yield return await InsertMessageHandler.Handle(insertMessage, ct);
        }

        conn.SetReplicationStatus(message.WalEnd);
        await conn.SendStatusUpdate(ct);
    }
}

Мы можем понять, что получили новые добавленные записи только после создания публикации. 

Это потому, что Postgres не знал раньше, что мы хотели бы сохранить записи WAL, и обрезал их. 

Это не является большой проблемой, если мы начинаем с нового развертывания или лучшего нового проекта. Однако, если бы у нас уже есть реализация исходящих сообщений на основе извлечения, мы можем захотеть получить старые сообщения. Как это сделать?

Давайте вернемся к настройке подписки:

var result = await connection.CreatePgOutputReplicationSlot(options.SlotName,
    slotSnapshotInitMode: LogicalSlotSnapshotInitMode.Export, cancellationToken: ct);

return new Created(options.TableName, result.SnapshotName!);

Он вызывает внутреннюю функцию CREATE_REPLICATION_SLOT:

CREATE_REPLICATION_SLOT 
    events_slot 
LOGICAL pgoutput(SNAPSHOT 'export')

Мы передаем немного загадочный параметр SNAPSHOT 'export'. Прежде чем я объясню его, давайте на мгновение остановимся и кратко обсудим, как работает транзакция Postgres.

Транзакция может содержать несколько операторов. С периодичностью, зависящей от уровня транзакции, Postgres создает моментальные снимки (snapshot). Snapshot — это замороженное состояние базы данных в определенный момент времени:

  • для READ COMMITED моментальный снимок создается после каждого зафиксированного оператора,

  • REPEATABLE READ и SERLIALIZABLE создают моментальный снимок в начале и поддерживают его согласованность на протяжении всей транзакции, даже если другие сеансы фиксируют транзакции.

Snapshot обычно хранится до тех пор, пока транзакция существует, а затем удаляется. Тем не менее, мы используем Postgres здесь. В этом должно быть что-то большее, верно? 

Если вы регулярно читаете мой блог, вы уже знаете функцию pg_current_snapshot. Она возвращает информацию о текущем моментальном снимке. У Postgres есть и другие подобные функции, например, pg_export_snapshot позволяет хранить моментальный снимок дольше, чем время жизни транзакции. 

Зачем нам это нужно? Например, для создания резервной копии базы данных, pg_dump использует ее для обеспечения отказоустойчивости. Мы же не хотим, чтобы данные резервной копии изменялись во время процесса, верно?

Функция экспорта снимков также используется при создании слота репликации. Если мы укажем параметр NAPSHOT 'export' при создании слота репликации, он автоматически создаст снимок и вернет его идентификатор. 

Мы можем использовать этот снимок для получения существующих данных на момент создания слота репликации. Все новые будут отправлены через логическую репликацию.

Чтобы прочитать существующие записи, нам нужно создать транзакцию как минимум с уровнем транзакции REPEATABLE READ и установить моментальный снимок транзакции на идентификатор, который мы получили на предыдущем шаге. Это заставит наши операции чтения получать доступ только к данным в момент времени, указанный в моментальном снимке.

На языке C# код может выглядеть следующим образом:

await using var transaction = await connection.BeginTransactionAsync(
  IsolationLevel.RepeatableRead, ct
);

await using var command = new NpgsqlCommand(
  $"SET TRANSACTION SNAPSHOT '{snapshotName}';", connection, transaction
);
await command.ExecuteScalarAsync(ct);

Затем мы можем опросить записи с помощью обычного оператора SELECT в таблице outbox. Метод будет выглядеть следующим образом.

public static async IAsyncEnumerable<object> QueryTransactionSnapshot(
    this NpgsqlConnection connection,
    string snapshotName,
    string tableName,
    Func<NpgsqlDataReader, CancellationToken, Task<object>> map,
    [EnumeratorCancellation] CancellationToken ct)
{
    await using var transaction = await connection.BeginTransactionAsync(
      IsolationLevel.RepeatableRead, ct
    );

    await using var command = new NpgsqlCommand(
      $"SET TRANSACTION SNAPSHOT '{snapshotName}';", connection, transaction
    );
    await command.ExecuteScalarAsync(ct);

    await using var cmd = new NpgsqlCommand(
      $"SELECT * FROM {tableName}", connection, transaction
    );
    await using var reader =  await cmd.ExecuteReaderAsync(ct);

    while (await reader.ReadAsync(ct))
    {
        yield return await map(reader, ct);
    }
}

Окончательный код для нашей подписки, который может выполнить полную настройку и читать данные моментального снимка, будет выглядеть следующим образом:

public interface IEventsSubscription
{
    IAsyncEnumerable<object> Subscribe(
      EventsSubscriptionOptions options, CancellationToken ct
    );
}

public class EventsSubscription: IEventsSubscription
{
    private async Task<CreateReplicationSlotResult> CreateSubscription(
        LogicalReplicationConnection connection,
        EventsSubscriptionOptions options,
        CancellationToken ct
    )
    {
        if (!await PublicationExists(options, ct))
            await CreatePublication(options, ct);

        if (await ReplicationSlotExists(options, ct))
            return new AlreadyExists();

        var result = await connection.CreatePgOutputReplicationSlot(options.SlotName,
            slotSnapshotInitMode: LogicalSlotSnapshotInitMode.Export, cancellationToken: ct);

        return new Created(options.TableName, result.SnapshotName!);
    }

    public async IAsyncEnumerable<object> Subscribe(
        EventsSubscriptionOptions options,
        [EnumeratorCancellation] CancellationToken ct
    )
    {
        var (connectionString, slotName, publicationName, _) = options;
        await using var conn = new LogicalReplicationConnection(connectionString);
        await conn.Open(ct);

        var result = await CreateSubscription(conn, options, ct);

        if (result is Created created)
        {
            await foreach (var @event in ReadExistingEventsFromSnapshot(
              created.SnapshotName, options, ct)
            )
            {
                yield return @event;
            }
        }

        var slot = new PgOutputReplicationSlot(slotName);

        await foreach (var message in conn.StartReplication(
          slot, new PgOutputReplicationOptions(publicationName, 1), ct)
        )
        {
            if (message is InsertMessage insertMessage)
            {
                yield return await InsertMessageHandler.Handle(insertMessage, ct);
            }

            conn.SetReplicationStatus(message.WalEnd);
            await conn.SendStatusUpdate(ct);
        }
    }

    private async Task<bool> ReplicationSlotExists(
        EventsSubscriptionOptions options,
        CancellationToken ct
    )
    {
        var (connectionString, slotName, _, _) = options;
        await using var dataSource = NpgsqlDataSource.Create(connectionString);
        return await dataSource.Exists(
          "pg_replication_slots", "slot_name = $1", new object[] { slotName }, ct
        );
    }

    private async Task CreatePublication(
        EventsSubscriptionOptions options,
        CancellationToken ct
    )
    {
        var (connectionString, _, publicationName, tableName) = options;
        await using var dataSource = NpgsqlDataSource.Create(connectionString);
        await dataSource.Execute(
          $"CREATE PUBLICATION {publicationName} FOR TABLE {tableName};", ct
        );
    }

    private async Task<bool> PublicationExists(
        EventsSubscriptionOptions options,
        CancellationToken ct
    )
    {
        var (connectionString, slotName, _, _) = options;
        await using var dataSource = NpgsqlDataSource.Create(connectionString);
        return await dataSource.Exists(
          "pg_publication", "pubname = $1", new object[] { slotName }, ct
        );
    }

    private async IAsyncEnumerable<object> ReadExistingEventsFromSnapshot(
        string snapshotName,
        EventsSubscriptionOptions options,
        [EnumeratorCancellation] CancellationToken ct
    )
    {
        await using var connection = new NpgsqlConnection(options.ConnectionString);
        await connection.OpenAsync(ct);

        await foreach (var @event in connection.GetEventsFromSnapshot(
          snapshotName, options.TableName, ct)
        )
        {
            yield return @event;
        }
    }

    internal abstract record CreateReplicationSlotResult
    {
        public record AlreadyExists: CreateReplicationSlotResult;

        public record Created(string TableName, string SnapshotName): CreateReplicationSlotResult;
    }
}

Это все еще наивная реализация, поскольку она не обладает полной отказоустойчивостью при чтении моментальных снимков данных.

Логическая репликация сама по обеспечит создание контрольных точек, нам не нужно об этом заботиться, а вот что касается моментальных снимков, то другая история. Но это история для другой специальной статьи!

Смотрите также более подробную техническую информацию о реализации в Pull Request .

Ps Большое спасибо Брару Пинингу за то, что он изначально реализовал эту часть в Npgsql и терпеливыми объяснениями указал мне правильное направление.

Tags:
Hubs:
Total votes 5: ↑5 and ↓0+5
Comments0

Articles