Pull to refresh

Celery: лучшие практики

Reading time5 min
Views148K
Original author: Deni Bertovic
Если вы работаете с Django, то на некотором этапе разработке вам может понадобиться фоновая обработка долго выполняющихся задач. Возможно, что для такого рода задач вы используете какой-либо инструмент для управления очередями задач. Celery — один из самых популярных проектов для решения подобных задач в мире python и Django на данный момент, но есть и другие проекты для этой цели.

Пока я работал над некоторыми проектами, использующими Celery для управления очередями задач, выявились некоторые лучшие практики, которые я решил задокументировать. Впрочем это громкие слова для того, что я думаю о правильном подходе к решению подобных задач, а также о некоторых недостаточно используемых возможностях, которые предлагает сообщество проекта Celery.

No.1: Не используйте СУБД как ваш AMQP брокер


Позвольте мне объяснить почему я считаю это неправильным (помимо тех ограничений что описаны в документации Celery).

СУБД не разрабатывались для тех задач, которые выполняют полноценный AMQP брокер такой как RabbitMQ. Она упадет в «боевых» условиях даже на проекте с не очень большим трафиком\пользовательской базой.

Я предполагаю, что самой популярной причиной того почему люди решают использовать СУБД в том что, как правило, у них уже есть одна СУБД для веб-приложения, так почему бы не воспользоваться ей еще раз. Начать работать с таким вариантом несложно и не надо беспокоиться о других компонентах (таких как RabbitMQ).

Предположим не такой уж гипотетический сценарий: у вас есть 4 фоновых воркера для обработки, которые вы помещаете в базу данных. Это значит что вы получаете 4 процесса, которые достаточно часто запрашивают базу о новых задачах, не говоря уже о том, что каждый из них может иметь собственные конкурирующие потоки. В некоторый момент времени вы понимаете, что растет задержка при обработке задач, а потому приходит больше новых задач чем завершается, необходимо увеличивать количество воркеров. Вдруг скорость вашей базы данных начинает «проседать» из-за огромного количества запросов воркеров к базе, дисковый ввод\вывод превышает заданные лимиты, а все это начинает влиять на ваше приложение, так как воркеры, фактически, устроили DDOS-атаку вашей базе.

Этого не произошло бы при использовании полноценного AMQP брокера, так как очередь размещается в памяти и таким образом устраняется высокая нагрузка на жесткий диск. Потребителям (воркерам) нет необходимости часто запрашивать информацию, так как очередь имеет механизм доставки новой задачи воркеру, и даже, если AMQP брокер будет перегружен по каким-либо иным причинам это не приведет к падению и тормозам того веб-приложения, которое взаимодействует с пользователем.

Я пойду еще дальше и скажу, что вы не должны использовать СУБД как брокера даже в процессе разработки, тогда когда есть такие вещи как Docker и множество преднастроенных образов, которые предоставляют настроенный RabbitMQ «из коробки».

No.2: Используйте больше очередей (т.е. не только одну, которая дается по умолчанию)


Celery очень легко начать использовать, и она предоставляет сразу же одну очередь по умолчанию, в которую и помещаются все задачи пока не будет явно предписано другое поведение Celery. Наиболее общий пример того, что вы можете увидеть:

@app.task()
def my_taskA(a, b, c):
    print("doing something here...")

@app.task()
def my_taskB(x, y):
    print("doing something here...")


Что происходит, если обе задачи будут размещены в одной очереди, если иное не определено в файле celeryconfig.py. Я полностью пониманию чем может оправдывать подобный подход, у вас есть один декоратор, который создает удобные фоновые задачи. Здесь я хотел бы обратить внимание, что taskA и taskB, находясь в одной очереди могут делать совершенно разные вещи и таким образом одна из них может быть куда важнее другой, так почему они находятся все в одной корзине? Даже, если у вас один воркер, то представьте такую ситуацию что менее важная задача taskB окажется настолько массовой, что более важной задаче taskA воркер не сможет уделить необходимого внимания.Это приводит нас к к следующему пункту.

No.3: Используйте приоритеты воркеров


Путем решения проблемы, указанной выше является размещение задачи taskA в одной очереди, а taskB в другой и после этого присвоить x воркеров обработке очередь Q1, а остальных на обработку Q2, так как в нее приходит больше задач. Таким образом вы можете быть уверены, что задача taskB получит достаточно воркеров, а остальные тем временем будут обрабатывать менее приоритетную задачу, когда она придет, не провоцируя длительного ожидания и обработки. Потому, определите ваши очереди сами:

CELERY_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('for_task_A', Exchange('for_task_A'), routing_key='for_task_A'),
    Queue('for_task_B', Exchange('for_task_B'), routing_key='for_task_B'),
)

И ваши роутеры, которые определять куда направлять задачу:
CELERY_ROUTES = {
    'my_taskA': {'queue': 'for_task_A', 'routing_key': 'for_task_A'},
    'my_taskB': {'queue': 'for_task_B', 'routing_key': 'for_task_B'},
}


Это позволит выполнять воркеры для каждой задачи:
celery worker -E -l INFO -n workerA -Q for_task_A
celery worker -E -l INFO -n workerB -Q for_task_B

No.4: используйте механизмы Celery для обработки ошибок


Большинство задач, которые я видел не имеют механизмов обработки ошибок. Если в задаче произошла ошибка, то она просто падает. Это может быть удобно для некоторых задач, однако большинство задач, которые я видел взаимодействовали с внешними API и падали из-за некоторых видов сетевых ошибок или иных проблем «доступности ресурса». Самый простой подход к обработке таких ошибок перевыполнить код задачи, так как, возможно, проблемы взаимодействия с внешним API были уже устранены.

@app.task(bind=True, default_retry_delay=300, max_retries=5)
def my_task_A():
    try:
        print("doing stuff here...")
    except SomeNetworkException as e:
        print("maybe do some clenup here....")
        self.retry(e)

Я люблю определять по умолчанию для задачи время ожидания, которое она будет ждать прежде чем попытается выполниться снова и как много попыток перевыполнения она предпримет прежде чем окончательно выбросить ошибку(параметры default_retry_delay и max_retries соответственно). Это наиболее простая форма обработки ошибок, которую я могу представить, но я видел, что и она практически не применяется. Разумеется Celery имеет и более сложные методы обработки ошибок, они описаны в документации Celery.

No.5: используйте Flower


Flower — прекрасный инструмент для отслеживания состояния ваших задач и воркеров Celery. У инструмента есть веб-интерфейс и он позволяет такие вещи как:
  • прогресс задач
  • детали выполнения
  • статус воркеров
  • запускать новые воркеры

Полный список возможностей вы можете увидеть по приведенной ссылке.

No.6: Отслеживайте статус задачи, только если вам это необходимо


Статус задачи это информация о том успешно или нет завершилась задача. Она может быть полезна для некоторых статистических показателей. Важная вещь, которую следует понимать в данном случае: статус задачи это не результирующие данные и той работы, которая она выполняла, такая информация наиболее похожа на неявные изменения, записываемые в базу данных (такие, например, как изменения списка друзей пользователя).

В большинстве проектов, которые я видел реально не заботились о данных по статусу задачи после ее завершения, используя базу данных sqlite, которую предлагается по умолчанию или лучше того тратили время на использование больших СУБД типа PostgreSQL. Зачем просто так нагружать базу данных своего приложения? Используйте CELERY_IGNORE_RESULT = True в вашем файле настроек celeryconfig.py и отбрасывайте такие данные.

No.7: не передавайте объекты базы данных\ORM в задачу


После обсуждения вышеизложенного на встречах локальных групп python разработчиков некоторые люди предложили включить дополнительный пункт в представленный список. О чем он? Вы не должны передавать объекты базы данных, например, модель пользователя в фоновую задачу, так как в сериализованном объекте могут оказаться уже устаревшие и некорректные данные. Если вам необходимо, то передавайте в задачу ID пользователя, а в самой задаче запрашивайте базу об этом пользователе.
Tags:
Hubs:
+23
Comments29

Articles