image

Я занимаюсь созданием веб-приложений на Django. В основном, это SaaS сервисы для бизнеса. Во всех этих приложениях есть необходимость в асинхронных задачах. Для их реализации использую Celery. В статье расскажу о ситуациях, в которых применяю Celery, с примерами кода.

Celery – это система для управления очередями задач. Принципиально умеет 2 вещи: брать задачи из очереди и выполнять задачи по расписанию. В качестве брокера очередей обычно используются RabbitMQ или Redis. В очереди кладутся задачи, а потом воркеры Celery берут их оттуда и выполняют.

Для Celery можно придумать применение почти в любом приложении, но далее я опишу только те кейсы, в которых использую его сам.

1. Задачи по расписанию


Часто есть задачи, которые нужно выполнить в определенную дату и время: отправить пользователю напоминание, закончить пробный период аккаунта, опубликовать пост в соцсетях.

В Celery есть возможность при вызове таска указать параметр ETA – время, в которое надо запустить таск. Но если так планировать задачи, то получается очень ненадежно: они могут не запуститься и их неудобно отменять.

Более надежный способ – это использовать celerybeat schedule. То есть создать расписание, где будут таски, которые запускаются с определенной периодичностью или в определенное время. Например, если необходимо опубликовать пост в соцсетях по расписанию, то таск для этого запускается раз в минуту. Если надо закончить пробный период у аккаунта, то можно запускать таск раз в сутки.

# schedule.py
from datetime import timedelta
from celery.schedules import crontab


CELERYBEAT_SCHEDULE = {
   'publish_post_starter': {
       'task': 'publish_post_starter',
       'schedule': timedelta(minutes=1),
   },
   'end_trial_starter': {
     'task': 'end_trial_starter',
     'schedule': crontab(hour=10, minute=21),
  },
}

В таске стартере получаем все инстансы, у которых запланированное время уже наступило. Проходимся по инстансам и для каждого вызываем основной таск. В качестве аргументов передаем только id инстанса, чтобы не засорять очередь ненужными данными. Можем сразу пройтись по всем инстансам и выполнить действия, но чаще всего лучше вызвать отдельный таск для каждого инстанса. Так мы ускорим выполнение, и, если произойдет ошибка, то она повлияет только на один из тасков.

# tasks.py
@app.task(name='publish_post')
def publish_post(post_id):
   ...

@app.task(name='publish_post_starter')
def publish_post_starter():
   post_ids = list(
       Post.objects.filter(
           publish_dt__lte=timezone.now(),
           is_published=False
       ).values_list('id', flat=True)
   )

   for post_id in post_ids:
       publish_post.delay(post_id)

2. Долгие вычисления и вызовы API из WSGI


Под WSGI подразумевается контекст, в котором обрабатываются запросы от пользователей (Request-Response Cycle). В противовес контексту асинхронных задач – Celery.

Для создания отзывчивого интерфейса все кнопки должны реагировать мгновенно и не должны блокировать остальной интерфейс. Для этого после нажатия кнопка блокируется, на нее ставится спиннер и отправляется ajax-запрос на сервер. Если обработка запроса занимает дольше пары секунд, то можно переместить вычисления в Celery-таск.

В WSGI вызываем таск и возвращаем ответ. На фронте разблокируем кнопку и убираем спиннер. Пользователю показываем сообщение, что действие запущено. Параллельно выполняется Celery-таск, который по завершению возвращает ответ по вебсокету. Получив результат на фронте, показываем его пользователю.

# rest_views.py
from rest_framework import status
from rest_framework.views import APIView
from rest_framework.response import Response

from tasks import send_emails


class SendEmailView(APIView):
   def post(self, request):
       # this id will be used to send response with websocket
       request_uuid = request.data.get('request_uuid')
       if not request_uuid:
           return Response(status=status.HTTP_400_BAD_REQUEST)
      
       send_emails.delay(request.user.id, request_uuid)
       return Response(status=status.HTTP_200_OK)

Отдельно можно выделить вызовы внешнего API из WSGI. В данном случае все вызовы, независимо от длительности их выполнения, запускаются через Celery-таск. Это защита от дурака. Не должно быть ситуации, когда из-за недоступности какого-то внешнего API подвисает интерфейс у пользователя.

3. Вызовы из Tornado


При интеграции с соцсетью, Telegram или платежным сервисом нужен webhook-урл, на который буду приходить оповещения. Количество запро��ов не всегда можно рассчитать заранее, но скорее всего их количество будет превышать запросы от пользователей. Эти запросы буду приходить до того момента, как получат ответ с кодом 200.

Для обработки таких запросов подходит асинхронный фреймворк Tornado. Чтобы не превращать обработку в синхронную в Tornado не должно быть блокирующих операций. Тут и нужен Celery. Tornado handler получает запрос, валидирует данные, вызывает Celery-таск и возвращает успешный ответ.

# tornado_handlers.py
from tornado import gen, escape
from tornado.web import RequestHandler

from tasks import handle_vk_callback


class VkCallbackHandler(RequestHandler):
   @gen.coroutine
   def post(self, *args, **kwargs):
       try:
           data = escape.json_decode(self.request.body)
       except ValueError:
           self.set_status(status_code=400, reason='Invalid data')
           return

       handle_vk_callback.delay(data)
       self.write('ok')
       return