Асинхронные задания в Django с Celery

Автор оригинала: Michael Herman
  • Перевод
Перевод статьи подготовлен в преддверии старта курса «Web-разработчик на Python».




Если в вашем приложении есть какой-то длительный процесс, вы можете обрабатывать его не в стандартном потоке запросов/ответов, а в фоновом режиме.

К примеру, в вашем приложении пользователь должен отправить картинку-миниатюру (которую, скорее всего, нужно будет отредактировать) и подтвердить адрес электронной почты. Если ваше приложение обрабатывает изображение, а потом отправляет письмо для подтверждения в обработчике запросов, то конечному пользователю придется зачем-то ждать завершения выполнения обеих задач перед тем, как перезагрузить или закрыть страницу. Вместо этого, вы можете передать эти операции в очередь задач и оставить на обработку отдельному процессу, чтобы немедленно отправить пользователю ответ. В таком случае, конечный пользователь сможет заниматься другими делами на стороне клиента во время выполнения обработки в фоновом режиме. Ваше приложение в таком случае также сможет свободно отвечать на запросы других пользователей и клиентов.

Сегодня мы поговорим о процессе настройки и конфигурирования Celery и Redis для обработки длительных процессов в приложении на Django, чтобы решать такие задачи. Также мы воспользуемся Docker и Docker Compose, чтобы связать все части вместе, и рассмотрим, как тестировать задания Celery с помощью модульных и интеграционных тестов.

К концу этого руководства мы научимся:

  • Интегрировать Celery в Django, чтобы создавать фоновые задания.
  • Упаковывать Django, Celery и Redis с помощью Docker.
  • Запускать процессы в фоновом режиме с помощью отдельного рабочего процесса.
  • Сохранять логи Celery в файл.
  • Настраивать Flower для мониторинга и администрирования заданий и воркеров (worker) Celery.
  • Тестировать задания Celery с помощью модульных и интеграционных тестов.

Фоновые задачи


Для улучшения пользовательского опыта, продолжительные процессы должны выполняться в фоновом режиме вне обычного потока HTTP-запросов/ответов.

Например:

  • Отправка писем для подтверждения;
  • Веб-скейпинг и краулинг;
  • Анализ данных;
  • Обработка изображений;
  • Генерация отчетов.

При создании приложения, старайтесь отделять задачи, которые должны выполняться в течение жизненного цикла запроса/ответа, например CRUD-операции, от задач, которые должны выполняться в фоновом режиме.

Рабочий процесс


Наша цель – разработать приложение на Django, которое для обработки продолжительных процессов вне цикла запрос/ответ использует Celery.

  1. Конечный пользователь генерирует новое задание, отправляя POST-запрос на сервер.
  2. В этом представлении задание добавляется в очередь, а id задания отправляется обратно на клиент.
  3. С помощью AJAX клиент продолжает опрашивать сервер, чтобы проверить состояние задания, в том время как само задание выполняется в фоновом режиме.



Создание проекта


Клонируйте проект из репозитория django-celery и выполните checkout по тегу v1 в ветке master:

$ git clone https://github.com/testdrivenio/django-celery --branch v1 --single-branch
$ cd django-celery
$ git checkout v1 -b master

Поскольку в общей сложности нам нужно работать с тремя процессами (Django, Redis, воркер), мы используем Docker для упрощения работы, соединив их так, чтобы мы могли все запустить одной командой в одном окне терминала.

Из корня проекта создайте образы и запустите Docker-контейнеры:

$ docker-compose up -d --build

Когда сборка завершится, перейдите на localhost:1337:



Убедитесь в том, что тесты проходят успешно:

$ docker-compose exec web python -m pytest

======================================== test session starts ========================================
platform linux -- Python 3.8.2, pytest-5.4.1, py-1.8.1, pluggy-0.13.1
django: settings: core.settings (from ini)
rootdir: /usr/src/app, inifile: pytest.ini
plugins: django-3.8.0
collected 1 item

tests/test_tasks.py .                                                                         [100%]

========================================= 1 passed in 0.47s =========================================

Давайте взглянем на структуру проекта перед тем, как двигаться дальше:

├── .gitignore
├── LICENSE
├── README.md
├── docker-compose.yml
└── project
    ├── Dockerfile
    ├── core
    │   ├── __init__.py
    │   ├── asgi.py
    │   ├── settings.py
    │   ├── urls.py
    │   └── wsgi.py
    ├── entrypoint.sh
    ├── manage.py
    ├── pytest.ini
    ├── requirements.txt
    ├── static
    │   ├── bulma.min.css
    │   ├── jquery-3.4.1.min.js
    │   ├── main.css
    │   └── main.js
    ├── tasks
    │   ├── __init__.py
    │   ├── apps.py
    │   ├── migrations
    │   │   └── __init__.py
    │   ├── templates
    │   │   └── home.html
    │   └── views.py
    └── tests
        ├── __init__.py
        └── test_tasks.py

Запуск задания


Обработчик событий в project/static/main.js подписан на нажатие на кнопку. По клику на сервер отправляет AJAX POST-запрос с соответствующим типом задания: 1, 2 или 3.

$('.button').on('click', function() {
  $.ajax({
    url: '/tasks/',
    data: { type: $(this).data('type') },
    method: 'POST',
  })
  .done((res) => {
    getStatus(res.task_id);
  })
  .fail((err) => {
    console.log(err);
  });
});

На стороне сервера уже настроено представление для обработки запроса в project/tasks/views.py:

def run_task(request):
    if request.POST:
        task_type = request.POST.get("type")
        return JsonResponse({"task_type": task_type}, status=202)

А теперь начинается самое интересное: привязываем Celery!

Настройка Celery


Начнем с того, что добавим Celery и Redis в файл project/requirements.txt:

celery==4.4.1
Django==3.0.4
redis==3.4.1

pytest==5.4.1
pytest-django==3.8.0

Celery использует брокер сообщенийRabbitMQ, Redis или AWS Simple Queue Service (SQS) – чтобы упростить коммуникацию между воркером Celery и веб-приложением. Сообщения направляются к брокеру, а после обрабатываются воркером. После этого результаты отправляются на бэкенд.

Redis будет одновременно и брокером и бэкендом. Добавьте Redis и воркера Celery в файл docker-compose.yml следующим образом:

version: '3.7'

services:
  web:
    build: ./project
    command: python manage.py runserver 0.0.0.0:8000
    volumes:
      - ./project:/usr/src/app/
    ports:
      - 1337:8000
    environment:
      - DEBUG=1
      - SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
      - DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
      - CELERY_BROKER=redis://redis:6379/0
      - CELERY_BACKEND=redis://redis:6379/0
    depends_on:
      - redis

  celery:
    build: ./project
    command: celery worker --app=core --loglevel=info
    volumes:
      - ./project:/usr/src/app
    environment:
      - DEBUG=1
      - SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
      - DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
      - CELERY_BROKER=redis://redis:6379/0
      - CELERY_BACKEND=redis://redis:6379/0
    depends_on:
      - web
      - redis

  redis:
    image: redis:5-alpine

Обратите внимание на celery worker --app=core --loglevel=info:

  1. celery worker используется для запуска воркера Celery;
  2. --app=core используется для запуска core приложения Celery (которое мы коротко определим);
  3. --loglevel=info определяет уровень логирования информации.

В модуль настроек проекта добавьте следующее, чтобы Celery использовала Redis в качестве брокера и бэкенда:

CELERY_BROKER_URL = os.environ.get("CELERY_BROKER", "redis://redis:6379/0")
CELERY_RESULT_BACKEND = os.environ.get("CELERY_BROKER", "redis://redis:6379/0")

Затем создайте файл sample_tasks.py в project/tasks:

# project/tasks/sample_tasks.py

import time

from celery import shared_task

@shared_task
def create_task(task_type):
time.sleep(int(task_type) * 10)
return True


Здесь, с помощью декоратора shared_task мы определили новую функцию-задание Celery, которая называется create_task.

Помните о том, что само задание не будет выполняться из процесса Django, оно будет выполнено воркером Celery.

А теперь добавьте файл celery.py в "project/core":

import os

from celery import Celery


os.environ.setdefault("DJANGO_SETTINGS_MODULE", "core.settings")
app = Celery("core")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()

Что тут происходит?

  1. Для начала нужно установить значение по умолчанию для среды DJANGO_SETTINGS_MODULE, чтобы Celery знала, как найти проект Django.
  2. Затем мы создали экземпляр Celery с именем core и поместили в переменную app.
  3. Затем мы загрузили значения конфигурации Celery из объекта настроек из django.conf. Мы использовали namespace=«CELERY» для предотвращения коллизий с другими настройками Django. Таким образом, все настройки конфигурации для Celery должны начинаться с префикса CELERY_.
  4. Наконец, app.autodiscover_tasks() говорит Celery искать задания из приложений, определенных в settings.INSTALLED_APPS.

Измените project/core/__init__.py, чтобы приложение Celery автоматически импортировалось при запуске Django:

from .celery import app as celery_app


__all__ = ("celery_app",)

Запуск задания


Обновите представление, чтобы начать выполнение задания и отправить id:

@csrf_exempt
def run_task(request):
    if request.POST:
        task_type = request.POST.get("type")
        task = create_task.delay(int(task_type))
        return JsonResponse({"task_id": task.id}, status=202)

Не забудьте импортировать задание:

from tasks.sample_tasks import create_task

Соберите образы и разверните новые контейнеры:

$ docker-compose up -d --build

Для запуска нового задания, выполните:

$ curl -F type=0 http://localhost:1337/tasks/

Вы увидите что-то вроде этого:

{
  "task_id": "6f025ed9-09be-4cbb-be10-1dce919797de"
}

Статус задания


Вернитесь к обработчику событий на стороне клиента:

$('.button').on('click', function() {
  $.ajax({
    url: '/tasks/',
    data: { type: $(this).data('type') },
    method: 'POST',
  })
  .done((res) => {
    getStatus(res.task_id);
  })
  .fail((err) => {
    console.log(err);
  });
});

Когда от AJAX-запроса вернется ответ, мы будем слать getStatus() с id задания каждую секунду:

function getStatus(taskID) {
  $.ajax({
    url: `/tasks/${taskID}/`,
    method: 'GET'
  })
  .done((res) => {
    const html = `
      <tr>
        <td>${res.task_id}</td>
        <td>${res.task_status}</td>
        <td>${res.task_result}</td>
      </tr>`
    $('#tasks').prepend(html);

    const taskStatus = res.task_status;

    if (taskStatus === 'SUCCESS' || taskStatus === 'FAILURE') return false;
    setTimeout(function() {
      getStatus(res.task_id);
    }, 1000);
  })
  .fail((err) => {
    console.log(err)
  });
}

Если ответ положительный, то новая строка будет добавлена к таблице DOM. Обновите представление get_status, чтобы вернуть статус:

@csrf_exempt
def get_status(request, task_id):
    task_result = AsyncResult(task_id)
    result = {
        "task_id": task_id,
        "task_status": task_result.status,
        "task_result": task_result.result
    }
    return JsonResponse(result, status=200)

Импортируйте AsyncResult:

from celery.result import AsyncResult

Обновите контейнеры:

$ docker-compose up -d --build

Запустите новое задание:

$ curl -F type=1 http://localhost:1337/tasks/

Затем извлеките task_id из ответа и вызовите обновленный get_status, чтобы увидеть статус:

$ curl http://localhost:1337/tasks/25278457-0957-4b0b-b1da-2600525f812f/

{
    "task_id": "25278457-0957-4b0b-b1da-2600525f812f",
    "task_status": "SUCCESS",
    "task_result": true
}

Ту же информацию вы можете посмотреть в браузере:



Логи Celery


Обновите сервис celery в docker-compose.yml так, чтобы логи Celery отправились в отдельный файл:

celery:
  build: ./project
  command: celery worker --app=core --loglevel=info --logfile=logs/celery.log
  volumes:
    - ./project:/usr/src/app
  environment:
    - DEBUG=1
    - SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
    - DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
    - CELERY_BROKER=redis://redis:6379/0
    - CELERY_BACKEND=redis://redis:6379/0
  depends_on:
    - web
    - redis

Добавьте новую директорию в “project” и назовите ее “logs”. Затем добавьте в этот новый каталог положите файл celery.log.

Обновите:

$ docker-compose up -d --build

Вы должны видеть, как файл с логами локально заполняется после настройки volume:

[2020-03-25 19:42:29,586: INFO/MainProcess] Connected to redis://redis:6379/0
[2020-03-25 19:42:29,599: INFO/MainProcess] mingle: searching for neighbors
[2020-03-25 19:42:30,635: INFO/MainProcess] mingle: all alone
[2020-03-25 19:42:30,664: WARNING/MainProcess]
    /usr/local/lib/python3.8/site-packages/celery/fixups/django.py:202:
    UserWarning: Using settings.DEBUG leads to a memory
    leak, never use this setting in production environments!
    warnings.warn('''Using settings.DEBUG leads to a memory
[2020-03-25 19:42:30,667: INFO/MainProcess] celery@6d060151bfeb ready.
[2020-03-25 19:43:07,103: INFO/MainProcess]
    Received task: tasks.sample_tasks.create_task[632792bb-5030-4f03-a0d8-e91979279729]
[2020-03-25 19:43:17,099: INFO/ForkPoolWorker-2]
    Task tasks.sample_tasks.create_task[632792bb-5030-4f03-a0d8-e91979279729]
    succeeded in 10.027462100006233s: True

Панель мониторинга Flower


Flower – это легкий веб-инструмент для мониторинга Celery в режиме реального времени. Вы можете отслеживать запущенные задания, увеличивать или уменьшать пул воркеров, отображать графики и статистику, например.

Добавьте его в requirements.txt:

celery==4.4.1
Django==3.0.4
flower==0.9.3
redis==3.4.1

pytest==5.4.1
pytest-django==3.8.0

Затем добавьте новый сервис в docker-compose.yml:

dashboard:
  build: ./project
  command:  flower -A core --port=5555 --broker=redis://redis:6379/0
  ports:
    - 5555:5555
  environment:
    - DEBUG=1
    - SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
    - DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
    - CELERY_BROKER=redis://redis:6379/0
    - CELERY_BACKEND=redis://redis:6379/0
  depends_on:
    - web
    - redis
    - celery

И протестируйте:

$ docker-compose up -d --build

Перейдите на localhost:5555 для просмотра панели мониторинга. Вы должны увидеть одного воркера:



Запустите еще несколько заданий, чтобы протестировать панель мониторинга:



Попробуйте добавить больше воркеров и посмотреть, как это скажется на производительности:

$ docker-compose up -d --build --scale celery=3

Тесты


Давайте начнем с самого простого теста:

def test_task():
    assert sample_tasks.create_task.run(1)
    assert sample_tasks.create_task.run(2)
    assert sample_tasks.create_task.run(3)

Добавьте тест-кейс выше в project/tests/test_tasks.py и допишите следующий импорт:

from tasks import sample_tasks


Запустите этот тест:

$ docker-compose exec web python -m pytest -k "test_task and not test_home"

Выполнение данного теста займет около минуты:

======================================== test session starts ========================================
platform linux -- Python 3.8.2, pytest-5.4.1, py-1.8.1, pluggy-0.13.1
django: settings: core.settings (from ini)
rootdir: /usr/src/app, inifile: pytest.ini
plugins: django-3.8.0, celery-4.4.1
collected 2 items / 1 deselected / 1 selected

tests/test_tasks.py .                                                                         [100%]

============================ 1 passed, 1 deselected in 62.43s (0:01:02) =============================

Стоит отметить, что в assert’ах выше мы использовали метод .run вместо .delay для непосредственного запуска задач, без использования воркера Celery.
Хотите использовать заглушки(mock), чтобы ускорить процесс?

@patch('tasks.sample_tasks.create_task.run')
def test_mock_task(mock_run):
    assert sample_tasks.create_task.run(1)
    sample_tasks.create_task.run.assert_called_once_with(1)

    assert sample_tasks.create_task.run(2)
    assert sample_tasks.create_task.run.call_count == 2

    assert sample_tasks.create_task.run(3)
    assert sample_tasks.create_task.run.call_count == 3

Импортируйте:

from unittest.mock import patch, call

Протестируйте:

$ docker-compose exec web python -m pytest -k "test_mock_task"

======================================== test session starts ========================================
platform linux -- Python 3.8.2, pytest-5.4.1, py-1.8.1, pluggy-0.13.1
django: settings: core.settings (from ini)
rootdir: /usr/src/app, inifile: pytest.ini
plugins: django-3.8.0, celery-4.4.1
collected 3 items / 2 deselected / 1 selected

tests/test_tasks.py .                                                                         [100%]

================================== 1 passed, 2 deselected in 1.13s ==================================

Видите? Теперь гораздо быстрее!

Как насчет полноценного интеграционного тестирования?

def test_task_status(client):
    response = client.post(reverse("run_task"), {"type": 0})
    content = json.loads(response.content)
    task_id = content["task_id"]
    assert response.status_code == 202
    assert task_id

    response = client.get(reverse("get_status", args=[task_id]))
    content = json.loads(response.content)
    assert content == {"task_id": task_id, "task_status": "PENDING", "task_result": None}
    assert response.status_code == 200

    while content["task_status"] == "PENDING":
        response = client.get(reverse("get_status", args=[task_id]))
        content = json.loads(response.content)
    assert content == {"task_id": task_id, "task_status": "SUCCESS", "task_result": True}

Помните, что этот тест использует того же брокера и бэкенд, что и в разработке. Вы можете создать новый экземпляр приложения Celery для тестирования:

app = celery.Celery('tests', broker=CELERY_TEST_BROKER, backend=CELERY_TEST_BACKEND)

Добавьте импорт:

import json

И убедитесь в том, что тесты прошли успешно.

Заключение


Сегодня мы познакомились с базовой настройкой Celery для выполнения долгосрочных заданий в приложении на Django. Вы должны отправлять в очередь обработки любые процессы, которые могут замедлить работу кода на стороне пользователя.

Также Celery можно использовать для выполнения повторяющихся задач и декомпозиции сложных ресурсоемких задач, чтобы распределить вычислительную нагрузку на несколько машин и уменьшить время выполнения и нагрузку на машину, которая обрабатывает запросы клиента.

Весь код вы можете найти в этом репозитории.



Успеть на курс
OTUS. Онлайн-образование
Цифровые навыки от ведущих экспертов

Комментарии 4

    +2
    Спасибо за перевод хорошей статьи.
    Пара замечаний, дополнений. Думаю стоит сделать пометку что данный проект не предназначен для продакшена. Хотя бы потому что используется dev сервер django. Не знаю зачем автор прописал переменную окружения с secret key прямо в docker-compose.yml, возможно это часть его dev конфига — лучшим местом для этого будет файл .env, который будет прописан в .gitignore.
    По самому celery. Уже много раз обсуждалось и везде предупреждают, но повторю — не используйте в качестве аргументов для тасков сложные объекты, например модели django. Передавайте лучше id и уже в таске получайте объект из БД. Ещё один важный момент, который может смутить начинающего разработчика на django — вьюхи, как правило, выполняются в транзакции. Это может привести к тому, сохранив новый объект и сразу отправь его id в таск вы можете получить object not found. Чтобы такого избежать, нужно использовать конструкцию типа
    transaction.on_commit(lambda: some_celery_task.delay(obj.id))
    

    docs
      0
      Так же можно смотреть текущие задачи и их статусы с помощью
      $ celery -A worker events

      Очень полезно когда нужно посмотреть задачи прямо сейчас.
        0
        гайд на 10/10
          0

          Интересный подход, но необычно видеть Celery и Docker вместе.
          Возможно я не верно истолковал посыл статьи, но разве не проще было бы использовать много контейнеров с приложениям (consumer/publisher) без Celery? Docker в этом случае мог бы обеспечить перенаправление логов из stdout, ограничения по ресурсам (CPU, RAM, I/O) и масштабирование путем увеличения ядер/квоты CPU, или увеличением количества контейнеров…


          Для production на работе пришли к контейнерам с aio-pika поверх облака с контейнерами.

          Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

          Самое читаемое