Pull to refresh

Comments 43

Интересная статья, рно у меня есть несколько примечаний.

С терминами путаница.
- Очередь это буфер с политикой обработки FIFO
- Kafka, Rabbit MQ - это реализация архитектурного паттерна Message Broker, в котором FIFO это скорее второстепенная задача, частный случай, который может соблюдатся если есть только один потребитель на очередь.

Не существует никаких классических, стандартных очередей. Лучше не надо употреблять эти термины у них есть строго определённое значение. например, когда мне говорят стандартный что то там, я прошу ссылку на стандарт - люди недоумевают :)

Потом допишу остальное, сейчас надо уйти.

Вобщем приходилось решать схожую задачу, тоже использовали для хранения заданий PostgreSQL. Message Broker как хранилище заданий точно не подойдёт, из за наличия у заданий приоритетов и иных параметров по которым определяется приоритет заданий в моменте времени для отправки задания на исполнение.

В таких прикладных задачах, не смотря, что они выглядят простыми, есть много технических проблем, которых нам удалось избежать за счёт того, что у нас был строго один исполнитель для каждого типа задания, то есть соблюдалась политика FIFO, если бы появилась необходимость в увеличении мощности обработки заданий за счёт распарралеливания, то архитектура была бы сложнее и там определённо бы понадобился Message Broker, но именно в своём основном качестве, ни в коем случае не как хранилище.

Постараюсь написать обо всём этом статью.

Сообщение это просто кусок данных, который от кого-то куда-то передаётся. Таск это процесс, протяжённый во времени. Message Broker и Task Manager - принципиально разные вещи.

Кроме возможности тюнинга приоритетов у очередей в БД есть ещё две киллер-фичи:

  1. Транзакционная целостность. Задача попадает в очередь только при успехе всей транзакции и наоборот. С внешним MQ это трудно реализуемо.

  2. Возможность ставить задачи из хранимок и - особенно - триггеров. Последнее это просто серебряная пуля для крупной кодовой базы с залежами легаси. Если вам нужно запускать какую-то асинхронную обработку при любом добавлении/изменении строки в бд, например, генерить превьюшки картинок к загруженным файлам, то это чит и 10 минут работы вместо перелопачивания половины кодовой базы.

Транзакционная целостность. Задача попадает в очередь только при успехе всей транзакции и наоборот. С внешним MQ это трудно реализуемо.

А что, в нынешние времена нет чего-нибудь типа Microsoft Transaction Coordinator, чтобы гетерогенные транзакции реализовывать?

запускать какую-то асинхронную обработку при любом добавлении/изменении строки в бд, например, генерить превьюшки картинок к загруженным файлам

Не рекомендовал бы так делать. С триггерами не очевидна бизнес логика из кода. А еще проблемы с перформансом могут быть, если таблица активно изменяется.

Когда у вас 10-летний монолит на 1кк строк, там уже есть проблемы и с очевидностью бизнес-логики, и с перфомансом)

Транзакционная целостность. Задача попадает в очередь только при успехе всей транзакции и наоборот. С внешним MQ это трудно реализуемо.

Outbox producer решает

Outbox producer и есть очередь в базе для перекладывания во внешнюю

прекрасно, я тоже такую очередь хочу :-)
если ссылка на github ?

пример transactional outbox (это не задачи с приоритетами) https://github.com/Toloka/pg-queue-playground. Но есть примеры оптимизаций для обработчиков where id > last_processed и выключение ожидания синхронной реплики. А также ручной vacuum через truncate таблиц, чтобы долгие транзакции не мешали удалить bloat

Мне кажется это скорее какой-то pool tasks, чем очередь.

Более гибкий подход заключается во введении поля expected_status_change_time. В него записывается момент, до которого статус задачи должен измениться. ... Главное здесь — иметь надёжные тайм‑ауты и аккуратно настроенные запросы, чтобы не возникла ситуация, когда две копии одной задачи выполняются одновременно.

Как будто бы тут применим принцип не "если", а "когда" произойдет ситуация, при которой worker1 уже превысил expected_status_change_time и worker2 взял эту же задачу, хотя worker1 в состоянии выполнить ее успешно.

С идемпотентными задачами такой проблемы нет (кроме лишнего сжигания ресурсов на их выполнение). Но как в такой ситуации происходит резолв конфликтов в задачах с сайд-эффектами? worker1 при попытке завершить задачу дополнительно проверяет "не превышен ли лимит выполнения" или "принадлежит ли задача до сих пор мне"? Верно ли понимаю, что подобное решается только подбором нужного интервала выполнения для каждого отдельного типа задачи или есть какие-то механизмы, помогающие дополнительно снизить вероятность дублирования?

В голову приходит только идея с heartbeat механизмом, при котором worker1 дополнительно сообщает, что "он жив, задача в работе, требуется больше времени", но может есть что-то более хитрое.

SELECT FOR UPDATE создаём блокировку, что мешает автовакууму, что даёт bloat.

Я делал иначе, через логическую блокировку, добавлял два поля - гуид и дата, и по ним понимал заблочена запись или нет. Тупо условие в WHERE.

Рассматривали такой вариант?

Мало контекста своего решения вы дали. Но по тому , что дали - условие в where может сработать одновременно для двух транзакций, и обе транзакции следующей командой пойдут обновлять запись в БД. (Видимо, проставлять uuid и datetime?) Решает проблему только блокироаки

update where не должен сработать для двух паралельных запросов

Приветствую.
Делал похожее (пока забросил), напишу свои отличия, может пригодится кому:

- у меня воркеры не лазили в БД, а получали задачи от шедулера.
Шедулеров могло быть несколько, у каждого свои воркеры.
Воркер не имел доступа к БД, все обновления статусов задач - через шедулер.

- шедулеры занимались пуллингом задач из БД, но с помощью триггеров (на таблице задач висел триггер на появление новых записей).

- задача представляла из себя скрипт (на python либо bash), хранились скрипты задач тоже в БД, в таблице шаблонов

- воркер запускал для каждой задачи дочерний процесс, соотв-но все свои задачи выполнял параллельно.

- воркер ничего не знал конкретно о задаче, кроме скрипта и параметров, и не был как-то специализирован на конкретный вид задач.
Соответственно любой воркер мог взять любую задачу.
Результат и прогресс выполнения задачи воркер отправлял своему шедулеру.

- если воркер не отвечает, все задачи этого воркера будут перекинуты шедулером на другой воркер, без какого-либо участия БД в этом процессе.

Вот основные вроде отличия.

Выше много писали, что это скорее не очереди, но если хочется очередей в pg - посмотрите pgq

или в целом написать свой transactional outbox с аккуратным truncate таблиц или ротацией партиций против bloat при долгих транзакциях не должно быть сильно сложнее. Выше уже скидывал, как пример https://github.com/Toloka/pg-queue-playground

Моя любимая тема.

Очереди в БД + машина состояний = вполне рабочий паттерн.
SELECT FOR UPDATE SKIP LOCKED - достаточно тяжёлый подход. По сравнению с advisory locks.

Если в WHERE условие по одним столбцам по BETWEEN (schedule_time <= ?), а в ORDER BY по другим (priority DESC, COALESCE(delay_tolerance_time, schedule_time)), это не сможет безусловно работать быстро, какой бы индекс ни добавить. Но сможет работать молниеносно, если свести к одним и тем же столбцам, за счёт выборки уже отсортированных значений по единому индексу. Либо же придётся городить переход в промежуточные состояния и использовать частичные индексы по этим состояниям, что тоже неплохо бьёт по скорости.

Часто машину большого количества состояний партицируют по значениям состояний - чтобы более горячие для работы состояния оказывались рядом друг с другом в одной партиции с локальными индексами. Удобства в работе это сильно убавляет, но чего только не сделаешь ради производительности и экономии памяти.

Ура! Я вот думал писать про pg_advisory_xact_lock или кто-то найдётся :) Это реально тема!
Но нужно отметить, что писать с advisory lock гораздо сложенее чем с for update. В первую голову потому, что pg_advisory_xact_lock блокирует только 1 ключ (см документацию https://postgrespro.ru/docs/postgrespro/current/functions-admin). Они очень хорошо подходят для блокирования "сегментов", если Вы конечно их смогли разметить.

Один ключ может покрывать много строк; и можно сразу блокировать несколько строк - просто в advisory передавать колонку или синтетическое значение. Смысл же в том, что "данное значение заблокировано", а чему соответствует это значение - на усмотрение приложения. Потому и advisory

А почему вы использовали именно PostgreSQL?

Он же, из-за своей многоверсионной подсистемы хранения, плохо подходит для вашей задачи, в которой никакие старые версии строк не нужны?

плюсую к вопросу, понятно если надо было бы обеспечить транзакционость основной логики и запуск задачи, но раз для задач отдельная база, то может что-то из nosql лучше подошло бы

делал такое тоже но еще и с оркестрацией, чтобы можно было делать аккорды как в celery

Рекомендую заменить SELECT ... FOR UPDATE SKIP LOCKED на SELECT ... FOR NO KEY UPDATE SKIP LOCKED - эта блокировка более лёгкая для PG

а следующим шагом - на pg_advisory_xact_lock

Тут нет путаницы промеж брокеров сообщений и таск шедуллеров? Вторая статья от Яндекса и ни в одной нет упоминания про тот же river, который вполне работает на БД, шардируется, умеет в расписания и пр. Ну или airflow .. никто про готовые, в т.ч. опенсорс решения не в курсе в Яндексе? Как-то даже странно, или чего-то не так понял?

Несрочный пуш, например уведомление о новой подборке, — если он задержится на 30 минут, пользователь всё равно будет доволен

А если не придет никогда, то будет еще более доволен!

А можете рассказать как обеспечиваете атомарность между изменениями в основной базе и запуском соответсвующих задач в другой?

Следующий важный момент — Bloat в PostgreSQL. К сожалению, здесь нет волшебного решения: помогает только агрессивный VACUUM и ANALYZE

detach партиций или truncate с копирование живых данных во временную табличку помогут даже в случае долгих транзкций, так как это не MVCC safe операции

А что, если между БД и воркерами добавить в схему очередь в Message Broker и сервис (назовём его шедулером), который будет выбирать из БД самые приоритетные и срочные задачи и помещать их в Message Broker? Метаданные по всем задачам по-прежнему будут храниться в БД. А воркеры будут брать по одной задаче из Message Broker и сразу выполнять её, вместо того чтобы обращаться в БД и кэшировать пачку задач локально.

Из плюсов:

  • Снижается нагрузка на БД, так как шедулеров будет значительно меньше, чем воркеров, и они смогут выбирать задачи гораздо большими батчами.

  • Воркеры станут проще: сейчас они совмещают ответственность за выполнение задач, их выбор и кэширование в локальной очереди.

  • Приоритетные задачи будут выполняться быстрее. Сейчас важная задача может лежать в локальной очереди и ждать, пока освободится конкретно этот воркер, хотя в это время может быть много других свободных воркеров, которые могли бы взять эту задачу, если бы очередь была общей.

такой баянище.. конечным автоматам сто лет в обед уже. а аы всё "открытия" делаете.

Следующий важный момент — Bloat в PostgreSQL. К сожалению, здесь нет волшебного решения: помогает только агрессивный VACUUM и ANALYZE

Если партиционирование сделать не по хэшу а по интервалам (range, для этого можно ввести синтетический ключь bigint например) то партиции с отработанными событиями можно просто отсоединять (detach/drop).
А если данные (payload) хранить в другой табличке, где строки (статусы) не меняются, то и проблемы с Bloat сократятся на порядок.

Есть вопрос про delay tolerance: какая логика используется при обработке значения времени, заданного в нём?

В статье приведён пример: задача со значением 1 секунда будет обработана раньше задачи со значением 10 секунд. А изменится ли этот алгоритм, когда со времени schedule time второй задачи уже прошло 10 секунд? Или 9 секунд? Если да, то как устроен отбор с сортировкой по приоритету при получении первой задачи в очереди?Если нет - то зачем там отсылка к времени, если фактически это просто приоритет с неограниченным количеством градаций? Возможно для удобства приоритизации задач разных типов?

Спасибо за материал, очень хорошо изложен.

Плюсую, такой же вопрос возник при прочтении.

Sign up to leave a comment.

Information

Website
www.ya.ru
Registered
Founded
Employees
over 10,000 employees
Location
Россия