Как стать автором
Обновить
Точка
Как мы делаем онлайн-сервисы для бизнеса

Консистентность в конкуретной среде: как не захлебнуться в потоках данных

Уровень сложностиПростой
Время на прочтение21 мин
Количество просмотров6.9K

В прошлой статье я рассказал о конкурентной среде в Точке и типовых проблемах, с которыми сталкиваются разработчики. В этот раз речь пойдёт о том, как же можно эту консистентность реализовать и какие злые силы нам могут помешать это сделать. Решил больше не делить статьи на части, поэтому продолжение вот в этой большой статье, всё в одном месте.

Напомню, что текст рассчитан в первую очередь на разработчиков, которые имеют мало практического опыта работы в конкурентной и/или микросервисной среде, поэтому многие вещи я объясню на пальцах. Для тех, кто захочет ознакомиться с более строгими формулировками, сразу скажу, что здесь будут затронуты упрощённые идеи паттернов Transactional Outbox и Saga.

Консистентность в пределах одного сервиса

В прошлой статье я рассказал, как для Точки важна консистентность данных. Под консистентностью я здесь понимаю достоверность и непротиворечивость информации о планах и прогрессе выполнения задач. Иными словами, при консистентности сервисы «не забывают» выполнить поставленные задачи, а после выполнения не пытаются их сделать во второй раз.

Например, если пользователь оплачивает покупку — с него надо списать деньги. Если сервис забудет это сделать — продавец, скорее всего, не обрадуется такому исходу. А если плата спишется дважды, то недоволен будет сам пользователь.

Давайте разберёмся, как добиться консистентности в ситуациях, когда наш сервис практически не влияет на «внешний мир», то есть мы не посылаем и не получаем запросы от других сервисов. Взаимодействие происходит лишь с БД.

Казалось бы, а что тут сложного? Давайте просто писать в базу консистентные данные. Нормально делай — нормально будет! ? Но не так всё просто.

Постановка задачи 

Прежде чем начать писать код, давайте определимся, в каких условиях мы работаем и чего именно хотим.

В Точке мы реализуем масштабирование через увеличение количества одновременно запущенных экземпляров сервиса. Эти экземпляры будут работать с одними и теми же данными, иными словами, с разделяемыми ресурсами.

Дано:

  1. Несколько экземпляров сервиса работают одновременно.

  2. В любой момент:
    а) экземпляр может упасть по любой причине — например, OOM Killer, дрейн нод в k8s и т.д.;
    б) экземпляры могут начать обрабатывать одни и те же данные;
    в) для обработки чего-либо потребуется ручное вмешательство.

Задача: что бы ни случилось, система должна остаться в консистентном состоянии.

«Подумаешь – бином Ньютона!» (с). Всем ведь известно, как поступать в подобных ситуациях. А если кому-то и неизвестно, то решение же очевидно:

Поставить эксклюзивную блокировку
Прочитать данные из БД
Проанализировать прочитанные данные
Принять решение об изменении данных
Записать в БД изменённые данные
Отпустить блокировку

Поскольку на шаге 1 мы поставили блокировку, то на шаге 2 можем быть уверены, что данные будут актуальны и другие экземпляры сервиса их не поменяют до снятия блокировки. Соответственно, на шагах 3 и 4 можно не опасаться, что другой экземпляр изменит данные раньше, чем наш. И от перестановки экземпляров результат тоже не изменится.

Разумеется, это будет безупречно работать, когда остальные части нашего кода при работе с этими же данными ведут себя аналогично: ставят такую же блокировку и т. д., но об этом поговорим позднее.

Что может пойти не так?

Кто быстрее?
Кто быстрее?

Хороший вопрос.

У нас в Точке Postgres стоит за pgbouncer в режиме transactional pooling, поэтому нам доступны только transactional locks. То есть, если что-то отвалится после установки блокировки, то блокировка отпустится. *Имейте в виду, что ваши настройки могут отличаться.

Успели ли данные записаться в БД? Если да, то всё хорошо. Если нет, то данные останутся в том виде, в котором они были на момент начала работы с ними: как будто мы их не трогали. Итого: система останется в консистентном состоянии.

Но ведь на шаге 5 у нас может быть не какой-то один атомарный запрос, а несколько: что-то может пойти не так, когда одни запросы уже прошли, а другие ещё нет.

Например, мы выполняем перевод с одного счёта на другой и успели зафиксировать в базе списание денег, но не успели зафиксировать их поступление на счёт получателя.

Как с этим бороться мы тоже знаем: транзакции!

Тогда наш алгоритм приобретает вид:

 Поставить эксклюзивную блокировку
 Прочитать данные из БД
 Проанализировать прочитанные данные
 Принять решение об изменении данных
+Запустить транзакцию
 Записать в БД изменённые данные
+Закоммитить транзакцию
 Отпустить блокировку

Теперь добавим немного конкретики. В PostgreSQL блокировка может быть выполнена с помощью SELECT FOR UPDATE: запрос ставит блокировку, а после этого читает данные. Так же учтём, что он работает только внутри транзакции, а блокировка снимается автоматически после завершения транзакции: как удачной, так и неудачной.

Как алгоритм меняется теперь:

-Поставить эксклюзивную блокировку
-Прочитать данные из БД
+Запустить транзакцию
+Поставить эксклюзивную блокировку и прочитать данные (SELECT FOR UPDATE)
 Проанализировать прочитанные данные
 Принять решение об изменении данных
-Запустить транзакцию
 Записать в БД изменённые данные
 Закоммитить транзакцию
 Отпустить блокировку

Воплотив в python получим что-то такое:

transaction = orm.begin_transaction() 
try:
    obj = SomeModel.select_for_update('*').where(...)
    if should_change():
        ...  # change obj
        obj.save()
    transaction.commit()
except Exception:
    transaction.rollback()

Получается, что не нужно особых премудростей, чтобы обеспечить консистентность данных в пределах одного сервиса.

Но, как известно, программа всегда сделает ровно то, что ей сказал программист, а не то, что он от неё хотел.

Что ещё надо учесть

Пусть у нас есть таблица счетов пользователей. На неё ссылается другая таблица — с покупками пользователей. 

У нас задача написать функцию, которая не допустит дублей оплаты, если за покупку одновременно возьмутся несколько экземпляров сервиса. И вот мы добросовестно берём покупку, блокируем её, производим оплату и отпускаем блокировку…

Уже догадались, что может пойти не так?

Предположим, у пользователя на счёте есть 100 рублей. А в очереди на оплату — покупки на 50 и 70 рублей. Что если их обработка запустится одновременно? Ведь мы блокируем только покупки, а процесс оплаты нет. Получается, каждый из экземпляров увидит на счете 100 рублей и решит, что денег хватает для каждой покупки, но на самом деле это не так!

Можно, конечно, блокировать и объект покупки, и объект счёта, но это, в свою очередь, чревато дедлоками.

Например, у нас, кроме оплаты покупок, есть ещё алгоритм оплаты налогов, который тоже будет предполагать блокировку объекта счёта, с которого нужно выполнить списание. Также будем считать более приоритетной оплату налогов, нежели оплату штрафов.

Логично сначала заблокировать объект счёта, потом все объекты покупок с этого счёта, а далее списать оплату налога.

Если у пользователя появляется какой-то новый налог в момент совершения покупки, то может произойти такая ситуация: алгоритм оплаты покупки заблокирует объект покупки и попытается заблокировать счёт. А алгоритм оплаты налога (в это же самое время!) заблокирует счёт и попытается заблокировать покупки.

Что делать-то

Да, может казаться, что решение проблемы консистентности очевидно. Однако к этому вопросу не стоит подходить беспечно: в вашем проекте должны быть чёткие правила постановки блокировок, и все части кода должны им следовать. Иначе — будут печальные последствия ?

Консистентность при отправке запросов

Теперь, когда наш сервис умеет в собственную консистентность, будем пытаться общаться с «внешним миром».

Предположим, что другие сервисы, которым мы посылаем запросы, уже умеют их правильно обрабатывать.

Рассмотрим создание платежей. Я уже упоминал в вводной статье, что у нас есть автоматизированная банковская система — АБС, и это отдельный сервис. Другие сервисы общаются с ней через ESB. Если сервису нужно создать платёж, он просто обращается к АБС с запросом — для этого мы используем паттерн Request-Reply.

Я ранее оставил за кадром то, что с ESB мы общаемся посредством RabbitMQ. И хоть я и старался писать алгоритмы универсально, возможно где-то мне не удалось избежать этого контекста. Тем не менее, паттерн Request-Reply можно реализовать и без RabbitMQ, например, через HTTP, но со своими нюансами, которые выходят за рамки данной статьи.

Кажется, что здесь нет принципиальных отличий от предыдущей задачи. Суть в том, чтобы взять из базы данные и, в зависимости от их значений, выполнить действия:

Запустить транзакцию
Поставить эксклюзивную блокировку и прочитать данные
Проанализировать прочитанные данные
Принять решение об отправке запроса
Отправить запрос
Записать в БД, что запрос отправлен
Закоммитить транзакцию
Отпустить блокировку

Но…

Что-то может пойти не так

Главное не забыть доставить.
Главное не забыть доставить.

Догадались, какая оплошность была допущена в предыдущем коде?

В прошлый раз подобный алгоритм корректно сработал потому, что всё взаимодействие шло только с базой. В случае любой ошибки база бы откатилась к своему состоянию на момент начала транзакции. Но здесь же мы имеем дело не только с базой! Ещё мы отправляем запрос во «внешний мир».

Получается, если мы отправим запрос, а после этого что-то пойдёт не так, то наша БД откатится на то состояние, когда отправки запроса ещё даже в планах нет. А на самом деле запрос уже отправлен. В итоге — консистентность данных нарушена ?

Вернёмся к нашему примеру с оплатой покупок.

Пользователь совершает покупку. Сервис отправляет запрос на создание платежа, но из-за сбоя не фиксирует эту информацию. Спустя время сервис увидит в своей БД информацию о неоплаченной покупке и повторно решит сделать оплату.

После этого к нам придёт разгневанный покупатель с претензией о том, что с него списано два платежа. А оператор БД сможет ответить, что был только один платёж (мамой клянусь, только один!)

Как же в таком случае быть? Ведь, как уже упоминалось в постановке задачи, инстанс сервиса может упасть в любой момент. Но мы всё ещё можем полагаться на транзакции в БД.

Пусть мы не можем отправить запрос внутри транзакции, но можем сделать запись о намерении отправить такой запрос. В этом случае, если транзакция откатится, то мы вернёмся в консистентное состояние, которое было на старте транзакции. Иначе — мы запишем новое консистентное состояние данных. В нашем примере это подтверждение пользователя об оплате покупки и решение об отправке запроса на создание платежа.

N.B. здесь используется идея конечных автоматов. С помощью присваивания различных состояний сообщениям мы сможем легче определять, над какими сообщениями ещё надо произвести какие-то действия, а какие уже можно оставить в покое.

Однако запрос всё ещё надо отправить. Отправлять после коммита транзакции? Но ведь инстанс приложения может упасть сразу же после этого коммита.

На этот случай нам нужно с некоторой периодичностью запускать фоновую джобу, которая бы сканировала БД на предмет запросов, которые мы запланировали отправить, но ещё не отправили. После отправки запроса пометим его как отправленный — и дело в шляпе!

Здесь ещё стоит сказать о том, что хорошо бы разделять ситуации, когда отправка не состоялась: например, «сервису что-то помешало дойти до этого шага» и «при попытке отправки произошла ошибка».

Почему это важно? Рассмотрим случай с проблемой на стороне шины: если мы не будем различать эту причину, то фоновая джоба при каждом запуске будет видеть неотправленный запрос. Что она будет делать? Правильно: тупо долбить запросом в шину в надежде его протолкнуть.

Сценарий не так уж плох, но я бы рекомендовал менее «тупой» подход. Если в процессе отправки сообщения упала какая-то невиданная ошибка, то такое сообщение следует пометить особой меткой и выслать алерты разработчикам, например, в Sentry. Разработчик сможет вручную перезапустить отправку и добавить обработчик этой ошибки, чтобы сервис в будущем смог самостоятельно её решать.

Вы можете сказать, мол, как бы не так! Чем же это решает исходную проблему? Ведь фоновая джоба точно так же может упасть после отправки запроса, и она не успеет сделать запись об успешной отправке. Может случиться так, что джоба запустится сразу после коммита транзакции в основном процессе, где мы приняли решение об отправке. Основной процесс после коммита транзакции попытается отправить запрос, и фоновая джоба тоже! Как ни крути, а исходную проблему это не решает.

Здесь я хочу сделать небольшое отступление и рассказать о том, что наша шина — это своего рода брокер доставки сообщений. Брокеры сообщений поддерживают разные гарантии доставки:

  • at-most-once

  • at-least-once

  • exactly-once

Гарантии доставки отвечают на вопрос, сколько раз сообщение может быть доставлено получателю.

Как правило, exactly-once — это «несбыточная мечта» всех разработчиков, так как этот вид доставки требует неоправданно больших затрат ресурсов, особенно, если брокер работает в нескольких инстансах.

Гарантия at-most-once — это когда сообщение может быть доставлено 0 или 1 раз. Нам такое тоже не подходит, потому что мы не можем допустить, чтобы сообщение не пришло: это будет равносильно ситуации, когда пользователь совершил покупку, но сообщение о создании платежа так и не было доставлено в АБС.

В сухом остатке остаётся гарантия at-least-once — это когда сообщение может быть доставлено 1 и более раз. При этом виде гарантии каждый сервис-получатель должен быть готов к многократной доставке сообщения. Также у каждого сообщения должен быть свой уникальный идентификатор: так мы поймём, новое ли это сообщение или повторная доставка.

Соответственно, даже если мы в нашем сервисе сделаем так, что запрос отправляется только один раз, нет никаких гарантий, что шина его доставит так же.

Нам не нужно гарантировать единственность отправки запроса: если у отправленных запросов будет один и тот же идентификатор, то сервис-получатель при повторном получении увидит, что запрос с таким же идентификатором он уже обрабатывал и проигнорирует это.

Уверен, что найдутся те, кто спросит: «Ну раз можно не заботиться о том, сколько раз мы отправили запрос, то зачем вообще заморачиваться с разделением на принятие решения об отправке и самой отправкой? Ещё и с фоновой джобой…».

Возможно, в приведённом примере это и выглядит чересчур сложно. Но ситуации могут быть нетривиальными. И даже если предусмотрено всё, и совпадение идентификаторов запросов гарантировано, вам не кажется, что выделить запрос в отдельную сущность — это более естественно? То есть проще создать объект запроса и присвоить ему уникальный id, а не высчитывать его по косвенным признакам, не говоря уже о хранении информации об отправке. А нам ведь ещё надо получить ответ на этот запрос и обработать его!

Итак, давайте подытожим действия:

 Поставить эксклюзивную блокировку и прочитать данные
 Проанализировать прочитанные данные
 Принять решение об отправке запроса
+Создать в БД соответствующий запросу объект, с пометкой «готов к отправке»
+Закоммитить транзакцию и отпустить блокировку
 Отправить запрос
-Записать в БД, что запрос отправлен
-Закоммитить транзакцию
-Отпустить блокировку
+В запросе сделать отметку о результате отправки

Воплощённое в питоне будет смотреться как-то так:

transaction = orm.begin_transaction()
try:
   data = ...  # some select for update
   if should_send_request(data):
       request = Request.create(state='should_be_sent')

       transaction.on_commit(
           func=send_request_to_esb,
           args=[request],
       )
   transaction.commit()
except Exception:
   transaction.rollback()

Где исполнение решения об отправке выглядит так:

def send_request_to_esb(request: Request):
   send_result = esb.send_message(request)
   transaction = orm.begin_transaction()
   try:
       request = Request.select_for_update('*').where(id=request.id)

       if request.state == 'should_be_send':
           request.state = send_result.get_state()
       elif send_result.get_state() == 'sent':
           request.state = 'sent'

       request.save()
       transaction.commit()
   except Exception:
       transaction.rollback()

Кроме того, опишем фоновую джобу с алгоритмом:

Найти в БД все запросы,
    решение об отправке которых было принято,
    но сами запросы не помечены как отправленные
Отправить каждый такой запрос

По традиции воплотим в питоне ?

def background_request_sender():
   for request in Request.select('*').where(state='should_be_sent'):
       send_request_to_esb(request)

При использовании такого алгоритма, сервис уж точно не забудет отправить запрос, а зафиксировав отправку, не будет избыточно спамить этим запросом.

Консистентность при получении запросов или ответа на запрос

Итак, мы разобрались с тем, как правильно рассылать запросы. Теперь поговорим об их обработке получателем, а также об обработке ответов.

Выше я писал, что при обмене сообщениями мы используем гарантию доставки at-least-once. Соответственно, сообщение отправителя может быть доставлено и два, и более раз. Обработчик должен быть к этому готов: мы же помним пример с покупателем, у которого деньги за покупку должны списаться ровно один раз.

Но не стоит переживать: сервис-отправитель уже максимально об этом позаботился — каждое сообщение имеет свой собственный уникальный идентификатор. Соответственно, если сообщение будет доставлено повторно, то оно будет иметь тот же идентификатор. Сервису-получателю достаточно лишь запоминать идентификаторы уже обработанных сообщений.

Ну уж здесь алгоритм точно должен получиться простым

Получили очередной запрос
Если ранее обрабатывали запрос с таким идентификатором — конец алгоритма
Выполнили обработку
Отправили ответ
Запомним идентификатор запроса

И тем не менее...

Что-то может пойти не так!

Ну, кроме меня такое точно никто не делал.
Ну, кроме меня такое точно никто не делал.

Да как так-то? Опять не так?

Вдохновившись тем, что отправитель действительно во многом о нас позаботился, мы забыли о том, что наш сервис тоже работает в нескольких инстансах и каждый из инстансов может упасть в любой момент. То есть мы можем выполнить обработку запроса и скрашиться, не успев запомнить идентификатор обработанного запроса, а, следовательно, при повторной доставке запроса снова выполним обработку.

На вопрос как же быть отвечу, перефразируя цитату одного вождя: «Транзакции, транзакции и ещё раз транзакции!»

Заодно вспомним о том, что отправка ответа должна быть вынесена за пределы транзакции, ибо, как и в случае с отправкой запроса, она не откатится вместе с транзакцией:

 Получили очередной запрос
+Запустили транзакцию
 Если ранее обрабатывали запрос с таким идентификатором — конец алгоритма
 Выполнили обработку
-Отправили ответ
-Запомним идентификатор запроса
+Записали в базу решение об отправке ответа
+Закоммитили транзакцию
+Отправили ответ

Чувствуете ощущение дежавю? Мы опять получаем ситуацию, в которой можем скрашиться сразу после коммита транзакции и не отправить ответ. А значит нам опять нужна фоновая джоба, которая будет время от времени искать в базе такие запросы, которые мы обработали, но не отправили ответ.

И как бы мне ни хотелось сказать, что «вот и всё», скажу «но и это ещё не всё».

Повторная доставка запроса может произойти когда угодно. Причём почти одновременно с первой. В этом случае высока вероятность, что два этих сообщения балансировщик отправит в разные инстансы.

Что же мы получим? Каждый инстанс запустит транзакцию, посмотрит в БД, увидит, что такой запрос ещё не приходил (ведь ни один из них ещё не пометил свой запрос как обработанный) и... Начнут обработку!

Казалось бы, мы уже умеем защищаться от подобного поведения: нужно всего лишь поставить блокировку, чтобы запретить одновременный доступ к записи. Но в том и проблема, что соответствующей записи у нас в базе ещё нет! Нам нечего блокировать! Что же делать? Для меня в своё время это стало настоящим камнем преткновения.

Однако данная ситуация имеет несколько решений. Самое простое, которое приходит на ум — раз записи нет, так давайте создадим её!

То есть прежде чем начать транзакцию с обработкой записи, выполним транзакцию с созданием записи. Давайте посмотрим, что у нас будет получаться в разных вариантах.

Два процесса начали обрабатывать одно и то же входящее сообщение. Сначала запускается создание (и только создание!) записи. Хоть мы и можем отправить два запроса на создание одновременно, СУБД их выполнит в какой-то очерёдности. Тот, который выполнится первым, успешно создаст запись, а второй (и каждый последующий) упадёт с ошибкой нарушения уникальности идентификатора сообщения. Получается, у нас в БД будет создана единственная запись, соответствующая поступившему запросу.

Ну а далее вполне можем пойти по накатанной: заблокировать уже существующую запись и начать её обработку.

N.B. Инстанс приложения может в любой момент отвалиться: то есть он может создать запись, но никак её не обработать. Поэтому важно при создании записи пометить её как подлежащую обработке... Ну это вы уже знаете ?

Теперь оформим в качестве алгоритма то, к чему мы пришли:

Получили очередной запрос
Создали в БД объект, соответствующий этому запросу
Если упала ошибка ограничения ункальности -- конец алгоритма
Иначе -- запустить обработку запроса

Вжух, и то же самое на python:

def receive_request(raw_request):
   try:
       request = Request.create(data=raw_request, state='should_be_handled')
   except orm.UniqueConstraintViolation:
       return

   handle_request(request)

Алгоритм обработки запроса:

Поставить эксклюзивную блокировку и извлечь запись соответствующую запросу
Если она не помечена как подлежащая обработке — конец алгоритма
Обрабатываем запрос
Создаём в БД запись, соответствующую ответу на запрос с пометкой «готов к отправке»
Коммитим транзакцию и отпускаем блокировку
Отправляем ответ

*звуки усердного кодинга*

def handle_request(request: Request):
   transaction = orm.begin_transaction()
   try:
       request = Request.select_for_update('*').where(id=request.id)
       if request.state != 'should_be_handled':
           transaction.commit()
           return
       try:
           response = _handle_request(request)
           request.state = 'handled'
           transaction.on_commit(
               func=send_response_to_esb,
               args=[response],
           )
       except Exception:
           request.state = 'handle_error'
       request.save()
       transaction.commit()
   except Exception:
       transaction.rollback()

И наша фоновая джоба:

Найти в БД все запросы, подлежащие обработке
Для каждого запроса выполним процедуру обработки

Для python-говорящих ?:

def background_handler():
   for request_id in Request.select('id').where(state='should_be_handled'):
       handle_request(request_id)

Но подождите, подождите! Откуда уверенность, что алгоритм рабочий? Выше написано, что нам важно не пропустить ни одного запроса. А ещё выше — что OOM Killer может убить процесс в любой момент. Что если это произойдёт, когда мы ещё не успели выполнить создание объекта запроса? Здесь точно нет просчёта?

Как правило, брокеры сообщений ожидают от обработчика ответа об успешной или неуспешной обработке (например, ack и nack в RabbitMQ). Все сообщения, для которых обработчик не вернул ack-ответ (а в случае с OOM Killer-ом брокер его и не получит), будут помещены в специальную очередь. Можно настроить ретрай передачи таких сообщений обработчику и даже алерт разработчикам, если после какого-то количества ретраев сообщение всё ещё не обработано.

За кадром остался код отправки и обработки ответов на запросы внутри сервиса, из которого пришёл исходный запрос, но с точностью до обозначений он будет эквивалентен коду отправки и обработки запросов.

Консистентность при обработке событий

Итак, теперь мы знаем о том, как должны вести себя сервисы при общении вида «Запрос-Ответ». Но есть ещё один способ, который мы не можем обойти вниманием. Речь идёт о событиях.

События — это формат одностороннего информирования о чём-либо. То есть сервис-отправитель выполняет отправку, и его больше никак не заботит судьба этого сообщения: кому оно было доставлено и было ли обработано. Через события мастер-система может информировать об изменении состояния каких-либо объектов, то есть на протяжении «жизни» одного объекта формируется цепочка относящихся к нему событий.

Как правило, этот паттерн общения подразумевает, что получателей может быть несколько. Список получателей каждого события фиксируется в сервисе ESB.

Продолжим рассматривать наш пример с платежами.

Платежи контролируются АБС, и любой сервис, который хочет создать платёж, должен к ней обращаться. Но создать платёж — мало.

Платёж должен встать в очередь на обработку прежде, чем произойдет оплата. К моменту оплаты на счёте может быть недостаточно средств. Уже созданный платёж может быть отменён по каким-либо причинам. Оплата может не пройти по техническим причинам. И в конце концов платёж может стать оплаченным.

О том, что происходит с платежом, интересно знать не только сервису-инициатору. Например, у нас может быть сервис, начисляющий пользователю кешбэк за покупки. Этому сервису нужно иметь представление о платежах пользователей. Или могут быть сервисы, отслеживающие все операции со специфичным счётом.

На первый взгляд может показаться, что здесь ситуация в разы проще, чем с запросами: отправлять сообщения мы уже умеем, защищаться от дублей тоже. А здесь нас ещё и не заботит ответ от получателя.

И действительно, на стороне отправителя нет никаких проблем, которые бы мы ещё не умели решать. Однако они есть на стороне получателя.

Дело в том, что ESB может нам не только доставить одно и то же сообщение несколько раз, она ещё и не гарантирует порядок доставки сообщений.

Доставка сообщений в том же порядке, в котором они были отправлены отправителем — ещё одна «несбыточная мечта» разработчиков, как и гарантия доставки exactly-once.

Почему же сообщения могут быть доставлены не в том порядке?

Давайте вспомним, что могло случиться при отправке запроса:

- Отправка сообщения зафейлилась, и мы не зафиксировали в базе факт отправки. 

- Сообщение отправлено, но не зафиксирован факт отправки

- Сообщение отправлено и факт отправки зафиксирован

Последняя ситуация слишком «скучная», так как здесь слишком мало вещей, которые пошли не по плану ?. Поэтому рассмотрим первые две. 

В обоих случаях, поскольку в БД не зафиксирован факт отправки, периодическая фоновая джоба предпримет ещё одну попытку отправки.

А теперь посмотрим на всё это с точки зрения отправки событий. Пусть у нас есть «Событие №1» и «Событие №2». Алгоритм отправки у нас остаётся тот же. Получается, у нас возможна ситуация, когда отправка События №1 зафейлилась, а отправка События №2 успешно состоялась. И только потом в рамках фоновой джобы произошла отправка События №1. Так что даже при наличии «идеального» брокера сообщений нет гарантии, что сообщения будут доставлены с установленной очередностью.

Казалось бы, как в таком случае жить? Ведь без порядка будет полный беспорядок! Выходит, что мы сначала получим событие об оплате и только потом событие о постановке в очередь на оплату. Это как если бы мы получали события о том, что некий Вася надел штаны, а после — сообщение, что он надел трусы (хотя на самом деле Вася вовсе не пытался косплеить Супермена, просто в таком порядке были доставлены сообщения).

Мне так сказали, вроде норм.
Мне так сказали, вроде норм.

Выход из ситуации напрашивается сам собой: нужно указывать очередность событий. Делать это можно по-разному: например, указывать порядковый номер.

Также мы стараемся не посылать события об изменении какого-либо объекта, а на каждое изменение объекта посылать событие с полным текущим его состоянием. Улавливаете разницу? При таком подходе даже если получатель сначала получит более позднее событие, то он сразу получит представление и о том, что произошло раньше.

Разумеется, чтобы не нагружать сеть чрезмерным количеством сообщений, события, описывающие объект целиком, отправляются не для всех объектов: есть важные объекты, такие как платежи, об изменениях которых нужно знать многим сервисам, а есть менее важные объекты, об изменении которых события посылаются исходя из требований бизнес-логики.

То есть по аналогии с Васей мы бы не отправляли событие о том, что именно он надел только что. Мы бы отправляли событие о том, что на нём уже надето. И события выглядели бы так:

Событие №1

  • трусы

Событие №2

  • трусы

  • рубашка

Событие №3

  • трусы

  • рубашка

  • штаны

Соответственно, если обработчик сначала получит Событие №3, а потом Событие №1, то у него не возникнет неправильного представления о том, как одет Вася.

Так и с платежами: если платёж уже оплачен, то понятно, что до этого он был создан, потом встал в очередь на оплату и после — оплачен.

Даже если по каким-то причинам в вашей задаче нужно обрабатывать каждое событие из последовательности, благодаря присвоенным порядковым номерам вы сможете, получив более позднее событие, сохранить его, дождаться получения более раннего события, и только обработав событие с номером n-1 приступать к обработке события с номером n.

В Точке мы стараемся организовывать события и их обработку так, что получив очередное событие, можем смело игнорировать все события, связанные с этим объектом, которые логически предшествуют, но по факту пришли позже.

Исходя из сказанного выше, обработку событий можно реализовать примерно таким образом:

Получили очередное событие
Проверим, является ли это событие более актуальным, чем последнее полученное
Если нет — конец обработки
Выполнить обработку события
Сохранить метку актуальности события

Но опять выходит как-то слишком уж просто: как-то мало в нашем алгоритме транзакций и блокировок ?

Разумеется, это ещё не сам алгоритм, а только его набросок.

Событие — это слепок состояния какого-либо объекта (например, объекта платежа). И если мы получаем информацию о каком-то объекте впервые, то у нас нет никакого «последнего полученного» события, чтобы сравнить актуальность. Далее, как и с запросами, одно и то же событие может прийти одновременно и начать обрабатываться разными инстансами нашего приложения (а ведь мы хотим, чтобы обработка произошла строго единожды!). Однако тут ситуация осложняется ещё и тем, что одновременно могут прийти два разных события, которые связаны с одним объектом. И нам важен порядок обработки этих событий.

Возможно, уследить за всем этим трудно, но у нас есть всё необходимое для решения данной задачи!

Как и с запросами, мы можем организовать транзакцию на создание объекта-события в БД. Однако здесь есть важный момент: надо поставить ограничение уникальности не на id сообщения, а на id объекта, об изменении которого нам сообщает событие.

Если вам всё же нужно обрабатывать каждое событие (т. е. вы не можете, обработав более актуальное событие, просто отбрасывать все «опоздавшие»), то последние утверждения для вас не подходят. Однако у вас и в целом алгоритм будет отличаться.

Таким образом, если создание объекта прошло без проблем, значит о таком объекте мы получили информацию впервые. Иначе у нас упадёт ошибка нарушения ограничения уникальности. В этом случае надо извлечь уже имеющийся объект и проверить, получили ли мы более актуальное событие. И, разумеется, когда мы работаем с уже сохранённым в БД объектом, мы работаем в транзакции и с установкой блокировки.

Но хватит слов, давайте кодить!

Получение события не будет особенно отличаться от получения запроса:

Создадим соответствующий событию объект в БД с пометкой «подлежит обработке»
Если упала ошибка нарушения ограничения уникальности id события — конец алгоритма
Иначе — запустим обработку полученного события.
def receive_event(raw_event):
   try:
       event = Event.create(data=raw_event, state='should_be_handled')
   except orm.UniqueConstraintViolation:
       return

   handle_event(event)

Обработка события:

Создадим пустой объект, соответствующий remote-объекту, о котором нас информирует событие
Если упала ошибка ограничения уникальности id remote-объекта, проигнорируем её (нам важно чтобы в БД был такой объект, чтобы мы могли его заблокировать)
Начнём транзакцию
Поставим эксклюзивную блокировку и извлечём объект, с которым связано событие
Если наше событие менее свежее, чем последнее обработанное — конец алгоритма
Выполним действия, связанные с обработкой события
Запомним номер последнего обработанного события для данного объекта
Пометим событие как обработанное
Закоммитим транзакцию и отпустим блокировку

N.B. Обратите внимание, что на шаге проверки на свежесть события можно прописать своё условие. Например, если вы не можете «перешагивать» через события и вам нужно обрабатывать их строго последовательно, вы можете проверить, что номер текущего события ровно на единицу превышает номер последнего обработанного события. И если это не так – заканчивать обработку, дожидаясь именно следующего события.

Запитонируем:

def handle_event(event):
   try:
       # если пришло событие об объекте, которого ещё нет в базе -- создадим "заглушку" для него
       RemoteObjModel.create(id=event.remote_obj_id, last_handled_event_number=-1, data=None)
   except orm.UniqueConstraintViolation:
       # ничего страшного, если объект уже существует
       pass
   # в итоге далее можем быть на 100% уверены, что объект
   # с id=event.remote_obj_id точно есть в БД

   transaction = orm.begin_transaction()
   try:
       remote_obj = RemoteObjModel.select_for_update('*').where(id=event.remote_obj_id)

       if remote_obj.last_handled_event_number >= event.serial_number:
           event.state = ‘handled’
           event.save()
           transaction.commit()
           return

       try:
           _handle_event(event, remote_obj)

           # Добавлено для наглядности. формально можно "унести" в `_handle_event`
           remote_obj.last_handled_event_number = event.serial_number
           remote_obj.save()

           event.state = 'handled'
       except Exception:
           event.state = 'handle_error'

       event.save()
       transaction.commit()
   except Exception:
       transaction.rollback()

И не забудем про «лекарство от склероза» для нашего сервиса ?

Найдём в БД все события, помеченные как «подлежащие обработке»
Упорядочим эти события по id описываемого remote-объекта и по номеру
Выполним обработку каждого события

Иными словами

def background_events_handler():
   query = Event.left_join(
       RemoteObjModel,
       on=(Event.remote_obj_id == RemoteObjModel.id),
   ).select_distinct(
       Event.remote_obj_id,
   ).where(
       RemoteObjModel.id is None | Event.serial_number > RemoteObjModel.last_handled_event_number
   ).order_by(
       Event.remote_obj_id,
       orm.Desc(Event.serial_number),
   )

   for event, remote_obj in query:
       handle_event(event)

N.B. В нашей реализации мы упорядочиваем номера событий от старшего к младшему: если уже есть более новое событие — сразу его обрабатываем. Далее пробегаемся по остальным событиям с менее актуальными номерами и помечаем их как не требующие обработки. Можно и наоборот: упорядочить события от менее к более актуальным и обработать все события последовательно.

Заключение

Ух, многабукаф подходят к концу. Что хочется сказать напоследок…

Иногда мы забываем о том, что код будет работать параллельно с другим кодом. А если не забываем, то можем упустить из виду потенциальные проблемы. А даже если и не упускаем, можем решить, что такие ситуации слишком маловероятны, чтобы с ними заморачиваться.

Я постарался разобрать общие проблемы, которые могут нарушить консистентность данных в многопоточной асинхронной среде. Конечно, в статье нет всех возможных ситуаций, так как у каждой задачи своя специфика.

Надеюсь, что теперь у вас есть достаточный базис: если не для того, чтобы сразу выявлять потенциальные нарушения консистентности, то для того, чтобы понять, какие вопросы стоит себе задать. Ведь не всё, что кажется простым и понятным, действительно таким является.

Как именно реализовывать обработку данных — вы решите сами. Кто-то готов мириться с периодическими ошибками и не видит большой проблемы в том, чтобы вручную поправить данные.

А мое мнение такое: недопустимо закладывать ошибку в код (особенно осознанно). После деплоя запустится конвеер этих ошибок, и не факт, что мы успеем их вовремя обнаружить, чтобы исправить вручную. И вообще — ошибок может стать слишком много, и для исправления придётся писать отдельный скрипт, в котором тоже можно будет ошибиться. Я считаю, что лучше делать новые задачи вместо того, чтобы отвлекаться на переотправку недоставленных сообщений и исправление данных.

Теги:
Хабы:
Всего голосов 19: ↑19 и ↓0+19
Комментарии10

Публикации

Информация

Сайт
tochka.com
Дата регистрации
Дата основания
Численность
1 001–5 000 человек
Местоположение
Россия
Представитель
Сулейманова Евгения