На этот раз мы решили рассказать о замечательном продукте, который мы используем в нашей работе. Речь пойдет о Celery — «distributed task queue». Это распределенная асинхронная очередь заданий, которая обладает широким функционалом. В нашем конструкторе сайтов нам часто приходиться запускать асинхронные с точки зрения ответа пользователю задачи. На хабре, к сожалению, не много информации по данному продукту, а он заслуживает отдельного упоминания, это мы и хотим исправить.
Итак, что же умеет Celery:
Заинтересовало? Просим под кат.
Начнем c конфигурации worker'a. Это демон, который собственно получает задания из очереди и выполняет их. Рекомендуемая очередь — RabbitMQ, но мы пока ограничились ghettoq, через MongoDB. Также поддерживается Redis и РСУБД.
celeryconfig.py:
Запуск демона: celeryd -l INFO -B
Включаем логгирование в консоль и опция -B запуск демона периодических заданий. Последний можно запустить отдельно коммандой celerybeat
Теперь создадим тестовое задание. В конфиге мы импортируем tasks, поэтому и файл заданий у нас tasks.py:
Итак, у нас 4 задания в tasks. Первые два выполняются по расписанию, т.к. они отмечены декоратором @periodic_task. А вот два последних будут вызваны непосредственно из кода программы. Вот таким образом:
Теперь для того чтобы отследить результат и факт завершения последнего задания выполним:
r.ready() # Вернет True если задание отработало
r.result # Вернет значение выполненной функции или None если еще не выполнено(асинхронно)
r.get() #Будет ждать выполнения задания и вернет ее результат(синхронно)
Переменную r можно прогнать через cPickle, положить значение в кеш и аяксом опрашивать статус задания. Либо можно получить task id, и положить в кеш его. Кроме того, task id вы можете задавать самостоятельно, главное чтоб он был уникальным.
После плотного использования celery мы обнаружили несколько ошибок, связанных с отложенным выполнением задач, с менеджером очередей ghettoq, но они все были поправлены автором в день создания issue на github, за что ему спасибо.
Не так давно вышла версия 2.0, которая перестала быть django-зависимой, а интеграция с django теперь вынесена в отдельный подпроект celery-django.
Из ограничений celery можно выделить два, точнее это просто особенности: на штатной FreeBSD worker'ы не будут работать, т.к. там нет питоновкого multiprocessing, хотя в сети есть рецепты по сборке ядра для celery; для перегрузки заданий необходимо рестартовать воркер, чтобы он загрузил новый python-код заданий и связанных функций. На linux работает замечательно.
Итак, что же умеет Celery:
- Выполнять задания асинхронно или синхронно
- Выполнять периодические задания(умная замена crond)
- Выполнять отложенные задания
- Распределенное выполнение (может быть запущен на N серверах)
- В пределах одного worker'а возможно конкурентное выполнение нескольких задач(одновременно)
- Выполнять задание повторно, если вылез exception
- Ограничивать количество заданий в единицу времени (rate limit, для задания или глобально)
- Routing заданий (какому worker'у что делать)
- Несложно мониторить выполнение заданий
- Выполнять подзадания
- Присылать отчеты об exception'ах на email
- Проверять выполнилось ли задание (удобно для построения Ajax приложений, где юзер ждет факта завершения)
Заинтересовало? Просим под кат.
Начнем c конфигурации worker'a. Это демон, который собственно получает задания из очереди и выполняет их. Рекомендуемая очередь — RabbitMQ, но мы пока ограничились ghettoq, через MongoDB. Также поддерживается Redis и РСУБД.
celeryconfig.py:
CARROT_BACKEND = "ghettoq.taproot.MongoDB"
BROKER_HOST = "xxx"
BROKER_PORT = 27017
BROKER_VHOST = "celery"
CELERY_SEND_TASK_ERROR_EMAILS = True
ADMINS = ( ('Admin', 'admin@localhost'), )
CELERYD_MAX_TASKS_PER_CHILD = 5
CELERY_IMPORTS = ("tasks", )
CELERY_DISABLE_RATE_LIMITS = True
CELERY_RESULT_BACKEND = "mongodb"
CELERY_MONGODB_BACKEND_SETTINGS = {
"host": "xxx",
"port": 27017,
"database": "celery",
"taskmeta_collection": "my_taskmeta_collection",
}
Запуск демона: celeryd -l INFO -B
Включаем логгирование в консоль и опция -B запуск демона периодических заданий. Последний можно запустить отдельно коммандой celerybeat
Теперь создадим тестовое задание. В конфиге мы импортируем tasks, поэтому и файл заданий у нас tasks.py:
from celery.decorators import task
from celery.decorators import periodic_task
from celery.task.schedules import crontab
@periodic_task(run_every=timedelta(seconds=60))
def mail_queue():
print "Task is executed every minute"
@periodic_task(run_every=crontab(hour=0, minute=10))
def transactions():
print "Task is executed every day on 0:10"
@task
def delayed_function(id):
some_function()
@task
def delayed_heavy_function(id):
some_heavy_function()
Итак, у нас 4 задания в tasks. Первые два выполняются по расписанию, т.к. они отмечены декоратором @periodic_task. А вот два последних будут вызваны непосредственно из кода программы. Вот таким образом:
from tasks import delayed_function, delayed_heavy_function
delayed_function.apply_async(args=[id], countdown=300) # Будет запущена через 300 секунд
r = delayed_heavy_function.delay(id) #Будет запущена сразу(как только появится возможность), в асинхронном режиме
Теперь для того чтобы отследить результат и факт завершения последнего задания выполним:
r.ready() # Вернет True если задание отработало
r.result # Вернет значение выполненной функции или None если еще не выполнено(асинхронно)
r.get() #Будет ждать выполнения задания и вернет ее результат(синхронно)
Переменную r можно прогнать через cPickle, положить значение в кеш и аяксом опрашивать статус задания. Либо можно получить task id, и положить в кеш его. Кроме того, task id вы можете задавать самостоятельно, главное чтоб он был уникальным.
После плотного использования celery мы обнаружили несколько ошибок, связанных с отложенным выполнением задач, с менеджером очередей ghettoq, но они все были поправлены автором в день создания issue на github, за что ему спасибо.
Не так давно вышла версия 2.0, которая перестала быть django-зависимой, а интеграция с django теперь вынесена в отдельный подпроект celery-django.
Из ограничений celery можно выделить два, точнее это просто особенности: на штатной FreeBSD worker'ы не будут работать, т.к. там нет питоновкого multiprocessing, хотя в сети есть рецепты по сборке ядра для celery; для перегрузки заданий необходимо рестартовать воркер, чтобы он загрузил новый python-код заданий и связанных функций. На linux работает замечательно.