Как стать автором
Обновить

Комментарии 11

Спасибо за статью. У меня также была такая же проблема.
Тогда я сделал все просто

import asyncio

ну и просто
settings = asyncio.run(crud_settings.get_settings_by_clinic_id(db, clinic_id))

где crud_settings.get_settings_by_clinic_id() асинхронная функция с асинхронной сессией. Я уже тогда чувствовал что это неправильно, но решения на тот момент не нашел.

У меня раньше Django+Celery использовался много где и, в свое время, по многим граблям этого фреймворка я попрыгал.

Не знаю как в последних версиях (я застрял на 4.х), но раньше (лет 10 назад, хмм) celery очень не любил когда вы запускали какие-нибудь потоки (Thread) внутри тасков. Какие-то жуткие вещи там происходили, помнится - например, воркер мог форкнуться и забрать с собой часть ресурсов и потоков. В общем-то, eventloop это тоже касается.

Первым делом я получил loop в глобальной области моего файла tasks.py

?? А что вы делаете когда воркер форкается?

Если я использую asyncio внутри таски, то у меня loop создается каждый раз при запуске задачи. Чаще всего надо просто скачать 10-100 файлов параллельно.

В новых проектах тоже часто использую FastAPI, но если нужны бэкграунд-таски, то часто беру celery, но не делаю их асинхронными. Пул запускается в отдельном контейнере и принимает задачи через RabbitMQ. Интернет предлагает запускать таски через celery напрямую из асинхронных тасков или через бэкграунд-воркера (тред), но мне такое решение не понравилось и я просто слегка реверснул механизм запуска таски - публикую сообщение в очередь RabbitMQ асинхронно через aio_pika, а если нужны результаты - то они приходят в Redis, причем если таска запускает другие таски, то стек корректно раскручивается.

Код запуска таски

import json
import asyncio
from uuid import uuid4

import aio_pika


class CeleryRMQConnector:
    def __init__(self, conn_str: str):
        if not conn_str.startswith('amqp://') and not conn_str.startswith('amqps://'):
            raise ValueError("CeleryRMQConnector can use only AMQP broker")
        self.conn_str = conn_str
        self._rmq = None
        self._rmq_channel = None

    async def _get_connection_channel(self):
        if not self._rmq:
            self._rmq = await aio_pika.connect_robust(
                self.conn_str,
            )
            self._rmq_channel = await self._rmq.channel()
        return self._rmq_channel

    async def send_task(self, task_name, queue_name, task_kwargs, expires=None):
        task_id = uuid4().hex
        channel = await self._get_connection_channel()
        await channel.default_exchange.publish(
            aio_pika.Message(
                body=json.dumps([[], task_kwargs,
                                 {"callbacks": None, "errbacks": None, "chain": None, "chord": None}]).encode(),
                correlation_id=task_id,
                priority=0, delivery_mode=2,
                # reply_to=self.result_queue_name,
                reply_to=None,
                content_type='application/json',
                content_encoding='utf-8',
                message_id=None,
                expiration=expires or 60 * 60,
                headers={
                    'argsrepr': "[]",
                    'kwargsrepr': "{}",
                    'group': None,
                    'origin': "gen@blablabla",
                    'retries': 0,
                    'expires': expires,
                    'id': task_id,
                    'root_id': task_id,
                    'task': task_name,
                    'lang': 'py',
                },
            ),
            routing_key=queue_name,
        )
        return task_id

Здесь не закрывается соединение при завершении - по-хорошему надо бы использовать AsyncExitStack или контекстный менеджер.

Однако, celery очень тяжелая и внутри происходит дочерта всякой магии, достаточно и костылей. Раньше я очень активно использовал celery или отдельно kombu. На что-то другое перейт непросто когда есть уже некоторый багаж наработок..

Я долго ломал себя на то чтобы браться за Celery, но и на aiopika в сожалению ранее нигде не натыкался. Сейчас глянул доку мельком, очень понравилось. Спасибо за ваши комментарии, пойду тыкать). Для контроля ресурсов вешал на воркера Cgroups и в случае чего он килял процесс, а докер его снова заводил. Касаемо запуска тасок внутри тасок: всяким образом старался обходить такие вещи, потому что я всегда работаю с Celery как с чем-то, что может в какой-то момент повести себя непредсказуемо и делаю так, чтобы он выполнял не более одной задачи.

А почему не использовать aiopika? Селери впринципе не подразумевает использование async

Мне кажется Celery давно устарел и писался он под python2, почему бы ну упростить всё до asyncio.Queue и multiprocessing.Queue ?

я начал задумываться о том, чтобы уходить с любимого Python куда‑нибудь в сторону Rust или Go, потому что, как ни крути, на нём становится писать больновато, когда дело касается каких‑то более «интересных» задач.

Аналогично, только в моем случае php и java

>я продолжу смотреть на оборачивание всего, что заблокирует GIL, в различные функции библиотек asyncio или threading, как один большой костыль относительно эстетичного синтаксиса Python

Эм, а где-то возможна конкурентность без эвент-машин, потоков или процессов?

Я тут имел ввиду отсутствие нативной поддержки асинхронности языка. Питон имеет библиотеки для того чтобы это работало, но под капотом все равно имеем дело с синхронным GIL. Взять то же GO, который имеет встроенные в язык горутины, который был создан уже после того как в мире появился компьютер с двумя ядрами. И да, питон хорош в определенном спектре задач, но не на столько хорош по скорости. Имея такой подход к асинхронности у нас появляется множество проблем касаемо того, как управлять этим сторонним, не присущим ядру языку потоку.

Ну на практике в большинстве места, где используется python можно просто запустить два инстанса рядом. Асинхронщина тут даже в плюс: помогает обрабатывать кучу i/o-bound одним процессом, а если у нас нужна конкурентность для cpu-bound то и голый python как-то использовать не нужно. Так что я не думаю, что главной проблемой производительности python является именно GIL. Главной проблемой является плохая оптимизируемость из-за утиной типизации и жирных объектов.

Другое дело, что вся асинхронщина в python всё ещё не является достаточно зрелой, да и реализовано это далеко не так хорошо, как в более новых платформах и языках.

Опять же, я не говорил что GIL это главная проблема в скорости Python. В данной статье и в области этих комментариев речь шла про асинхронность в Python. С остальным соглашусь. И возможно добавлю с этот список "некомпилируемость", интерпретация на лету тоже раскладывает свои грабли.

Если хочется именно Celery "сделать" асинхронным, то можно действовать примерно так:

class AsyncTask(celery.Task):
    async def apply_async(self, *args, **kwargs):
        ...

    async def async_run(self, *args, **kwargs):
        ...

    def __call__(self, *args, **kwargs):
        return self._get_app().loop.run_until_complete(self.async_run(*args, **kwargs))


class AsyncCelery(celery.Celery):
    task_cls = AsyncTask

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._loop = None

    @property
    def loop(self):
        if self._loop is None:
            try:
                self._loop = asyncio.get_running_loop()
            except RuntimeError:
                self._loop = asyncio.new_event_loop()
        return self._loop


    @property
    async def async_connection(self):
        # тут работа с aio_pika

    async def send_task_message(self, *args, **kwargs):
        # тут работа с aio_pika

    async def send_task(self, *args, **kwargs):
        ...

celery_app = AsyncCelery('project')

@celery_app.task
async def execute_something(*, app, **kwargs):
      ...

 # ну и соответственно в коде для отложенного вызова таски мы будем писать вот так
...
await execute_something.apply_async(**kwargs)
...

Плюсы, на мой взгляд, очевидны, ну а минусы - чем больше мы используем фишек Cелери, тип больше будет разрастаться класс AsyncCelery.

З.Ы. @kield Спасибо за статью. И не надо уходить из Питона. Python рулит)

Зарегистрируйтесь на Хабре, чтобы оставить комментарий

Публикации

Истории