Как стать автором
Обновить

3 кейса для использования Celery в Django-приложении

Время на прочтение4 мин
Количество просмотров32K
image

Я занимаюсь созданием веб-приложений на Django. В основном, это SaaS сервисы для бизнеса. Во всех этих приложениях есть необходимость в асинхронных задачах. Для их реализации использую Celery. В статье расскажу о ситуациях, в которых применяю Celery, с примерами кода.

Celery – это система для управления очередями задач. Принципиально умеет 2 вещи: брать задачи из очереди и выполнять задачи по расписанию. В качестве брокера очередей обычно используются RabbitMQ или Redis. В очереди кладутся задачи, а потом воркеры Celery берут их оттуда и выполняют.

Для Celery можно придумать применение почти в любом приложении, но далее я опишу только те кейсы, в которых использую его сам.

1. Задачи по расписанию


Часто есть задачи, которые нужно выполнить в определенную дату и время: отправить пользователю напоминание, закончить пробный период аккаунта, опубликовать пост в соцсетях.

В Celery есть возможность при вызове таска указать параметр ETA – время, в которое надо запустить таск. Но если так планировать задачи, то получается очень ненадежно: они могут не запуститься и их неудобно отменять.

Более надежный способ – это использовать celerybeat schedule. То есть создать расписание, где будут таски, которые запускаются с определенной периодичностью или в определенное время. Например, если необходимо опубликовать пост в соцсетях по расписанию, то таск для этого запускается раз в минуту. Если надо закончить пробный период у аккаунта, то можно запускать таск раз в сутки.

# schedule.py
from datetime import timedelta
from celery.schedules import crontab


CELERYBEAT_SCHEDULE = {
   'publish_post_starter': {
       'task': 'publish_post_starter',
       'schedule': timedelta(minutes=1),
   },
   'end_trial_starter': {
     'task': 'end_trial_starter',
     'schedule': crontab(hour=10, minute=21),
  },
}

В таске стартере получаем все инстансы, у которых запланированное время уже наступило. Проходимся по инстансам и для каждого вызываем основной таск. В качестве аргументов передаем только id инстанса, чтобы не засорять очередь ненужными данными. Можем сразу пройтись по всем инстансам и выполнить действия, но чаще всего лучше вызвать отдельный таск для каждого инстанса. Так мы ускорим выполнение, и, если произойдет ошибка, то она повлияет только на один из тасков.

# tasks.py
@app.task(name='publish_post')
def publish_post(post_id):
   ...

@app.task(name='publish_post_starter')
def publish_post_starter():
   post_ids = list(
       Post.objects.filter(
           publish_dt__lte=timezone.now(),
           is_published=False
       ).values_list('id', flat=True)
   )

   for post_id in post_ids:
       publish_post.delay(post_id)

2. Долгие вычисления и вызовы API из WSGI


Под WSGI подразумевается контекст, в котором обрабатываются запросы от пользователей (Request-Response Cycle). В противовес контексту асинхронных задач – Celery.

Для создания отзывчивого интерфейса все кнопки должны реагировать мгновенно и не должны блокировать остальной интерфейс. Для этого после нажатия кнопка блокируется, на нее ставится спиннер и отправляется ajax-запрос на сервер. Если обработка запроса занимает дольше пары секунд, то можно переместить вычисления в Celery-таск.

В WSGI вызываем таск и возвращаем ответ. На фронте разблокируем кнопку и убираем спиннер. Пользователю показываем сообщение, что действие запущено. Параллельно выполняется Celery-таск, который по завершению возвращает ответ по вебсокету. Получив результат на фронте, показываем его пользователю.

# rest_views.py
from rest_framework import status
from rest_framework.views import APIView
from rest_framework.response import Response

from tasks import send_emails


class SendEmailView(APIView):
   def post(self, request):
       # this id will be used to send response with websocket
       request_uuid = request.data.get('request_uuid')
       if not request_uuid:
           return Response(status=status.HTTP_400_BAD_REQUEST)
      
       send_emails.delay(request.user.id, request_uuid)
       return Response(status=status.HTTP_200_OK)

Отдельно можно выделить вызовы внешнего API из WSGI. В данном случае все вызовы, независимо от длительности их выполнения, запускаются через Celery-таск. Это защита от дурака. Не должно быть ситуации, когда из-за недоступности какого-то внешнего API подвисает интерфейс у пользователя.

3. Вызовы из Tornado


При интеграции с соцсетью, Telegram или платежным сервисом нужен webhook-урл, на который буду приходить оповещения. Количество запросов не всегда можно рассчитать заранее, но скорее всего их количество будет превышать запросы от пользователей. Эти запросы буду приходить до того момента, как получат ответ с кодом 200.

Для обработки таких запросов подходит асинхронный фреймворк Tornado. Чтобы не превращать обработку в синхронную в Tornado не должно быть блокирующих операций. Тут и нужен Celery. Tornado handler получает запрос, валидирует данные, вызывает Celery-таск и возвращает успешный ответ.

# tornado_handlers.py
from tornado import gen, escape
from tornado.web import RequestHandler

from tasks import handle_vk_callback


class VkCallbackHandler(RequestHandler):
   @gen.coroutine
   def post(self, *args, **kwargs):
       try:
           data = escape.json_decode(self.request.body)
       except ValueError:
           self.set_status(status_code=400, reason='Invalid data')
           return

       handle_vk_callback.delay(data)
       self.write('ok')
       return
Теги:
Хабы:
Всего голосов 16: ↑14 и ↓2+12
Комментарии12

Публикации

Истории

Работа

Python разработчик
136 вакансий

Ближайшие события