
Я занимаюсь созданием веб-приложений на Django. В основном, это SaaS сервисы для бизнеса. Во всех этих приложениях есть необходимость в асинхронных задачах. Для их реализации использую Celery. В статье расскажу о ситуациях, в которых применяю Celery, с примерами кода.
Celery – это система для управления очередями задач. Принципиально умеет 2 вещи: брать задачи из очереди и выполнять задачи по расписанию. В качестве брокера очередей обычно используются RabbitMQ или Redis. В очереди кладутся задачи, а потом воркеры Celery берут их оттуда и выполняют.
Для Celery можно придумать применение почти в любом приложении, но далее я опишу только те кейсы, в которых использую его сам.
1. Задачи по расписанию
Часто есть задачи, которые нужно выполнить в определенную дату и время: отправить пользователю напоминание, закончить пробный период аккаунта, опубликовать пост в соцсетях.
В Celery есть возможность при вызове таска указать параметр ETA – время, в которое надо запустить таск. Но если так планировать задачи, то получается очень ненадежно: они могут не запуститься и их неудобно отменять.
Более надежный способ – это использовать celerybeat schedule. То есть создать расписание, где будут таски, которые запускаются с определенной периодичностью или в определенное время. Например, если необходимо опубликовать пост в соцсетях по расписанию, то таск для этого запускается раз в минуту. Если надо закончить пробный период у аккаунта, то можно запускать таск раз в сутки.
# schedule.py
from datetime import timedelta
from celery.schedules import crontab
CELERYBEAT_SCHEDULE = {
'publish_post_starter': {
'task': 'publish_post_starter',
'schedule': timedelta(minutes=1),
},
'end_trial_starter': {
'task': 'end_trial_starter',
'schedule': crontab(hour=10, minute=21),
},
}
В таске стартере получаем все инстансы, у которых запланированное время уже наступило. Проходимся по инстансам и для каждого вызываем основной таск. В качестве аргументов передаем только id инстанса, чтобы не засорять очередь ненужными данными. Можем сразу пройтись по всем инстансам и выполнить действия, но чаще всего лучше вызвать отдельный таск для каждого инстанса. Так мы ускорим выполнение, и, если произойдет ошибка, то она повлияет только на один из тасков.
# tasks.py
@app.task(name='publish_post')
def publish_post(post_id):
...
@app.task(name='publish_post_starter')
def publish_post_starter():
post_ids = list(
Post.objects.filter(
publish_dt__lte=timezone.now(),
is_published=False
).values_list('id', flat=True)
)
for post_id in post_ids:
publish_post.delay(post_id)
2. Долгие вычисления и вызовы API из WSGI
Под WSGI подразумевается контекст, в котором обрабатываются запросы от пользователей (Request-Response Cycle). В противовес контексту асинхронных задач – Celery.
Для создания отзывчивого интерфейса все кнопки должны реагировать мгновенно и не должны блокировать остальной интерфейс. Для этого после нажатия кнопка блокируется, на нее ставится спиннер и отправляется ajax-запрос на сервер. Если обработка запроса занимает дольше пары секунд, то можно переместить вычисления в Celery-таск.
В WSGI вызываем таск и возвращаем ответ. На фронте разблокируем кнопку и убираем спиннер. Пользователю показываем сообщение, что действие запущено. Параллельно выполняется Celery-таск, который по завершению возвращает ответ по вебсокету. Получив результат на фронте, показываем его пользователю.
# rest_views.py
from rest_framework import status
from rest_framework.views import APIView
from rest_framework.response import Response
from tasks import send_emails
class SendEmailView(APIView):
def post(self, request):
# this id will be used to send response with websocket
request_uuid = request.data.get('request_uuid')
if not request_uuid:
return Response(status=status.HTTP_400_BAD_REQUEST)
send_emails.delay(request.user.id, request_uuid)
return Response(status=status.HTTP_200_OK)
Отдельно можно выделить вызовы внешнего API из WSGI. В данном случае все вызовы, независимо от длительности их выполнения, запускаются через Celery-таск. Это защита от дурака. Не должно быть ситуации, когда из-за недоступности какого-то внешнего API подвисает интерфейс у пользователя.
3. Вызовы из Tornado
При интеграции с соцсетью, Telegram или платежным сервисом нужен webhook-урл, на который буду приходить оповещения. Количество запро��ов не всегда можно рассчитать заранее, но скорее всего их количество будет превышать запросы от пользователей. Эти запросы буду приходить до того момента, как получат ответ с кодом 200.
Для обработки таких запросов подходит асинхронный фреймворк Tornado. Чтобы не превращать обработку в синхронную в Tornado не должно быть блокирующих операций. Тут и нужен Celery. Tornado handler получает запрос, валидирует данные, вызывает Celery-таск и возвращает успешный ответ.
# tornado_handlers.py
from tornado import gen, escape
from tornado.web import RequestHandler
from tasks import handle_vk_callback
class VkCallbackHandler(RequestHandler):
@gen.coroutine
def post(self, *args, **kwargs):
try:
data = escape.json_decode(self.request.body)
except ValueError:
self.set_status(status_code=400, reason='Invalid data')
return
handle_vk_callback.delay(data)
self.write('ok')
return
