Последние полгода я начал задумываться о том, чтобы дополнить стек помимо Python чем-нибудь вроде Rust или Go, потому что привыкнув к строгой типизации этого уж больно-сильно не хватает и на Python. Со мной, конечно, многие поспорят, но я продолжу смотреть на оборачивание всего, что заблокирует GIL, в различные функции библиотек asyncio
или threading
, как один большой костыль относительно эстетичного синтаксиса Python.
Недавно, я столкнулся с задачей, когда с проекта на Python нужно было стряхнуть пыли и заставить работать чуточку производительнее. В следствии чего монолит был распилен на микросервисы, а брокером между сервисами стали всем знакомый RabbitMQ и такой же старый как сам Python - Celery. Проект был перенесен с Django на FastAPI, который по-моему субъективному мнению является идеальным решением для любых бэкендов на Python, если мы не говорим о чём-то высоконагруженном, где с питона стоит слезть на другой язык. Вообще, микросервисы это то, что даёт возможность разработать большую часть кодовой базы дёшево, выделив уязвимые места в микросервисы на других языках.
Начнём с конфигурации docker-compose
файла:
version: '3.8'
services:
db:
image: postgres:15.1-alpine
env_file:
- ./.env
volumes:
- postgres_data:/var/lib/postgresql/data/
app:
build: ./backend
depends_on:
- db
env_file:
- ./.env
ports:
- "8000:8000"
volumes:
- ./backend/src:/app/
...
rabbit:
image: rabbitmq:3.11.9-management
hostname: rabbit
environment:
- RABBITMQ_DEFAULT_USER=admin
- RABBITMQ_DEFAULT_PASS=admin
- RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS=-rabbit disk_free_limit 1147483648
volumes:
- ./rabbitmq:/var/lib/rabbitmq
flower:
image: mher/flower
environment:
- CELERY_BROKER_URL=amqp://admin:admin@rabbit:5672//
- RESULT_BACKEND=rpc://
- FLOWER_PORT=5555
ports:
- "5555:5555"
depends_on:
- rabbit
worker:
build: ./backend
command: python -m celery -A celery_app.celery worker --loglevel=info -Q celery --logfile=celery_app/celery.log
volumes:
- ./backend/src:/app/
env_file:
- ./.env
depends_on:
- rabbit
environment:
- C_FORCE_ROOT=yes
Для мониторинга задач Celery использовал опять же всем знакомый и до боли простой Flower. Так же дополнительным аргументом для RabbitMQ использовал disk_free_limit
для того чтобы растянуть максимально допустимый под сообщения объем памяти. Заострять внимание на каждом Dockerfile
я не буду, потому что ничего специфического там нету. Касаемо конфигурации Celery, тоже ничего сложного нету, мануалов полно в интернете. Так что перейдем сразу в сути проблемы, того, с чем конкретно у меня возникли сложности.
Моя реализация подключения к базе данных через алхимию выглядит следующим образом:
engine = create_async_engine(
DATABASE_URL,
echo=True
)
session: async_sessionmaker[AsyncSession] = async_sessionmaker(
engine,
expire_on_commit=False
)
К моему разочараванию в Celery так и не появилось ничего нового и интересного. Для того, чтобы использовать асинхронную сессию необходимо использовать асинхронные функции, а значит необходимо обернуть эту функцию во что-то, чтобы celery
не ругался.
Первым делом я получил loop
в глобальной области моего файла tasks.py
, который хранил в себе все таски для Celery (у меня их, если что всего 4). Выглядело это так:
loop = asyncio.get_event_loop()
Так же мою сессию необходимо было обернуть в функцию async_scoped_session
, чтобы избежать ошибок связанных с одновременным подключением к сессии нескольких instanc'ов приложений (воркера и самого FastAPI). Выглядела она следующим образом:
@asynccontextmanager
async def scoped_session():
scoped_factory = async_scoped_session(
session,
scopefunc=current_task,
)
try:
async with scoped_factory() as s:
yield s
finally:
await scoped_factory.remove()
Важным в этом коде является то, что мы в конце вызываем метод .remove() необходимый для корректного завершения сессии, что применительно только к scoped session (в случае обычной сессии всю работу с открытием и закрытием скрывает под собой контекстный менеджер). Подробнее про это вы можете почитать в документации. После всех проделанных операций теперь мы можем использовать нашу сессию с помощью:
async with scoped_session() as s:
await s.execute(...)
Что касается Celery, т.к. мы не имеем возможности использовать async
функции, то нам нужно будет вынести всю асинхронщину в отдельные функции и воспользоваться тем самым loop
, лежащим в tasks.py
. В таком случае наша таска будет выглядеть примерно таким образом
@shared_task(
bind=True,
name='celery:test'
)
def test_task(self, data: dict, prices: dict):
result = loop.run_until_complete(здесь_ваша_асинхнонная_функция(и, аргументы))
return result
После всех проделанных манипуляций, всё завелось и работает корректно и быстро.