Используем очереди совместно с БД: обсуждение проблем, возможные способы решения

    Очереди - прекрасный инструмент, который практически идеально масштабируется. Не справляется железо? Просто добавили узлов в кластер. Когда очередь присутствует в проекте, то возникает соблазн всё больше функционала реализовывать с её помощью.

    О подводных камнях такого пути поговорим в этой статье.

    Рано или поздно, применяя очереди, пользователь сталкивается с вопросом использования их совместно с каким-то сервисом, базой данных и т.п.

    • Заказ выполнен, нужно отправить СМС-уведомление пользователю.

    • Поступил новый заказ, нужно отправить Push-уведомление исполнителям.

    • Работа выполнена, нужно списать деньги со счёта клиента.

    Во всех перечисленных примерах изменения в какой-то бизнес-сущности фиксируются в БД (или сервисе с БД), и возникает большой соблазн соблазн отправлять уведомления с использованием очередей.

    Что мы имеем в данной ситуации? Первоначальная, простейшая структура кода:

    • Сервис (наша программа) фиксирует изменение данных в БД.

    • Затем сервис кладёт задание в очередь.

    Фактически в данном случае требуется реализовать событийный триггер на изменение записи данных.

    И в общем случае, получается, здесь мы имеем две записи в две различные БД: сервиса и очереди.

    Теперь перенесёмся в реальный мир и рассмотрим, какие ситуации могут возникнуть:

    • Всё в порядке. БД доступна, БД очереди доступна;

    • БД недоступна, БД очереди доступна;

    • БД доступна, БД очереди недоступна;

    • БД недоступна, БД очереди недоступна.

    Наиболее опасен для бизнеспроцессов тут третий случай: после того, как данные были зафиксированы в БД, уведомить заинтересованные стороны не получилось.

    И вот, если начинать задумываться над этой проблемой, то реализация становится из очень простой... очень сложной. В пределе приходится реализовывать двухфазный коммит.

    Но со сложными решениями всё понятно, давайте подумаем о более простых.

    Неправильные решения

    Очень многие, впервые столкнувшись с этой проблемой (как правило, происходит это, когда система реализована и уже значительное время работает), начинают с того, что предлагают следующее решение:

    1. Открываем транзакцию в БД.

    2. Изменяем запись в БД.

    3. Записываем в очередь.

    4. Закрываем транзакцию в БД.

    В этом случае, если одна из БД недоступна, то и в другую запись не запишется.

    Однако тут имеются следующие проблемы:

    • Данное решение условно работоспособно только в том случае, если обработчик очереди никогда не будет обращаться к записи в БД. Поскольку обработчик очереди может взять задачу на исполнение между пунктами 3 и 4 (Записи в БД ещё не зафиксированы).

    • Закрытие транзакции также может завершиться с ошибкой. В этом случае задание в очереди останется неотменённым.

    Выполнение всей работы в обработчике

    Простейшее решение.

    Вместо изменения записи кладём в очередь задание о том, что надо это сделать. Обработчик очереди выполняет оба действия: и изменения записи в БД, и триггерное. Это решение сложнее, чем первоначальное, но всё равно довольно простое.

    Проблема в том, что инициирующему действие коду может быть нужен результат работы (например, для того, чтобы вернуть http-код успеха/ошибки).

    Дополнительная таблица в БД

    Более простое решение, нежели двухфазный коммит.

    В игре у нас появляется дополнительный процесс/демон, перекладывающий задачи из дополнительной таблицы БД (назовём её queue) в очередь.

    В этом случае запросы, модифицирующие БД, пишут записи в две таблицы в той же транзакции (или в том же запросе):

    /* было */
    UPDATE
       "orders"
    SET
       "status" = 'complete'
    WHERE
       "order_id" = $1
    RETURNING
       *
    
    /* стало */
    WITH "o" AS (
        UPDATE
           "orders"
        SET
           "status" = 'complete'
        WHERE
           "order_id" = $1
        RETURNING
           *
    ),
    "q" AS (
       INSERT INTO
            "queue"
       (
          "key",
          "data"
       )
    
       SELECT
          "o.order_id",
          "o.status"
       FROM
          "o"
    )
    SELECT
       *
    FROM
       "o"
    

    Примечание: Код записи в таблицу queue можно вынести в триггер уровня БД, а также спроектировать так, чтобы сообщения были универсальными и подходили не только для orders.

    Если код, перекладывающий записи из таблицы queue в очередь, не использует двухфазный коммит, то к обработчику задания в очереди появляется требование идемпотентности. Это происходит потому, что при сбоях возможно появление дублей заданий в очереди.

    Простейший алгоритм цикла демона:

    • Взять задачу из таблицы queue.

    • Положить её в очередь.

    • Удалить задачу в таблице.

    Локальный лог/кеш сообщений

    Если потеря данных для нас только неприятна, но не критична, то в целом можно ограничиться первоначальным решением. Однако, если частота возникающих проблем всё-таки заставляет обратить на себя внимание (пользователи жалуются на периодически зависающий интерфейс, на неполучение СМС и т.п.), то можно применить следующий подход:

    • резко понизить вероятность неуспеха при отправке сообщения в очередь.

    Для этого, как в предыдущем варианте, применяем промежуточное кеширование записей сообщений для очереди, но пишем их не в дополнительную таблицу, а в локальный O_APPEND-файл или локально запущенную очередь.

    • сервис (наша программа) фиксирует изменение данных в БД,

    • затем сервис кладёт задание в локальный лог.

    • Дополнительный демон перекладывает сообщения из этого лога (или локальной очереди) в общую очередь.

    Запись в локальный файл коротких сообщений завершается ошибкой в крайне редких ситуациях, и поэтому можно считать, что проблема, возникающая крайне редко, есть решённая проблема.

    Итоги

    Как видим, вариантов решения проблемы немного. Если мы хотим сохранять систему простой (KISS-принцип), то введение дополнительного демона и кеша/лога сообщений в БД или локальном файле/БД даст небольшое увеличение сложности. При этом очень важно сохранять обработчик идемпотентным, поскольку при сбоях в момент перекладывания заданий из локального кеша в общую очередь возможно появление дублей.

    А обобщённое решение предполагает использование двухфазного коммита.

    Комментарии 6

      0
      В игре у нас появляется дополнительный процесс/демон, перекладывающий задачи из дополнительной таблицы БД (назовём её queue) в очередь.

      … и появляется соблазн отказаться от внешней очереди )

        0

        в масштабируемом, большом проекте — нет


        в таком проекте, очередь — это инструмент масштабирования, инкапсуляции итп

        0

        Когда один из компонентов системы недоступен, должна войти в игру логика попыток затриггерить обработку события ещё раз (если ошибка retryable) и/или отправка сообщения на dead letter queue. На DLQ тоже может висеть обработчик, который уведомит клиента нашего приложения, что транзакция не прошла и откатит выбранные изменения. Мне кажется, если уже идём в distributed решения, то и консистенция надо решать на стороне сервисов.

          0

          если например хост перегружается по какой-то причине, то любой процесс может быть убит в любой момент времени и данные потеряны.


          то есть логика установить триггер ещё раз может применяться, но код должен быть построен так, чтоб при ядерном взрыве транзакция откатилась.

          0
          идеи интересные и хочется более широкой картины на другие варианты, например, как насчет event sourcing?

          В этом случае обработчик, который отправляет СМС-уведомления, будет использовать тот же механизм, который использует хранение данных
            0

            это тот вариант что описан "перемещаем логику в обработчик очереди".
            увы тоже не всегда возможно.


            в общем случае триггер должен обработаться после изменения в БД (иначе и транзакционность не нужна по большому счёту)

          Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

          Самое читаемое