Я занимаюсь созданием веб-приложений на Django. В основном, это SaaS сервисы для бизнеса. Во всех этих приложениях есть необходимость в асинхронных задачах. Для их реализации использую Celery. В статье расскажу о ситуациях, в которых применяю Celery, с примерами кода.
Celery – это система для управления очередями задач. Принципиально умеет 2 вещи: брать задачи из очереди и выполнять задачи по расписанию. В качестве брокера очередей обычно используются RabbitMQ или Redis. В очереди кладутся задачи, а потом воркеры Celery берут их оттуда и выполняют.
Для Celery можно придумать применение почти в любом приложении, но далее я опишу только те кейсы, в которых использую его сам.
1. Задачи по расписанию
Часто есть задачи, которые нужно выполнить в определенную дату и время: отправить пользователю напоминание, закончить пробный период аккаунта, опубликовать пост в соцсетях.
В Celery есть возможность при вызове таска указать параметр ETA – время, в которое надо запустить таск. Но если так планировать задачи, то получается очень ненадежно: они могут не запуститься и их неудобно отменять.
Более надежный способ – это использовать celerybeat schedule. То есть создать расписание, где будут таски, которые запускаются с определенной периодичностью или в определенное время. Например, если необходимо опубликовать пост в соцсетях по расписанию, то таск для этого запускается раз в минуту. Если надо закончить пробный период у аккаунта, то можно запускать таск раз в сутки.
# schedule.py
from datetime import timedelta
from celery.schedules import crontab
CELERYBEAT_SCHEDULE = {
'publish_post_starter': {
'task': 'publish_post_starter',
'schedule': timedelta(minutes=1),
},
'end_trial_starter': {
'task': 'end_trial_starter',
'schedule': crontab(hour=10, minute=21),
},
}
В таске стартере получаем все инстансы, у которых запланированное время уже наступило. Проходимся по инстансам и для каждого вызываем основной таск. В качестве аргументов передаем только id инстанса, чтобы не засорять очередь ненужными данными. Можем сразу пройтись по всем инстансам и выполнить действия, но чаще всего лучше вызвать отдельный таск для каждого инстанса. Так мы ускорим выполнение, и, если произойдет ошибка, то она повлияет только на один из тасков.
# tasks.py
@app.task(name='publish_post')
def publish_post(post_id):
...
@app.task(name='publish_post_starter')
def publish_post_starter():
post_ids = list(
Post.objects.filter(
publish_dt__lte=timezone.now(),
is_published=False
).values_list('id', flat=True)
)
for post_id in post_ids:
publish_post.delay(post_id)
2. Долгие вычисления и вызовы API из WSGI
Под WSGI подразумевается контекст, в котором обрабатываются запросы от пользователей (Request-Response Cycle). В противовес контексту асинхронных задач – Celery.
Для создания отзывчивого интерфейса все кнопки должны реагировать мгновенно и не должны блокировать остальной интерфейс. Для этого после нажатия кнопка блокируется, на нее ставится спиннер и отправляется ajax-запрос на сервер. Если обработка запроса занимает дольше пары секунд, то можно переместить вычисления в Celery-таск.
В WSGI вызываем таск и возвращаем ответ. На фронте разблокируем кнопку и убираем спиннер. Пользователю показываем сообщение, что действие запущено. Параллельно выполняется Celery-таск, который по завершению возвращает ответ по вебсокету. Получив результат на фронте, показываем его пользователю.
# rest_views.py
from rest_framework import status
from rest_framework.views import APIView
from rest_framework.response import Response
from tasks import send_emails
class SendEmailView(APIView):
def post(self, request):
# this id will be used to send response with websocket
request_uuid = request.data.get('request_uuid')
if not request_uuid:
return Response(status=status.HTTP_400_BAD_REQUEST)
send_emails.delay(request.user.id, request_uuid)
return Response(status=status.HTTP_200_OK)
Отдельно можно выделить вызовы внешнего API из WSGI. В данном случае все вызовы, независимо от длительности их выполнения, запускаются через Celery-таск. Это защита от дурака. Не должно быть ситуации, когда из-за недоступности какого-то внешнего API подвисает интерфейс у пользователя.
3. Вызовы из Tornado
При интеграции с соцсетью, Telegram или платежным сервисом нужен webhook-урл, на который буду приходить оповещения. Количество запросов не всегда можно рассчитать заранее, но скорее всего их количество будет превышать запросы от пользователей. Эти запросы буду приходить до того момента, как получат ответ с кодом 200.
Для обработки таких запросов подходит асинхронный фреймворк Tornado. Чтобы не превращать обработку в синхронную в Tornado не должно быть блокирующих операций. Тут и нужен Celery. Tornado handler получает запрос, валидирует данные, вызывает Celery-таск и возвращает успешный ответ.
# tornado_handlers.py
from tornado import gen, escape
from tornado.web import RequestHandler
from tasks import handle_vk_callback
class VkCallbackHandler(RequestHandler):
@gen.coroutine
def post(self, *args, **kwargs):
try:
data = escape.json_decode(self.request.body)
except ValueError:
self.set_status(status_code=400, reason='Invalid data')
return
handle_vk_callback.delay(data)
self.write('ok')
return