Всем известно, что в распределённых системах невозможно гарантировать доставку сообщений, выполняемую ровно один раз. А вот обеспечить однократную обработку сообщений можно. Добавляя к каждому из сообщений уникальный ключ идемпотентности, можно сделать так, чтобы потребители сообщений распознавали бы и игнорировали дубликаты сообщений. То есть — игнорировали бы сообщения, которые уже были получены и успешно обработаны.

Как именно работают подобные механизмы? Потребитель, получая сообщение, берёт его ключ идемпотентности и сравнивает с ключами сообщений, которые уже были обработаны. Если такой ключ уже встречался — входящее сообщение является дубликатом и его можно проигнорировать. В противном случае потребитель начинает обработку сообщения. Например — сохраняя в базе данных само это сообщение или какое-либо представление данных, полученное после его анализа.
Кроме того, потребитель сохраняет ключ идемпотентности сообщения. Очень важно, чтобы две этих операции были бы выполнены атомарно. Обычно это достигается путём их выполнения в транзакции базы данных. В результате получается, что возможны два исхода такой операции. Первый — сообщение обработано и его ключ идемпотентности сохранён в базе данных. Второй — выполнен откат транзакции, в базу данных не внесено никаких изменений. При таком подходе обеспечивается то, что потребитель, который не смог обработать сообщение, обработает его снова, после его повторной доставки. И это же гарантирует то, что система проигнорирует дубликаты сообщения, полученные после его успешной обработки.
Универсальные уникальные идентификаторы
Поговорим о том, что собой представляет хороший ключ идемпотентности. Один из возможных подходов к их формированию заключается в использовании UUIDv4. Подобные идентификаторы, формируемые случайным образом, решают задачу назначения уникальных меток каждому из сообщений. Но их использование требует того, чтобы потребитель хранил бы UUID (Universally Unique IDentifier, универсальный уникальный идентификатор) абсолютно всех ранее полученных сообщений. Это нужно для того, чтобы потребитель мог бы надёжно выявлять дубликаты сообщений. Такой подход может, в зависимости от объёма сообщений, оказаться непрактичным. С прагматической точки зрения приемлемой может оказаться такая схема работы, когда сохранённые UUID удаляют по истечении некоего срока. Но при этом система должна нормально переносить ситуации, когда, после того, как время хранения ключей истекло, она иногда может получать и обрабатывать дубликаты сообщений. К сожалению, если удалять UUID, ни система, отправляющая сообщения, ни система, их обрабатывающая, не будут обладать какими-либо сведениями о дубликатах.
Такое положение дел можно, до некоторой степени, улучшить, добавляя к ключу идемпотентности временную метку. Например — используя ключ формата UUIDv7, который содержит и временную метку (первые 48 битов) и случайный идентификатор (оставшиеся биты). Тут ещё можно взглянуть и на ULID. При таком подходе потребитель сообщений может узнать о том, что получил сообщение со «слишком старым» ключом идемпотентности. И хотя выяснить то, дубликат это или нет, он не может, он способен сообщить отправителю сообщения о том, что обработать это сообщение он не в состоянии. Дальнейший ход действий зависит от отправителя. Например, если сообщение — это часть последовательности событий, происходящих при обработке некоего платежа, система может порекомендовать пользователю проверить свой банковский счёт и выяснить, был ли уже выполнен этот платёж. И только в том случае, если это не так, будет отправлено новое сообщение с тем же содержимым, но со свежим UUID.
Монотонно возрастающие последовательности
Всех этих сложностей можно избежать в тех случаях, когда допустимо использовать в роли ключей идемпотентности значения из монотонно возрастающих последовательностей. В подобных обстоятельствах потребителю сообщений больше не нужно хранить все ключи обработанных сообщений (или подмножество таких ключей, размер которого продиктован нуждами системы). Ему нужно хранить лишь единственное значение — ключ последнего обработанного сообщения. Если он получает сообщение с тем же или меньшим ключом идемпотентности, это сообщение, вероятно, является дубликатом и его можно будет проигнорировать. Возможна ситуация, когда потребитель взаимодействует с источниками сообщений, данные в которых разделены на несколько блоков, вроде топика Kafka с несколькими партициями. Бывает так, что потребитель получает сообщения от нескольких независимых отправителей (например — от разных клиентов REST API, каждый из которых использует собственную, независимую от других последовательность для создания ключей). В подобных случаях потребитель сообщений должен хранить последнее значение ключа для каждого логического раздела одного отправителя или для каждого из независимых отправителей.
Монотонно возрастающие ключи идемпотентности — это огромный шаг вперёд с точки зрения потребителя сообщений. Но, с другой стороны, они сильно усложняют жизнь системам, отправляющим сообщения. Создание значений из монотонно возрастающей последовательности — это не такая уж и простая задача. Если отправитель сообщений — это однопоточная система, выдающая по одному сообщению за раз, то такая задача решается элементарно. В подобном случае в качестве источника значений для ключей идемпотентности может быть использована последовательность, сгенерированная средствами базы данных. Тут подойдёт даже простой ��чётчик, данные которого хранятся в памяти. Пропуски в последовательности — это нормально, так как можно инкрементировать счётчик, хранящийся где-то на диске, в базе данных, прибавляя к нему большие значения, а реально используемые значения выдавать из копии этого счётчика, хранящегося в памяти. При таком подходе можно сократить количество операций дискового ввода/вывода. В эту категорию, с точки зрения потребителя, попадают смещения партиций Kafka, так как их можно рассматривать в роли монотонно возрастающих ключей идемпотентности для сообщений, полученных из конкретной партиции.
Обстоятельства осложняются в том случае, когда в систему, отправляющую сообщения, одновременно приходит несколько запросов, обрабатываемых в конкурентном режиме. Например — это может быть REST-служба с несколькими рабочими процессами, обрабатывающими запросы, возможно — даже служба, развёрнутая на нескольких вычислительных узлах кластера. Для того чтобы обеспечить монотонное возрастание значений, операции получения ключей идемпотентности и выдачи сообщений с этими ключами в каждом рабочем процессе должны проводиться атомарно, их не должны прерывать другие рабочие процессы. В противном случае можно оказаться в ситуации, когда поток A извлекает значение последовательности 100, а поток B — значение 101. Далее — поток B отправляет сообщение с ключём 101, а после этого поток A отправляет сообщение 100. При таком порядке отправки и получения сообщений потребитель, поступив некорректно, отбросит сообщение A, приняв его за дубликат уже обработанного сообщения.
В большинстве случаев обеспечение подобного уровня атомарности операций будет означать образование в системе ужасно «узкого» «бутылочного горлышка». По сути — все запросы к системе, выдающей ключи, будут выполняться в последовательном режиме, это не будет зависеть от того, сколько развёрнуто рабочих потоков или экземпляров службы. Обратите внимание на то, что если вы решаете следовать именно этим путём — вы не сможете положиться только на средства для генерирования последовательностей, встроенные в системы управления базами данных. Вместо этого, чтобы гарантировать монотонное возрастание ключей идемпотентности в исходящих сообщениях, нужно будет использовать особый механизм, вроде рекомендательных блокировок Postgres.
Получение ключей идемпотентности из журнала транзакций
А можно ли как-нибудь совместить оба эти подхода? Можно ли обеспечить эффективное использование памяти потребителя сообщений за счёт применения монотонно возрастающих ключей, но при этом не вредить производительности многопоточной системы, отправляющей сообщения? Как оказалось — это возможно, по меньшей мере — в тех случаях, когда отправка сообщений может быть организована в их источнике асинхронно — то есть — независимо от обработки входящих запросов. Смысл тут в том, что клиенты системы, отправляющей сообщения, получают подтверждение того, что было зафиксировано намерение отправить сообщение или запрос, но при этом не получают соответствующие данные немедленно. Если сценарий использования системы позволяет применение подо��ной схемы работы, задачу можно свести к однопоточной ситуации, описанной выше. Вместо передачи сообщений напрямую целевой системе каждый поток-отправитель сообщений ставит их в очередь. Эта очередь обрабатывается однопоточным рабочим процессом, который выдаёт все сообщения последовательно. В работе The Synchrony Budget говорится о том, что перевод систем на выполнение неких действий в асинхронном режиме обычно приносит пользу в том случае, если не нужно, чтобы они выдавали бы ответы на запросы немедленно.
Вот — конкретный способ реализации такого подхода. Это — вариант широко используемого паттерна outbox, использующего журнал транзакций базы данных службы, отправляющей сообщения. В конце концов, нет необходимости в том, чтобы самостоятельно упорядочивать входящие запросы, так как база данных уже это делает, сериализуя транзакции в своём журнале. Когда отправитель фиксирует в журнале транзакций намерение отправить сообщение — например — делая запись в определённую таблицу — процесс, наблюдающий за журналом, может назначить этому сообщению ключ идемпотентности, основываясь на его позиции в журнале.
Реализация такой схемы работы достаточно проста и понятна при использовании CDC-инструментов (Change Data Capture, захват изменения данных), вроде Debezium. А именно: из журнала получают сообщения, которые нужно отправить, перехватывая события INSERT из outbox-таблицы. Этим сообщениям, перед их отправкой, назначают ключи идемпотентности, полученные на основе смещений записей в журнале. То, как именно такое может быть реализовано на практике, зависит от конкретной используемой СУБД.
Например, в Postgres гарантируется то, что значения LSN (log sequence number, порядковый номер журнала) событий фиксации транзакций в WAL (write-ahead log, журнал предзаписи) монотонно возрастают. Событие фиксации одной транзакции, произошедшее после такого же события другой транзакции, будет иметь больший LSN. Более того — гарантировано то, что в пределах некоей транзакции значения LSN событий тоже монотонно возрастают. Благодаря этому кортеж вида { Commit LSN, Event LSN } отлично подходит на роль ключа идемпотентности. Для того чтобы не раскрывать тот факт, что отправитель сообщения использует базу данных Postgres, оба значения можно закодировать в виде единого 128-битного числа. Обратите внимание на то, что для реализации этого решения нет нужды в развёртывании Kafka или Kafka Connect. Для подобного проекта отлично подходит встроенный движок Debesium. Он позволяет назначать ключи идемпотентности прямо из метода-коллбэка службы, выдающей сообщения, не требуя дополнительной инфраструктуры. Если для реализации этого паттерна используется Postgres, нам даже не понадобится выделенная outbox-таблица, так как система позволяет писать произвольные данные в журнал транзакций посредством функции pg_logical_emit_message(), что отлично нам подходит.
Итоги
Итак, подумаем о том, в каких ситуациях уместно использовать различные подходы к формированию и обработке ключей идемпотентности. Как всегда, тут нет однозначного, универсального ответа. Ответ зависит от нужд и особенностей конкретного проекта. Во многих сценариях, вероятно, достаточно будет воспользоваться UUID и периодически удалять записи об уже обработанных ключах. При этом нужно, чтобы проект допускал бы то, что некоторые сообщения иногда могут быть обработаны повторно — при поступления дубликата сообщения по прошествии периода хранения сведений о ключах обработанных сообщений.
Чем больше сообщений нужно обрабатывать — тем более привлекательным выглядит решение, основанное на монотонно возрастающих последовательностях значений ключей. Его применение позволяет организовать выявление дубликатов сообщений и при этом экономно пользоваться памятью вне зависимости от того, сколько именно сообщений приходится обрабатывать. Предложенный выше подход, основанный на журналах, может оказаться эффективным решением для реализации подобной системы, но его применение сопряжено с повышением сложности эксплуатации и сопровождения системы. А именно, применяемая СУБД должна поддерживать логическую репликацию, нужно обеспечивать работу CDC-коннектора и так далее. Но при этом многие компании уже применяют CDC-пайплайны для других целей (это — аналитика, поисковая индексация, инвалидация кеша и прочее подобное). Если ваша компания относится к этой категории — применение этого подхода не слишком усложнит вашу систему. Если же вы этим не пользуетесь — сопоставьте рост сложности поддержки системы и выгоды, которые даст вам новый подход к работе с ключами идемпотентности (выявление дубликатов сообщений с постоянным уровнем использования памяти).
О, а приходите к нам работать? 🤗 💰
Мы в wunderfund.io занимаемся высокочастотной алготорговлей с 2014 года. Высокочастотная торговля — это непрерывное соревнование лучших программистов и математиков всего мира. Присоединившись к нам, вы станете частью этой увлекательной схватки.
Мы предлагаем интересные и сложные задачи по анализу данных и low latency разработке для увлеченных исследователей и программистов. Гибкий график и никакой бюрократии, решения быстро принимаются и воплощаются в жизнь.
Сейчас мы ищем плюсовиков, питонистов, дата-инженеров и мл-рисерчеров.
