Как стать автором
Обновить

Рецепты PostgreSQL: планировщик асинхронных задач

Время на прочтение3 мин
Количество просмотров10K
Для приготовления планировщика асинхронных задач нам понадобится сам postgres и его расширение pg_task. (Я дал ссылки на свой форк postgres, т.к. делал некоторые изменения, которые пока не удалось пропихнуть в оригинальный репозиторий. Можно также воспользоваться готовым образом.)

(В оригинальном PostgreSQL есть ошибка в PL/pgSQL, из-за которой мой планировщик некорректно работает, когда в задаче, написанной на PL/pgSQL, возникает неперехваченное исключение. Я описал эту ошибку здесь и исправил у себя в форке тут.)

Для установки планировщика не требуется создавать расширение в (каждой) базе данных. Вместо этого просто добавляем его в конфигурационный файл

shared_preload_libraries = 'pg_task'

После перезапуска PostgreSQL планировщик создаст во всех базах данных таблицы task от имени пользователей баз данных и в схемах по-умолчанию для этих пользователей.

Если нужно, чтобы планировщик запускался только для определённых баз данных, то можно задать их в конфигурационном файле

pg_task.database = 'database1,database2'

Если же требуется запускать планировщик не от пользователей баз данных, то это тоже можно задать так

pg_task.database = 'database1:user1,database2:user2'

Если нужно создавать таблицы планировщика не в схеме по-умолчанию для пользователей, то можно задать это так

pg_task_schema.database1 = schema3

Если же требуется и таблицу планировщика назвать по-другому, то это можно сделать так

pg_task_table.database1 = table3

По умолчанию планировщик проверяет задачи каждые 1000 мс, но это можно изменить так

pg_task_period.database1 = 100
pg_task_period.database2 = 10

Итак, планировщик создает (если ещё не создано) (схему, если нужно, и) таблицу задач с такими колонками

id BIGSERIAL NOT NULL PRIMARY KEY, -- идентификатор, первичный ключ
dt TIMESTAMP NOT NULL DEFAULT NOW(), -- планируемое время запуска задачи (по-умолчанию - как можно быстрее)
start TIMESTAMP, -- фактическое время запуска задачи
stop TIMESTAMP, -- фактическое время окончания выполнения задачи
queue TEXT NOT NULL DEFAULT 'default', -- очередь выполнения задачи (нужна для ограничения количества одновременно выполняемых задач)
max INT, -- максимальное количество одновременно выполняемых задач в очереди (также, позволяет задавать приоритет выполнения конкретной задачи)
pid INT, -- идентификатор процесса, который выполняет задачу
request TEXT NOT NULL, -- собственно SQL текст задачи
response TEXT, -- результат выполнения задачи
state TEXT NOT NULL DEFAULT 'QUEUE', -- состояние задачи (по-умолчанию - запланирована, также может быть выполнена, остановлена, ...)
timeout INTERVAL, -- позволяет ограничивать время выполнения задачи
delete BOOLEAN NOT NULL DEFAULT false, -- удалять задачу автоматически после выполнения, если нет результата
repeat INTERVAL, -- интервал повторения задачи
drift BOOLEAN NOT NULL DEFAULT true -- предотвращать ли дрейф времени при повторе задачи

На самом деле, планировщик запускает несколько фоновых рабочих процессов: один для отслеживания изменений в конфигурационном файле и запуске/остановке при необходимости остальных фоновых процессов планировщика. И по одному фоновому рабочему процессу для каждой базы для проверки запланированных задач в каждой базе и запуске при необходимости выполнения задач.

Например, если мы хотим выполнить задачу как можно быстрее, то выполняем SQL команду

INSERT INTO task (request) VALUES ('SELECT now()')

Результат выполнения задачи планировщик запишет в колонку результата в текстовом виде. Если в результате выполнения задачи будет несколько колонок, то планировщик добавит их в шапку вместе с типами колонок. Также, в результате выполнения задачи может быть и несколько строк, все они добавятся в колонку результата.

Если мы хотим выполнить задачу, например, через 5 минут, то запишем планируемое время в соответствующую колонку

INSERT INTO task (dt, request) VALUES (now() + '5 min':INTERVAL, 'SELECT now()')

а если мы хотим, чтобы задача выполнилась в конкретное время, то так и запишем

INSERT INTO task (dt, request) VALUES ('2019-07-01 00:00:00', 'SELECT now()')

Если нам требуется, чтобы задача выполнялась каждые 5 минут, то пишем так

INSERT INTO task (repeat, request) VALUES ('5 min', 'SELECT now()')

если же написать так

INSERT INTO task (repeat, request, drift) VALUES ('5 min', 'SELECT now()', false)

то повторение задачи будет запускаться через 5 минут после окончания выполнения задачи (а не после запланированного времени, как по умолчанию).

При возникновении исключения при выполнении задачи, оно перехватывается и в текстовом виде записывается в колонку результата, а задаче присваивается соответствующее состояние.

Например

INSERT INTO task (request) VALUES ('SELECT 1/0')

Если нужно, чтобы для какой-то очереди задач одновременно выполнялось не более 2х задач, то создаём задачи командой

INSERT INTO task (queue, max, request) VALUES ('queue', 2, 'SELECT now()')

Пусть у нас в этой очереди накопилось очень много задач и они одновременно выполняются по 2. Если создать задачу командой

INSERT INTO task (queue, max, request) VALUES ('queue', 3, 'SELECT now()')

то она выполнится как можно раньше всех остальных задач в этой очереди, т.е. это ещё что-то типа приоритета
Теги:
Хабы:
Всего голосов 15: ↑14 и ↓1+13
Комментарии14

Публикации

Истории

Ближайшие события