Сегодня мы с вами сделаем web-интерфейс для управления запуском ETL-процесса. В прошлой статье мы написали консольный скрипт, который разово запускает выгрузку. Но как это передать заказчику ?!
Что нам понадобится
виртуальное окружение с установленными пакетами django, redis, django_celery_beat, django-celery-results. Подробнее о требуемых зависимостях тут.
запущенный redis-server
Примечание: вместо redis-server можно использовать другой брокер сообщений - rabbitmq. В этом случае вам будет нужно указать другой URL брокера в настройках, указанных ниже.
Старый-добрый джанго
Итак, поскольку речь идёт о python и нам нужен web-интерфейс, мы поступим просто и инициализируем джанго проект с приложением в нём:
django-admin startproject config . # проект создается в текущей папке, имя конфигурационной папки config django-admin startapp etl_app # приложение создаем для размещения в нем модуля с бизнес-логикой
Джанго мы выбрали из-за того, что в нём есть готовый административный интерфейс.
Согласно мануалам django_celery_beat, django-celery-results добавляем в settings.py нашего проекта новые приложения и некоторые настройки.
config/settings.py:
INSTALLED_APPS = [ ... 'etl_app', # наше приложение 'django_celery_beat', # приложение из пакета django_celery_beat 'django_celery_results', # приложение из пакета django-celery-results ]"
В данном проекте "из песочницы" у нас одна очередь, однако можно настроить несколько очередей, например для распределения задач по приоритетам.
Настраиваем celery
Добавляем в папку настроек джанго проекта модуль celery.py и делаем доступным экземпляр приложения celery_app.
config/celery.py:
import os from celery import Celery os.environ.setdefault("DJANGO_SETTINGS_MODULE", "config.settings") # указываем Celery где найти Django-проект app = Celery("etl_project") # создаем экземпляр Celery app.config_from_object("django.conf:settings", namespace="CELERY") # настройки конфигурации для Celery в settings.py будут начинаться с префикса CELERY_ app.autodiscover_tasks() # Celery будет искать задания в приложениях settings.INSTALLED_APPS
config/__init__.py:
""" чтобы экземпляр приложения Celery автоматически импортировался при запуске Django """ from .celery import app as celery_app __all__ = ('celery_app',)
config/settings.py:
... # CELERY CELERY_BROKER_URL = 'redis://127.0.0.1:6379/0' # для rabbitmq, поменяйте адрес брокера на amqp://guest:guest@127.0.0.1:5672 CELERY_TASK_TRACK_STARTED = True # запускает трекинг задач Celery # Планировщик задач CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler' # Celery настроен на использование планировщика из базы данных CELERY_BROKER_TRANSPORT_OPTION = {'visibility_timeout': 3600} # время ожидания видимости 1 час CELERY_RESULT_BACKEND = 'django-db' # указание для django_celery_results куда записывать результат выполнения задач CELERY_ACCEPT_CONTENT = ['application/json'] # это тип содержимого, разрешенный к получению CELERY_TASK_SERIALIZER = 'json' # это строка, используемая для определения метода сериализации по умолчанию CELERY_RESULT_SERIALIZER = 'json' # является типом формата сериализации результатов CELERY_TASK_DEFAULT_QUEUE = 'default' # celery будет использовать это имя очереди
Регистрируем нашу etl-функцию
В папке приложения etl_app создаем модуль tasks.py и импортируем в него код etl-процесса.
etl_app/tasks.py:
from celery import shared_task from etl_app import etl @shared_task(name="Задача ETL") # регистрируем функцию в воркере def etl_task(*args, **kwargs): unloads = etl.load() multiplication = etl.transform(unloads) etl.extract(multiplication) return "my result data" # здесь может быть более полезная информация
Единственная задача декоратора @shared_task - зарегистрировать нашу функцию в воркере и сделать её доступной для запуска из очереди. Подробнее в документации.
Запускаем проект
Открываем несколько терминалов:
(опционально)
redis-server, если он у вас не запущен ранееcelery -A config worker -l info # запускаем воркер для наших задач, все логи бизнес-кода будут здесьcelery -A config beat -l info # запускаем службу beat в качестве отдельного процессаpython manage.py runserver # запускаем сервер в тестовом режиме
В терминале воркера вы должны увидеть нашу задачу в перечне доступных:
- ** ---------- [config] - ** ---------- .> app: etl_project:0x7ff955e38490 - ** ---------- .> transport: redis://127.0.0.1:6379/0 - ** ---------- .> results: - *** --- * --- .> concurrency: 2 (prefork) -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker) --- ***** ----- -------------- [queues] .> default exchange=default(direct) key=default [tasks] . Задача ETL
В терминале beat'а должно быть сообщение о том, что управление расписанием задач доступно:
LocalTime -> 2023-01-22 05:02:23 Configuration -> . broker -> redis://127.0.0.1:6379/0 . loader -> celery.loaders.app.AppLoader . scheduler -> django_celery_beat.schedulers.DatabaseScheduler . logfile -> [stderr]@%INFO . maxinterval -> 5.00 seconds (5s) [2023-01-22 05:02:23,184: INFO/MainProcess] beat: Starting...
Создаем периодическую задачу
Для начала создаем экземпляр модели Intervals. На скрине ниже мы создаем период "раз в 10 минут":

Приложение django_celery_beat предлагает большое множество настройки расписания, включая классический cron. Оставляю вам это на самостоятельное изучение по документации.
Затем создаем расписание для нашей задачи в модели Periodic tasks. Если до этого вы всё сделали правильно, то в выпадающем списке задач вы увидите нашу "Задача ETL".

А вот обещанная возможность запустить задачу вне плана (см. Action):

Смотрим результат выполнения
Задача запускается в воркере. Пример вывода:
[2023-01-23 11:57:10,919: INFO/MainProcess] Task Задача ETL[a1bd0114-c3a9-4519-b1a4-af06614022cf] received [2023-01-23 11:57:11,042: INFO/ForkPoolWorker-2] Задача ETL[a1bd0114-c3a9-4519-b1a4-af06614022cf]: the square of an even number: 0 [2023-01-23 11:57:11,043: INFO/ForkPoolWorker-2] Задача ETL[a1bd0114-c3a9-4519-b1a4-af06614022cf]: 1 [2023-01-23 11:57:11,045: INFO/ForkPoolWorker-2] Задача ETL[a1bd0114-c3a9-4519-b1a4-af06614022cf]: the square of an even number: 4 [2023-01-23 11:57:11,045: INFO/ForkPoolWorker-2] Задача ETL[a1bd0114-c3a9-4519-b1a4-af06614022cf]: skip load stage [2023-01-23 11:57:11,045: INFO/ForkPoolWorker-2] Задача ETL[a1bd0114-c3a9-4519-b1a4-af06614022cf]: the square of an even number: 16 [2023-01-23 11:57:11,060: INFO/ForkPoolWorker-2] Task Задача ETL[a1bd0114-c3a9-4519-b1a4-af06614022cf] succeeded in 0.13728031599998758s: 'my result data'
В части вывода в консоль код из предыдущей статьи претерпел изменения. print'ы заменены на логгер. Причем я использую логгер из пакета celery
from celery.utils.log import get_task_logger, который выводит мне айдишник задачи. Хотя можно остановится и на джанговском логгере, у которого есть отличная документация.
В модели Task results есть возможность посмотреть статус выполнения таска. Обратите внимание на поля Task State и Result Data. Значение последнего поля берется из return нашего таска (функция, обернутая в @shared_task):

Заключение
Сегодня мы научились на примере etl-процесса создавать, запускать и контролировать выполнение регулярных задач. Использовали джанго по назначению - создали web-интерфейс в "обозначенные" сроки. Результат можно передавать пользователю. Помните о том, что вашему пользователю нужно назначить права в админке на работу с моделями из приложений в django_celery_beat и django_celery_results.
Благодарю за внимание. С удовольствием отвечу на ваши вопросы. Репозиторий кода доступен по ссылке.
