Celery taskcls: новый декоратор, новые возможности

Привет, Хабр! Я расскажу тебе историю своего профессионального подгорания.


Так вышло, что я терпеть не могу рутинных однообразных действий. У меня за плечами несколько проектов, использующих Celery. Каждый раз, когда задача становится сложнее вывода 2 + 2 = 5, шаблон решения сводится к созданию класса, выполняющего задачу, и функции-стартера, с которой умеет работать Celery — бойлерплейта. В этой статье я расскажу, как я боролся с бойлерплейтом, и что из этого вышло.


Logo


Отправная точка


Рассмотрим рядовую таску Celery. Есть класс, исполняющий задачу, и функция-стартер, выполняющая инстанцирование класса и запуск одного его метода, в котором реализована вся логика задачи и унаследована обработка ошибок:


class MyTask(
    FirstMixin,
    SecondMixin,
    ThirdMixin,
):
    def main(self):
        data = self.do_something()
        response = self.remote_call(data)
        parsed = self.parser(response)
        return self.process(parsed)

@app.task(bind=True)
def my_task(self, arg1, arg2):
    instance = MyTask(
        celery_task=self,
        arg1=arg1,
        arg2=arg2,
    )
    return instance.full_task()

При этом метод full_task включает в себя вызов main, однако также занимается обработкой ошибок, логгированием и прочей чушью, не имеющей прямого отношения к основной задаче.


Идея тасккласса


В корне тасккласса лежит простая идея: в базовом классе можно определить метод класса task, в нём реализовать поведение функции-стартера, а после наследоваться:


class BaseTask:
    def __init__(self, **kwargs):
        for key, value in kwargs.items():
            setattr(self, key, value)

    def full_task(self):
        try:
            return self.main()
        except:
            self.celery_task.retry(countdown=30)

    @classmethod
    def task(cls, task, **kwargs):
        self = cls(
            celery_task=celery_task,
            **kwargs,
        )
        return self.full_task()

Вся вспомогательная скукотища собрана в базовом классе. Больше к ней не возвращаемся. Реализуем логику задачи:


@app.taskcls(bind=True)
class MyTask(
    BaseTask,
    FirstMixin,
    SecondMixin,
    ThirdMixin,
):
    def main(self):
        data = self.do_something()
        response = self.remote_call(data)
        parsed = self.parser(response)
        return self.process(parsed)

Больше никакой шелухи, уже намного лучше. Однако что же с точкой входа?


MyTask.task.delay(...)

MyTask.task обладает всеми методами обычной таски: delay, apply_async, и, вообще говоря, ей и является.


Теперь аргументы декоратора. Особенно весело тащить bind = True в каждую таску. Можно ли передать аргументы по умолчанию через базовый класс?


class BaseTask:
    class MetaTask:
        bind = True

    def __init__(self, **kwargs):
        for key, value in kwargs.items():
            setattr(self, key, value)

    def full_task(self):
        try:
            return self.main()
        except:
            self.celery_task.retry(countdown=30)

    @classmethod
    def task(cls, task, **kwargs):
        self = cls(
            celery_task=celery_task,
            **kwargs,
        )
        return self.full_task()

Вложенный класс MetaTask содержит аргументы по умолчанию и будет доступен всем дочерним классам. Интересно, что и его можно унаследовать:


class BaseHasTimeout(BaseTask):
    class MetaTask(BaseTask.MetaTask):
        timeout = 42

Наивысшим приоритетом обладают аргументы, переданные декоратору @app.taskcls:


@app.taskcls(timeout=20)
class MyTask(
    BaseHasTimeout,
    FirstMixin,
    SecondMixin,
    ThirdMixin,
):
    def main(self):
        ...

В итоге timeout для задачи будет 20.


Выход за рамки


В web-приложениях часто есть необходимость из view запустить таску. В случае высокой сцепленности view и таски их можно совместить:


class BaseViewTask:
    @classmethod
    def task(cls, **kwargs):
        # Somehow init View class manually
        self = cls(...)
        return self.celery()

@app.taskcls
class MyView(
    BaseViewTask,
    FirstMixin,
    SecondMixin,
    ThirdMixin,
    APIView,
):
    queryset = MyModel.objects.all()

    def get_some_data(self, *args, **kwargs):  # common methed
        return self.queryset.filtert(...)

    def get(self, request):
        data = self.get_some_data(request.field)  # used in request handling
        return Response(json.dumps(data))

    def post(self, request):
        self.task.delay(...)
        return Response(status=201)

    def celery(self):
        data = self.get_some_data(...)  # also used in background task
        return self.do_something(data)

Кстати, именно для исключения коллизии имён вложенный класс называется MetaTask, а не Meta, как в django.


Заключение


Эта функциональность ожидается в Celery 4.5. Однако я также подготовил пакет, позволяющий попрбовать декоратор taskcls уже сегодня. Идея пакета сводится к тому, что при обновлении Celery до версии 4.5 вы сможете убрать его импорт не меняя более ни строчки кода.

Similar posts

Ads
AdBlock has stolen the banner, but banners are not teeth — they will be back

More

Comments 22

    +3
    Я для себя ранее открыл пару аналогичных либ, в некоторых — данная проблема обработки рутинных задач решена:
    Dramatiq
    Huey

    Наследование реализую с помощью механизма pipeline и functools.partial. Увы Celery — не серебряная пуля ;)
    P.S.: В данный момент — все чаще использую pika или aiopika.
      0
      Большое спасибо, в ближайшее время ознакомлюсь
        0
        Ознакомился. В проекте взаимодействуют 2 разных инстанса Celery, каждый со своим изолированным набором тасок. Прямо вот в этом месте dramatiq и huey с их глобальным app становятся непригодными

        Спасибо.
        +2
        Странно, что мейнтейнеры согласились включить это в репозиторий, уж так ли это необходимо?

        Вообще celery переживает сложный период с выходом 4.x и качество «продукта» упало.
        В celery очень много наростов, которые комьюнити видимо не может поддерживать и они просто вырезаются, из наиболее интересных это прекращение поддержки Redis как брокера.
        Внутренности celery и его обвязок тоже не сахар (видел, исправлял, пытался доработать) и как это всё стабилизируется непонятно.
        Часто обвязки несовместимы друг с другом, в некоторых версиях есть ошибки, а версии без ошибок уже дают ошибки из-за несовместимости.

        Слишком сложный продукт стал для простого запуска задач (построение workflow из задач полноценно не работает в celery, дальше chain и group лучше не ходить).

        Работаем с celery c 2015 года и через пару недель надеюсь откажемся от него, хотя задач у нас всего 3000-5000 в час.

        Версия 3.x была вполне стабильна.
          0
          > Странно, что мейнтейнеры согласились включить это в репозиторий

          Да вот до сегодняшнего дня там просто висела лычка «Milestone 4.5». Сегодня пол дня рассказывал, что это и зачем это. Пока туго, придётся тестовый проект писать

          > уж так ли это необходимо?

          У меня есть аналогия — ООП. Дело в том, что без ООП можно реализовать что угодно — сишные структурки в помощь. Однако почему-то ООП является наиболее популярной парадигмой, хотя, казалось бы, так ли это необходимо?

          > из наиболее интересных это прекращение поддержки Redis как брокера

          Странно, документация говорит, что redis поддерживается в качестве брокера. Можно подробнее?

          > Внутренности celery и его обвязок тоже не сахар (видел, исправлял, пытался доработать) и как это всё стабилизируется непонятно.

          Да, хорошо видно, что celery писался разными людьми. Одного посещала муза, другого нет
          А что вы вкладываете в слово `обвязки`? Модули? Или уровни абстракции?

          А вообще celery нужен хороший рефакторинг. Каким образом взаимодействуют пул и хаб я пока не понял, поэтому мой asyncio-пул — это пока скорее костыль, нежели решение.
            +1
            ООП дело вкуса, просто всего один метод в классе…
            Про redis погорячился, это в 2016 году они хотели его почикать, но оставили github.com/celery/celery/commit/e15b0bfc658b397955beeae4a84127cf44686d50

            Обвязки это billiard, kombu, beat и т.д.
              –1
              О, интересная ссылочка

              > ООП дело вкуса, просто всего один метод в классе…

              Так в том и дело, что не один. Я упираю на множественное наследование. Один (опять же, это слабодостижимый идеал. Чаще несколько) метод требуется до или переопределить и скомбинировать остальные.

              Ага, то есть и и модули, и абстракции. Так, а какие претензии к kombu? Свести под одну гребёнку разные шины сообщений — это же прекрасно. А чем billiard провинился? Получилась хорошая абстракция и по факту основа для построения кластера. Beat же вообще опциональный. Для одного воркера можно передать флаг -B — и beat будет встроен в воркер. Естественно, для нескольких экземпляров воркеров нельзя встраивать beat, иначе копии тасок станут исполняться на каждой ноде. Во весело запустить одновременно несколько сессий бэкапа БД.

              А вообще к beat у меня персональный пунктик. Вот есть же kombu под капотом. Но нет — расписания мы способны подхватить лишь при старте. Выкручивался пакетом redisbeat, благо, брокером использовался как раз redis. Там кстати смешной баг был. Ну как смешной, стул тогда подо мной прогорел
                +1
                Естественно, для нескольких экземпляров воркеров нельзя встраивать beat, иначе копии тасок станут исполняться на каждой ноде.

                celery-redbeat решает проблему. Просто запускаем все celery с -B и не беспокоимся.


                запустить одновременно несколько сессий бэкапа БД

                Ну подобные таски в лок оборачивать положено, независимо ни от чего, например с помощью python-redis-lock

                  0
                  celery-redbeat решает проблему

                  Неа, эту проблему решает просто внешний шедулер, и стандартного хватит. Когда появляется слово «динамика», стандартный шедулер уходит в закат

                  А за библиотечки спасибо — celery-redbeat выглядит поприятнее, чем redisbeat. python-redis-lock интересен тем, что он идёт в поставке с celery-redbeat
                    0
                    Сколько неприятных моментов нам доставили распределенные блокировки по TTL в случае нештатных ситуаций :)
                    Самое печально, что блокировки по TTL без отслеживания смерти клиента в случае аварии могут помешать системе восстановиться после простого ребута.
                      0

                      python-redis-lock может ставить лок с TTL и обновлять его, пока процесс работает. Умер/подвис процесс — лок устарел.


                      with redis_lock.Lock('my-lock', expire=60, auto_renewal=True):
                          # Do work....
                        0
                        Это всё до боли знакомо и ясно :)
                        Но лок устареет только через 60 секунд.
                        А если у нас длительные процессы с несколько непонятной продолжительностью которые ресурсы берут? Аппроксимация TTL? Обновление TTL? Ну его… не надёжно это всё, всёравно останется какая-то блокировка у которой TTL несколько часов.
                        Мы своё элегантное решение сделали, пока обкатываем и оформляем, если интересно то пишите asovetnikov на гмейле
                    0
                    По отдельности kombu, billiard и всё остальное прекрасны может быть, но этот зоопарк сильно связан по конкретным версиям между собой и в случае ошибки в одном начинается процесс подбора версий.
                      0

                      Смешно было, когда проект с celery 3.x решили под Python 3.7 запустить. В одной из не самых свежих либ была строка вида import foo.async.bar— получаем SyntaxError :)

                        0
                        Смеяться будете, мы запустили celery 4.2 под Python 3.7 и нам пришлось переименовывать async в asynchronous.
                          0

                          Это в kombu 4.2.0 пофикшено было полтора года назад. Сама селери 4.3.0 стабильная сейчас. Собственно, в вышеупомянутом проект бампнули селери до 4.3.0 и никаких проблем.

                            0
                            Вы будете смеяться дальше, но нам пока некогда и нет смысла переходить дальше Django 1.8, а celery 4.3 не дружит если у тебя ниже 1.9
                            Потрясающая зависимость celery и Django :)
                            Заниматься этим всем и ждать новых сюрпризов смысла не вижу, есть положительный опыт с другой библиотекой, проще перейти.
                              0
                              Уж до кучи, забыл совсем — flower крайне негативные эмоции вызывает, история задач вечно куда-то теряется, фильтровать нормально нельзя, какие-то пустые поля в задачах часто бывают, чувство потери контроля в сравнении со старым celery cam и djcelery
                +1
                А почему вы не воспользовались тем, что «таска» в Celery — это на самом деле и есть класс, который налету создаётся декоратором app.task() с использованием базового класса celery.Task? С давних пор можно было самому писать «таски» в виде классов унаследованных от celery.Task.
                Минус был только в том, что их не так удобно регистрировать в Celery-приложении, как это делается с декоратором. И ещё в минус можно записать невозможность переопределить базовый класс через настройку Celery.
                  0
                  Хороший вопрос. Действительно, можно указать базовый класс для таска. Проблема однако в том, что:
                  1. Проблему бойлерплейта он не решает
                  2. Я боялся нарваться на коллизию имён. Хотелось отделить логику задачи от логики celery
                  3. Минусы вы назвали сами. Частно говоря, вы их назвали больше, чем я знал
                  4. И вкусное: а мой подход не запрещает их комбинировать
                    0
                    Никто не мешает написать свой «базовый» класс на основе оригинального, что бы спрятать в него бойлерплейт. Я так и сделал — запихал туда обработку ошибок, логирование и хитрую инициализацию энвайромента.

                    Хотя… вот вам самый важный «минус» — экземпляр класс-таски в Celery создаётся только один, и потому его нельзя использовать для хранения данных выполняемой таски. Для этого можно использовать «словарик» Task.request — он создаётся каждый раз перед запуском таски.
                      0
                      Ох, точно, это и была та самая причина, по которой я отбросил затею с классами Task. Знатно подо мной тогда стул прогорел — а в чём смысл класса?

                      А обработку ошибок я в примерах и предложил направить в метод task. Разве что всё под рукой — в базовом классе, а не снаружи

                Only users with full accounts can post comments. Log in, please.