Лучший способ что-то понять - попробовать на практике, а лучшая практика - это реальные примеры. В этой статье мы разберем возможности Celery и применение основных параметров для управления задачами. Для начала пару слов про сам Celery.
Зачем же нам Celery?
Celery - это самый популярный инструмент для асинхронной обработки задач. Он позволяет выполнять задачи в фоновом режиме и гибко настраивать параметры для каждой задачи. Кроме того, Celery предоставляет возможности для планирования, работы с разными очередями и мониторинга выполнения задач.
А теперь рассмотрим шесть базовых сценариев использования.
Сценарий 1: Выполнение задачи в фоне
Начнем с самого простого - обычное выполнение задачи в фоновом режиме. Представим, что нам нужно сгенерировать тяжелый отчет по запросу и синхронное выполнение здесь не подходит (слишком долго). Поэтому мы отправляем запрос на сервер для его генерации, после чего задача встает в очередь. Когда очередь дойдет до нашей задачи - начнется генерация отчета. Теперь перейдем к реализации.
Создание асинхронной задачи начинается с определения функции, которая будет выполняться асинхронно. Для этого используем декоратор @task
import time
from celery import Celery
app = Celery('myapp', broker='pyamqp://guest@localhost//')
@app.task
def generate_report_task(arg1, arg2):
print("Start generating report")
time.sleep(10)
print("Report generated")
Осталось только запустить. Есть три способа это сделать: apply_async
, delay
и обычный вызов call
.
apply_async
- это метод, который предоставляет максимальную гибкость при запуске задачи и принимает большое количество аргументов.
generate_report_task.apply_async(args=[arg1_value], kwargs={'key': 'value'})
delay
- в отличие от apply_async
имеет ограниченный список принимаемых аргументов. Такой способ запуска мы рассматриваем, когда нужно просто запустить задачу без необходимости передавать именованные аргументы и другие параметры.
generate_report_task.delay(arg1_value, arg2_value)
Этот метод часто используется, когда задача принимает всего несколько аргументов и нам нужно просто её запустить.
Последний способ - это обычный вызов функции. В таком случае задача будет выполнена сразу же, а не назначена в очередь.
generate_report_task(arg1_value, arg2_value)
Сценарий 2: Выполнить задачу через час
Следующая задача - пользователь создал статью и хочет опубликовать её через один час. Настало время узнать об аргументах, которые принимает apply_async.
Вариантов здесь - два. Самый простой - аргумент countdown
- в переводе "обратный отсчёт". Он позволяет задать время в секундах, через которое задача станет доступна для выполнения. Как раз то, что нам нужно!
from datetime import datetime
@app.task
def publish_article(arg1, arg2):
print(f"Publish time: {datetime.now()}")
publish_article_after = 60 * 60 # 60 минут
result = publish_article.apply_async(args=[article_id], countdown=publish_article_after)
Важно для Redis Backend
Данный способ не подойдет, если вы используете Redis в качестве брокера. Дело в том, что Redis помещает отложенные задачи в очередьunacked
, из которой по истечение времени, указанного в аргументе VISIBILITY_TIMEOUT
, задача будет назначена еще одному обработчику. Например, countdown у нас равен 120 минутам, а VISIBILITY_TIMEOUT
по умолчанию 60. В таком случае есть риск, что задача будет назначена сразу трём обработчикам (первому сразу, второму через 60 минут, третьему - если задача через 120 минут будет еще в очереди). В результате, мы получим выполнение одной и той же задачи несколько раз. Подробнее в документации тут и тут.
Важно для RabbitMQ Backend
Параметрconsumer_timeout
по умолчанию равен 30 минутам. Не желательно устанавливать countdown больше этого времени, иначе будет возбуждено исключение PRECONDITION_FAILED
. Если есть такая необходимость, необходимо увеличить время в rabbitmq.conf. Подробнее - тут.
Сценарий 3: Выполнить задачу завтра в полдень
Теперь наш пользователь хочет выложить статью завтра в полдень. Эта ситуация очень похожа на предыдущую и мы могли бы использовать countdown
. Но он лучше подходит для небольших промежутков времени - через минуту или пол часа. А для назначения задачи на конкретное время намного удобнее использовать аргумент eta
. Он расшифровывается как Estimated Time of Arrival, что в переводе "Ожидаемое время прибытия".
Здесь есть две важных детали:
при использовании Redis отложенные с помощью
eta
задачи столкнутся с той же проблемой, что иcountdown
из-заVISIBILITY_TIMEOUT
.eta
- это не точное время, в которое будет выполнена задача. Указывая время, мы говорим Celery - "задача должна быть выполнена не раньше этого времени". Как только это время наступит - задача будет выполнена в порядке очереди и будет зависеть от количества задач в очереди.
Вот пример:
from datetime import datetime
# Получим время для примера. В нормальной ситуации -
# нам придет аргумент с временем публикации
now = datetime.now()
tomorrow = now + timedelta(days=1)
publish_article_datetime = datetime(tomorrow.year, tomorrow.month, tomorrow.day, 12, 0, 0)
result = publish_article.apply_async(args=["some_value"], eta=publish_article_datetime)
Сценарий 4: Выставить максимальное время выполнения задачи
Разберем на примере задачи по генерации отчёта. Мы знаем, что она не должна занимать больше часа (для примера). Если такое вдруг случилось - скорее всего что-то не так. Необходимо завершить задачу по таймауту, а потом - разобраться в причинах.
Для этого мы будем использовать аргументы soft_time_limit
и time_limit
. После наступления soft_time_limit
в задаче будет возбуждено исключение SoftTimeLimitExceeded
. Если задача не завершилась и наступает time_limit
, выполнение задачи будет приостановлено.
Первый вариант - передать аргументы в apply_async
@app.task()
def generate_report():
try:
time.sleep(60 * 2)
except SoftTimeLimitExceeded:
print("Soft time limit exception")
time.sleep(60 * 3)
soft_time_limit = 60 * 1
hard_time_limit = 60 * 2
result = my_task.apply_async(args=[some_value], soft_time_limit=soft_time_limit, time_limit=hard_time_limit )
Второй вариант - сразу указать ограничения в аргументах декоратора.
@app.task(time_limit=60 * 60, soft_time_limit=59 * 60) # 60/59 min
def generate_report():
try:
time.sleep(60 * 2)
except SoftTimeLimitExceeded:
print("Soft time limit exception")
time.sleep(60 * 3)
result = my_task.apply_async(args=[some_value])
В результате, после запуска задачи через одну минуту мы увидим в консоли "Soft time limit exception", а еще через минуту задача будет принудительно завершена.
Сценарий 5: Отмена выполнения задачи по истечение времени
Возможна ситуация, когда определенная задача теряет свою актуальность, если не выполнена в течение какого то времени. Снова рассмотрим на примере генерации отчёта. Пользователь отправил запрос на генерацию отчёта. Задача попала в очередь и за час ни один обработчик не смог её обработать. В таком случае нам нужно отменить "просроченную" задачу. Для этого применяется аргументexpires
. Он принимает либо число в секундах, либо объект datetime.
Например:
# Генерация будет отменена через час
generate_report.apply_async((10, 10), expires=3600)
# Генерация будет отменена, если через день задача не начнет выполняться
from datetime import datetime, timedelta, timezone
generate_report.apply_async((10, 10), kwargs,
expires=datetime.now(timezone.utc) + timedelta(days=1))
Сценарий 6: Повторное выполнение задачи при возникновении ошибки
По какой-то причине в задаче может возникнуть ошибка. В таком случае нам может понадобится механизм повторного выполнения (retry).
Для начала разберем аргументы, которые помогут нам с детальной настройкой. Декоратор @app.task
принимает:
default_retry_delay
[int]: время до следующей попытки в секундах.max_retries
[int]: максимальное количество попыток.autoretry_for
[list | tuple]: Принимает список или кортеж с исключениями. Автоматический повтор при возникновении ошибки из переданного списка.retry_backoff
[bool|int]:при включении, задержка будет расти экспотенциально. Первая повторная попытка будет иметь задержку 1 секунду, вторая повторная попытка будет иметь задержку 2 секунды, третья будет иметь задержку в 4 секунды, четвертая будет иметь задержку в 8 секунд,retry_backoff_max
[int]: устанавливает максимальную задержку в секундах. Рекомендуется использовать всегда при использованииretry_backoff
, чтобы избежать слишком больших задержек.retry_jitter
[bool]: задает случайную задержку. Принцип расчетаnew_num_of_seconds = random.randrange(retry_backoff + 1)
. Соответственно время задержки будет случайным числом от 0 доretry_backoff
Теперь перейдем к коду. Есть два основных способа задействовать механизм retry.
Первый - использовать метод .retry()
. Мы можем вызывать его по какому-либо условию.
@app.task(default_retry_delay=30, max_retries=3) # 60/59 min
def generate_report():
some_condition = some_logic()
if some_condition:
generate_report.retry()
result = my_task.apply_async(args=[some_value])
Второй способ - это передать список ошибок при которых нужно выполнить задачу повторно.
@celery_app.task(autoretry_for=(GenerateReportError, SaveReportError, ), default_retry_delay=30, max_retries=5)
def generate_report():
...
Заключение
Отлично! Мы с вами рассмотрели шесть основных сценариев. Надеюсь, что это послужит крепкой основой для ваших будущих задач и дальнейшего погружения в Celery.
Стоит отметить, что в этой статье мы не касались вопросов, связанных с периодическими задачами и использованием celery beat
. Это широкая тема, которая заслуживает отдельной статьи, и я надеюсь подробно рассмотреть ее в будущем материале.
P.S Я веду свой авторский канал о разработке в Telegram. Публикую выжимки с конференций, делюсь рабочей рутиной и полезными идеями/мыслями.