Pull to refresh

Comments 30

Если мне память не изменяет, то фоновые задачи выполняются последовательно и если в какой-то из задач рейзится исключение - то следующие не выполнятся. Так что это можно тоже отметить как потенциальный минус

Это про какую библиотеку?

Прошу прощения, это про background task в fastapi/starlette

В background task задачи выполняются не последовательно: они стартуют сразу после ответа на запрос. Если задача синхронная, то выполняется в threadpool (anyio.to_thread.run_sync).

По исходному коду не нашел причины, почему следующие задачи бы не выполнялись.

Хм, дока starlette говорит об обратном, но там кейс немного другой и используется BackgroundTasks(который fastapi использует по дефолту). Ваш пример напрямую использует BackgrounTask из starlette. Стоит сделать ремарку на поведение в Fast API или указать напрямую, в заголовке что используется starlette BackgroundTask.

Ну, дока может говорить о чем угодно, но это не значит, что оно так работает
Вот, например, исходники starlette, в которых явно видно, что таска будет выполнена сразу после записи ответа в asgi (т.е. это обязательный этап запуска handler'а) - https://github.com/encode/starlette/blob/9f16bf5c25e126200701f6e04330864f4a91a898/starlette/responses.py#L161

Забавную штуку, кстати, буквально только что нашел - https://github.com/adriangb/asgi-background

Вот это как раз "те самые таски, что в фоне", причем, совместимые с любым ASGI

Регулярно приходится сталкиваться с такими задачами и пришел к выводу, что лучше всего просто пульнуть таску в очередь, и оттуда уже сервисами их исполнять. Точно ничего не потеряется (nack на фейле) + легкий скейлинг и без спайков в нагрузке

Спасибо за статью, неплохой обзор получился и открыл для себя FastStream.
Вопрос по воркерам FastStream: когда воркер забирает таску, разве он не должен сделать её недоступной для остальных, чтобы одну и ту же задачу не выполнили n раз?

Если у каждой библиотеки запустить по 5 воркеров, то результат будет таким: arq, saq и celery выполнят все 10к задач, а faststream - в 5 раз больше (потому что одна и та же задача будет выполняться на каждом воркере).

Думаю, у faststream такое поведение из-за того, что в первую очередь он создан для общения между микросервисами, а не для фоновых задач.

Ну, собственно, я это и спрашиваю — почему все воркеры берут одну и ту же задачу? Либо её статус никак не меняется, либо не успевает поменяться (все 5 уже забрали до изменения статуса), либо что-то ещё.

FastStream предполагает, что потребление только одним подписчиком нужно реализовать вручную: https://github.com/airtai/faststream/issues/693

И в целом модель применения FastStream отличается от arq/saq/celery - они нужны для фоновых\запланированных задач, а FastStream для управляемой событиями архитектуры сервисов

Но, конечно, никто не мешает вам начать применять их, как вам хочется

Это не так. Данный вопрос был имеено в том, что "как сделать так, чтобы на конкуретных воркерах конкретная таска могла работать только в одном экземпляре". Это как раз распределенный лок, о чем я человеку и сказал.

Бродкастинг сообщений по всем экземплярам сервиса - это логика, которую предоставляет брокер, а не фреймворк. В данном конкретном случае Redis Pub/Sub работает именно так. Тут уже нужно разбираться с тем, что вы хотите использовать и получить.

Но насчет отличий FastStream и celery-like инструментов - очень точное замечание. В качестве замены Celery я сам всегда советую взять taskiq

Да, вы конечно правы, нужно выражаться правильно - обработка, а не потребление

При тестировании замечал, что иногда один из воркеров работает заметно медленнее остальных. Тем не менее, это не мешает ему обработать все 10к задач, даже если остальные воркеры завершили работу десятки секунд назад.

Как указал выше, faststream создан именно для общения между микросервисами. Это подразумевает, что все, кто подписан на топик, должны получить сообщение.

С другой стороны, для разных брокеров есть куча специфичных параметров, которые позволяют гранулированно настроить поведение публикации/обработки сообщений, в том числе отработку только один раз. Но этот момент не совсем очевидный и требует проверки, например встречал похожий issue для NATS.

Ну вообще, такое поведение потому что вы решили взять redis pub/sub в качестве транспорта. Для background тасок я бы предложил лучше взять redis list (если уж очень хочется redis) а еще лучше - NATS JetStream. Но вообще интересно почитать было, как люди используют) Спасибо за статью!

Спасибо за совет!

P.S. Приятно видеть в комментариях отца FastStream :)

Я просто искал причины аномального роста трафика из РУ сегмента)

А так меня не сложно найти - я во многих чатиках присутствую. А можно даже напрямую меня пинговать по любым вопросам, связанным с брокерами и/или FastStream в чате по фреймворку - https://t.me/python_faststream (мы там еще и фичи всей толпой проектируем)

для каких задач применяются все эти технологии, для интернет магазинов это как будто избыточно, крупные проекты наверное испольуют самописные системы?

Если у вас CRUD'овый интернет магазин, то избыточно. Но как только появляются оплаты, интеграции, квитанции, задачи по расписанию, то без фоновых задач никак.

Даже если эти задачи находятся в отдельных микросервисах (как у крупных проектов), тот же faststream значительно упростит реализацию.

А что скажете про бенчмарки автора saq?

В бенчмарках автора 1000 итераций, на моей машине итераций saq спокойно переносит 500к и обгоняет arq со 100к.

ARQ enqueue 100000 18.29077935218811
ARQ process 100000 noop 501.143620967865

SAQ enqueue 500000 39.770418882369995
SAQ process 500000 noop 209.9531545639038

arq же в свою очередь не смог пройти 300к и заставил стаймаутить редис

подозреваю тут тоже играет роль этот плюс saq:
>> Avoids polling by leveraging BLMOVE or RPOPLPUSH and NOTIFY

Бенчмарки действительно показывают, что saq работает значительно быстрее, чем arq. Локально цифры тоже подтверждаются.
Но чтобы это как-то отразилось в реальной практике, похоже, нужны бешеные нагрузки.

Почему при нагрузочном тестировании saq так странно себя ведет - загадка. Судя по логам, сервер периодически намертво зависает на 5-20 секунд в момент постановки задачи в очередь (в это время FastAPI не может обработать ни один запрос).

это проявляется в обоих подходах к реализации ручки?

с FastAPI BackgroundTask и "прямым" вызовом queue.enqueue?

в реализации Queue.enqueue стоит семафор c таким описанием:

max_concurrent_ops: maximum concurrent operations. (default 20) This throttles calls to `enqueue`, `job`, and `abort` to prevent the Queue from consuming too many Redis connections.

если бенчмарки (от saq) с прямым вызовом queue.enqueue не зависают, может такое быть что зависает/заставляет зависать именно реализация BackgroundTask (как раз потенциально из за семафора)?

Пробовал на python 3.10 и 3.11, в разных комбинациях, в том числе с разным значением max_concurrent_ops.

Для варианта JSONResponse({}, background=BackgroundTask(...)) результат один и тот же. Как только убираю семафор, зависания исчезают, показатели становятся примерно как у arq, но всё равно ниже на 2-5%.

Теперь про прямой вызов queue.enqueue.

На первый взгляд как будто бы всё нормализуется. Но стоит учесть, что в этом случае искусственно занижается нагрузка от locust (т.к. сначала задача ставится в очередь, и только затем отдается ответ на запрос; через BackgroundTask происходит ровно наоборот).

Если увеличить количество пользователей до 5000, то у saq всё равно сохраняется тенденция к деградации сервиса. При этом у arq всё отлично: RPS выше почти на 30%.

SAQ
SAQ
ARQ
ARQ

Спасибо, интересно. Хотя бы точно выяснили, что действительно семафор внутри влияет на BackgroundTask (это уже вопрос к реализации FastAPI - комментатор выше тоже писал о проблемах в BackgroundTask).

насчет остального нет предположений

К минусам arq так же можно отнести, что достаточно долго над ним не велась работа, поэтому и появилcя saq, как его переработанный форк
(и интересно, что saq выбрали ребята из Litestar в своих примерах - https://github.com/litestar-org/litestar-pg-redis-docker/blob/main/app/lib/worker.py)

но у автора arq (он же автор pydantic'a, к слову) в планах целый роадмап по переработке arq - https://github.com/samuelcolvin/arq/issues/437

Да, изначально одним из минусов было то, что последний релиз arq был в конце 2022 года, но внезапно 1 мая этого года выкатили новую версию. Поэтому пришлось убрать)

Терпим и ждём celery asyncio pool и django async db driver. Слишком больно по времени терять среду Джанги и собирать велосипеды из кучи разрозненных либ.

Вопрос по асинхронность, а почему нельзя просто запустить в celery таску, которая "внутри" будет асинхронной?

Условно:

celery -A worker4messages.celery worker -P threads

async def entrypoint_task(...):
  ...

...
@celery.task(name="example_task")
def example_task(...):
  asyncio.run(entrypoint_task(...))
  return True

В моем примере происходит почти то же самое, просто логика вызова асинхронной функции из синхронной спрятана в декораторе.

asyncio.run создает новый цикл событий, выполняет корутину и закрывает цикл событий. Если для таски нужен предварительный сетап (например подключение к базе данных), то это также нужно сделать внутри таски. Поэтому оптимальнее сделать сетап один раз, создать/сохранить event loop и уже в нем запускать таски.

Sign up to leave a comment.