Когда проектируешь масштабируемые системы, где приходится обращаться ко множеству внешних компонентов, например, использование стороннего API, отправка почты или конвертация видео, лучшим способом реализации является асинхронная модель с системой очередей, которая является связующим звеном для взаимодействия всех компонентов системы.
Самой популярной системой очередей в Python является Celery, она обладает широким набором возможностей по управлению задачами. К сожалению, системы на базе Celery сложно поддерживать в работоспособном состоянии, и когда что-то идёт не так, то найти проблему бывает весьма не просто. Можете спросить любого девопса об опыте работы с Celery, но будьте готовы услышать не очень приятные слова.
К счастью, есть альтернативное решение — uWSGI Spooler, и в этой статье я расскажу о нём подробнее.
Основным отличием от Celery является то, что не нужно использовать дополнительные компоненты (сам Celery и хранилище, например Redis), таким образом количество точек отказа уменьшается на две. В качестве хранилища задач может использоваться директория, внешняя директория или сетевой пул.
Для управления Python программами мы часто используем uWSGI. Почему? Потому что он прост в настройке, надежен, гибок и выполняет большинство требований.
Кроме обслуживания Python-кода в виде обеспечения непрерывного доступа к web-приложению, в uWSGI входит компонент Spooler, который реализует систему очередей. Spooler имеет некоторые особенности, а документация по нему достаточно скудна.
Использовать uWSGI Spooler просто, как раз-два-три! Но есть несколько нюансов.
Модуль uwsgi нельзя импортировать из кода, а соответственно тестировать из консоли код не получится, необходимо каждый раз запускать uwsgi воркера, для чего необходимо создать конфиг:
[uwsgi]
socket = /var/run/mysite.sock
master = True
processes = 4
project_dir = /home/myuser/mysite
chdir = %(project_dir)
spooler = /var/uwsgi_spools/mysite_spool
spooler-import = path.to.spool.package # (package to import spool file)
spooler-frequency = 10 # Frequency for scanning spool
max-requests = 5000
module = wsgi:application
touch-reload = wsgi.py
Файл воркера:
from uwsgidecorators import spool, uwsgi
@spool
def my_func(args):
print(args)
# do some job
Постановка задачи из вашего кода:
import uwsgi_spools.mysite_spool as mysite_spool
mysite_spool.my_func.spool(test=True)
Как можно убедиться из примера, порог вхождения для использования очень низкий.
Внутри таска доступен один аргумент, который содержит словарь с тремя служебными ключами (имя функции ud_spool_func, имя таска spooler_task_name, статус таска ud_spool_ret) и всеми параметрами, что были переданы при создании таска, в примере это ключ test.
Таск может вернуть три статуса:
- -2 (SPOOL_OK) – таск выполнен, будет удалён из очереди;
- -1 (SPOOL_RETRY) – что-то пошло не так, таск будет повторно вызван;
- 0 (SPOOL_IGNORE) – игнорировать таск.
Все прочие значения будут интерпретированы как -1 (SPOOL_RETRY).
Особенность: декоратор @spool
выполняется единожды (возвращает SPOOL_OK), если функция не упала с исключением.
Для того чтобы управлять жизненным циклом нужно использовать @spoolraw
.
Особые ключи (вспомогательные) при создании таска:
- spooler — абсолютный путь к спулеру, который будет выполнять задачу;
- at — unix time, когда задача должна быть выполнена (правильнее сказать, она не будет выполнена ранее этого значения);
- priority — указывает на подпапку в папке задач (на такую подпапку можно выделить большее количество воркеров), через --spooler-ordered можно настроить приоритеты;
- body — этот ключ используется для значений более 64 КБ, в задачу будет доступен в сериализованном виде.
Кроме декоратора @spool
доступен декоратор @timer
, который принимает количество секунд в качестве аргумента и позволяет выполнять декорируемую функцию с указанным интервалом.
@timer(30)
def my_func(args):
print(args)
# do some job every 30 sec
Аналогично @timer
есть декоратор @spoolforever
, который будет повторно запускать выполнение функции (завершение задачи со статусом SPOOL_RETRY).
@spoolforever
def my_func(args):
print(args)
# do some job and repeat
Для настройки воркеров для работы по сети, нужно добавить адрес, по которому он будет доступен в ini-файл:
socket = 127.0.0.1:10001
При создании задачи указывать адрес получателя задачи:
uwsgi.send_message(“127.0.0.1:10001”, 17, 0, test=True, 5)
# или
uwsgi.spool(test=True, spooler=“127.0.0.1:10001”)
Таким образом, uWSGI Spooler можно использовать как замену очередям, но если всё же не хватает возможностей или хочется немного сахара, то можно использовать uwsgi-tasks, который реализует недостающее.