Celery + asyncio

    Привет, Хабр! Я хочу рассказать, как я решал проблему эффективного конкурентного исполнения asyncio задач в Celery.

    КДПВ

    Введение


    Celery — большой проект со сложной историей и тяжким бременем обратной совместимости. Ключевые архитектурные решения принимались задолго до появления asyncio в python. Поэтому тем более интересно, что запустить как-нибудь asyncio-задачу можно в celery из коробки.

    Зануда mode on
    Формально после #839. Субъективно для меня переименование пакета не меняет архитектуру приложения. Зануда mode off.

    Запускаем asyncio задачи в ванильном celery


    Запустить asyncio задачу можно из коробки:

    import asyncio
    
    from .celeryapp import celeryapp
    
    
    async def async_task():
        await asyncio.sleep(42)
    
    
    @celeryapp.task
    def regular_task():
        coro = async_task()
        asyncio.run(coro)
    

    Очевидные плюсы:

    • Работает же!
    • Просто
    • Нет дополнительных внешних зависимостей

    Что, собственно, не так?

    • Event Loop создаётся внутри каждого воркера
    • Переключения контекста между исполняющимися корутинами не происходит
    • В один момент времени воркер исполняет не более одной корутины
    • Общие ресурсы не шарятся между задачами
    • Бойлерплейт

    То есть asyncio в данном случае используется ради asyncio, а преимуществ никаких

    Попробуем хитрее?


    Я боролся за производительность:

    import asyncio
    import threading
    
    from .celeryapp import celeryapp
    
    celeryapp.loop = asyncio.get_event_loop()
    celeryapp.loop_runner = threading.Thread(
        target=celeryapp.loop.run_forever,
        daemon=True,
    )
    celeryapp.loop_runner.start()
    
    async def async_task():
        await asyncio.sleep(42)
    
    
    @celeryapp.task
    def regular_task():
        coro = async_task()
        asyncio.run_coroutine_threadsafe(
            coro=coro,
            loop=celeryapp.loop,
        )
    

    Бинго! Плюсы:

    • Всё ещё работает
    • Даже более-менее эффективно
    • Даже ресурсы шарятся между корутинами в пределах воркера
    • Всё ещё нет дополнительных внешних зависимостей

    Бинго? Проблемы не заставили себя ждать:

    • Celery не знает ничего про запущенную корутину
    • Вы теряете контроль над исполнением таски
    • Вы теряете контроль над исключениями
    • Бойлерплейт

    Идти с таким чудесным инженерным решением в прод у меня как-то рука не поднялась

    Постановка задачи


    • Должно работать
    • Корутины должны исполняться конкурентно
    • Ресурсы должны шариться между множеством исполняемых тасок
    • Никакого бойлерплейта
    • Простой предсказуемый API

    То есть, мои ожидания:

    import asyncio
    
    from .celeryapp import celeryapp
    
    
    @celeryapp.task
    async def async_task():
        await asyncio.sleep(42)
    

    Итоги


    Свои идеи я реализовал в библиотеке celery-pool-asyncio. Эта библиотека используется нашей командой в текущем проекте, и мы уже выкатились в прод.

    Коротко о возможностях


    Помимо непосредственно исполнения asyncio задач, celery-pool-asyncio также решает проблемы:

    • Шедулинг асинхронных задач
    • Поддержка корутин в сигналах celery

    Для того, чтобы заставить celery-pool-asyncio работать, я использовал monkey patching. Для каждого применяемого в рантайме патча предусмотрена возможность отключения.

    Обо всём этом подробнее можно прочитать в документации

    Планы


    По-хорошему, нужно интегрировать пул в celery. С другой стороны, разработчики celery во всю прототипируют celery 5.0, который будет асинхронным. Стоит ли игра свеч?

    Примеры


    Для демонстрации возможностей моих библиотек celery-pool-asyncio и celery-decorator-taskcls (статья) был реализован тестовый проект.

    Прочее


    Я пытался то же самое рассказать на митапе

    AdBlock похитил этот баннер, но баннеры не зубы — отрастут

    Подробнее
    Реклама

    Комментарии 2

      +1
      Статью я не читал, ведь зачем в celery нужно конкурентное исполнение.
        0

        Конечно не нужна. Поэтому там из коробки пулы prefork, eventlet и gevent, и внешним пакетом был реализован пул treadpoolexecutor.

      Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

      Самое читаемое