Что делает ваше FastAPI-приложение, когда клиент неожиданно закрывает соединение? По умолчанию — ничего. Если сервису важно экономить ресурсы (например, при работе с LLM), дисконнекты приходится обрабатывать явно, иначе GPU продолжит генерировать токены в пустоту.
В FastAPI есть встроенные механизмы отмены обработки запроса при дисконнекте клиента. Однако просто включить их недостаточно: нужно подготовить ваш код к этому. Иначе возможны проблемы: зависшие транзакции и сломанный пул соединений с БД.
В этой статье разберёмся, как с этим работать:
Узнаем, что происходит под капотом в случае дисконнекта — от
TCPиASGIдо вашего кода.Рассмотрим практические подходы для стриминговых и обычных эндпоинтов.
Хотя мы разбираем проблему на примере сервиса стриминга сообщений от LLM, описанные подходы применимы к любым сервисам на FastAPI + Uvicorn.
Версии библиотек: в примерах кода используются
Uvicorn 0.40.0,Starlette 0.50.0,FastAPI 0.128.0,vLLM v0.13.0

Введение
📜 Мы разрабатывали прокси-сервис для роутинга запросов в LLM-провайдеры (см. предыдущий пост). Обычные эндпоинты работали без проблем, но когда мы добавили поддержку стриминга — начались сюрпризы.
Сначала поступили вопросы от DBA — почему наш сервис держит открытыми транзакции по 40 минут? Открыли
pg_stat_activity— действительно, несколько сессий в статусеidle in transaction. Потом посыпались алерты о мёртвых соединениях в пуле.
После разбора стало понятно: проблема в некорректной обработке дисконнектов. Клиент закрывал соединение в момент открытой транзакции, из-за чего исполнение прерывалось и транзакция не завершалась корректно. Захотелось понять, что FastAPI делает, когда соединение прерывается. И почему иногда он вовсе ничего не делает. 🤯
Поче��у важно уметь обрабатывать дисконнекты
Ресурсы: Когда соединение с клиентом обрывается, запрос к LLM не отменяется — GPU продолжает работать, занимая ресурсы для
self-hostedмоделей или накапливая расходы при использовании внешних провайдеров (как OpenAI).Работа с БД: Самая серьёзная проблема. Если соединение обрывается во время транзакции, она может остаться незакоммиченной, а в пул возвращаются сломанные соединения.
Наблюдаемость: Вы не узнаете, сколько запросов не было обработано до конца, что важно для отладки межсервисного взаимодействия и аналитики.
Почему дефолтная обработка небезопасна
Когда соединение обрывается, ASGI-сервер (например, Uvicorn) отправляет в приложение событие http.disconnect. Однако логика обработки со стороны приложения зависит от типа эндпоинта:
Обычные эндпоинты: событие игнорируется — ваш код продолжает работать как ни в чём не бывало.
Стриминговые эндпоинты (
StreamingResponse,EventSourceResponse): дисконнект ловится, но отмена происходит «грубо» — может прервать транзакцию или код очистки.
Прежде чем погружаться в детали, давайте определимся с целью:
🧠 Чего мы хотим добиться: 100% гарантия обнаружения дисконнекта — не наша цель. Сети асинхронны, поэтому в рамках одного HTTP-запроса это попросту невозможно.
Наш фокус — эндпоинты с двумя фазами: быстрые критичные операции (работа с БД) и долгие некритичные (ожидание ответа от LLM). Для первых важна корректность завершения, для вторых — возможность прервать исполнение и освободить ресурсы.
Итого: надёжная работа с БД, экономия ресурсов, понимание статистики дисконнектов — и всё это простыми средствами, без усложнения бизнес-логики приложения.
Механика дисконнекта: от TCP до пользовательского кода
💡 При обнаружении разрыва соединения ASGI-сервер не останавливает ваш код. Вместо этого приложение должно само узнать об отключении — использовать вызов
await request.receive()и проверять, не пришло ли событие{"type": "http.disconnect"}. Отмену запроса тоже придётся обрабатывать самостоятельно.
⚙️ Справка по стеку FastAPI приложения: Uvicorn — это ASGI-сервер, он обрабатывает TCP-соединения и транслирует сетевые события в ASGI-сообщения. Starlette — ASGI-фреймворк, предоставляющий роутинг, middleware и абстракции Request/Response. FastAPI построен поверх Starlette и добавляет валидацию, автодокументацию и dependency injection. ASGI — это просто спецификация, контракт между сервером и фреймворком.

Уровень TCP
Когда клиент закрывает соединение, его TCP-стек отправляет либо FIN при штатном закрытии (пользователь закрыл вкладку, HTTP-клиент завершил работу), либо RST при аварийном (убитый процесс, принудительный разрыв). ОС сервера распознаёт этот сигнал и помечает соединение как закрытое. Подробнее можно узнать в серии постов Cloudflare "When TCP sockets refuse to die".
Уровень ОС и Asyncio
ОС уведомляет event loop Python о закрытии соединения через системные механизмы вроде epoll. Asyncio вызывает callback connection_lost() объекта Protocol — это низкоуровневый интерфейс asyncio для работы с сетевыми соединениями. Этот callback — мост между событиями kernel-space и Python user-space, позволяющий ASGI-серверу реагировать на изменения состояния соединения.
Уровень ASGI: событие http.disconnect
Давайте посмотрим на реализацию Protocol внутри Uvicorn.
Установка флага дисконнекта
При вызове connection_lost() Uvicorn устанавливает флаг (uvicorn/protocols/http/h11_impl.py:112) и разблокирует ожидающие операции (uvicorn/protocols/http/h11_impl.py:122):
def connection_lost(self, exc: Exception | None) -> None: # ... logging ... if self.cycle and not self.cycle.response_complete: self.cycle.disconnected = True # Set the flag if self.cycle is not None: self.cycle.message_event.set() # Wake up waiting receive()
Флаг disconnected теперь равен True, но ваш код приложения не прерывается. Вызов message_event.set() пробуждает ожидающие receive(), чтобы пользовательский обработчик мог получить событие http.disconnect.
Опрос состояния соединения
Когда ваше приложение вызывает await request.receive(), Uvicorn проверяет флаг отключения (uvicorn/protocols/http/h11_impl.py:542):
async def receive(self) -> ASGIReceiveEvent: ... if not self.disconnected and not self.response_complete: self.flow.resume_reading() await self.message_event.wait() self.message_event.clear() if self.disconnected or self.response_complete: return {"type": "http.disconnect"} message: HTTPRequestEvent = { "type": "http.request", "body": self.body, "more_body": self.more_body, } self.body = b"" return message
message_event устанавливается при получении данных или при закрытии соединения. Получив {"type": "http.disconnect"}, приложение понимает: клиент ушёл, можно прекращать работу.
⚠️ Важно: После полного прочтения тела запроса вызовы
receive()становятся пассивным ожиданием — никакого сетевого I/O, только проверка флагов и ожидание сигнала.
Уровень приложения: обработка в FastAPI/Starlette
Что будет дальше — зависит от типа Response:
StreamingResponse: Starlette запускает фоновую задачу, которая опрашивает
receive()на предметhttp.disconnect, пока другая задача стримит ответ (starlette/responses.py:220-232). При обнаружении отключенияtask groupотменяется, что запускает очистку — Starlette пробрасываетCancelledError.Другие типы ответов: Для стандартных
Response,JSONResponseи других нестриминговых ответов Starlette никак не реагирует на дисконнекты.
Чтобы обнаружить отключение самостоятельно, нужно явно опрашивать событие — вызывать await request.receive() и проверять сообщения http.disconnect.
Данные улетают в пустоту
Что происходит, когда ваше приложение не проверяет отключение и продолжает генерировать ответ?
Спецификация ASGI говорит, что сервер должен выбрасывать подкласс OSError при вызове send() в случае закрытого соединения, но явно предупреждает: "This is not guaranteed, however, especially on older ASGI server implementations (it was introduced in spec version 2.4)."
На практике Uvicorn (на момент написания статьи — версия 0.40.0) не выбрасывает исключений — данные просто отбрасываются (uvicorn/protocols/http/h11_impl.py:460):
async def send(self, message: ASGISendEvent) -> None: # ... if self.disconnected: return # Silently discard - no exception raised
Требование выбрасывать OSError появилось в ASGI 2.4, а Uvicorn реализует версию 2.3.
🎯 Главные выводы
По умолчанию ваш код продолжает выполняться после отключения клиента.
Чтобы узнать об отключении, нужно явно вызывать
await request.receive()и проверятьhttp.disconnect.Без подобной проверки данные ответа молча отбрасываются — особенность реализации
send()в Uvicorn.StreamingResponseпроверяет отключение автоматически, другие типыResponse— нет.
Обработка дисконнектов в не-стриминговых эндпоинтах
Не-стриминговые (или просто «обычные») эндпоинты — это классические request-response ручки.
📜 Сервис соседней команды использовал обычный HTTP-эндпоинт нашего приложения — никакого стриминга, простой request-response. Обработка занимала 10–15 секунд на запрос. Команды договорились о лимитах: максимум 32 параллельных запроса, таймаут 30 секунд. Внутренний сервис для фоновой обработки — никаких внешних клиентов, никаких защитных механизмов. Зачем усложнять?
Через какое-то время наш сервис начал падать под нагрузкой. Запросов в обработке было значительно больше 32 — и они продолжали накапливаться.
Причина оказалась банальной: баг в конфиге клиента. Реальный таймаут был 10 секунд, а не 30. Клиент ждал 10 секунд, получал таймаут — и тут же отправлял повторный запрос. Новые запросы прилетали быстрее, чем мы успевали обрабатывать старые. А сервер продолжал честно обрабатывать брошенные запросы, сжигая CPU на вычисления, результаты которых уже никто не ждал.
Есть две хорошие статьи про обработку дисконнектов с использованием anyio: Understanding Client Disconnection in FastAPI, Stop Burning CPU on Dead FastAPI Streams. Пример из первой:
import anyio from typing import Awaitable, Any from fastapi import Request async def disconnected(request: Request) -> None: """Ждём событие отмены http.disconnect""" while True: message = await request.receive() if message["type"] == "http.disconnect": break async def wrap(call: Awaitable[Any], cancel_scope: anyio.CancelScope): """Ждём завершения вызова call, затем отменяем весь скоуп""" await call cancel_scope.cancel() # Параллельный запуск 2 задач - пользовательская `do_expensive_work` и мониторинг отключения `disconnected` @app.post("/process") async def process(request: Request): async with anyio.create_task_group() as tg: # Первая завершённая задача отменяет все задачи в скоупе tg.start_soon(wrap, disconnected(request), tg.cancel_scope) tg.start_soon(wrap, do_expensive_work(), tg.cancel_scope)
Однако это можно реализовать и на чистом asyncio. Мы рассмотрим такую реализацию из vLLM — фреймворка для инференса больших языковых моделей.
Декоратор для отмены запросов
Идея — обернуть эндпоинты декоратором @with_cancellation, который автоматически отменяет обработчик эндпоинта при отключении клиента. Эта реализация адаптирована из vLLM v0.13.0. Начнём с упрощённой версии:
import asyncio import functools from contextlib import suppress from fastapi import Request # https://github.com/vllm-project/vllm/blob/v0.13.0/vllm/entrypoints/utils.py#L46 async def listen_for_disconnect(request: Request) -> None: """Ждём событие отмены http.disconnect""" while True: message = await request.receive() if message["type"] == "http.disconnect": break # https://github.com/vllm-project/vllm/blob/v0.13.0/vllm/entrypoints/utils.py#L61 def with_cancellation(handler_func): @functools.wraps(handler_func) async def wrapper(*args, **kwargs): # Получаем объект Request request = args[1] if len(args) > 1 else kwargs["raw_request"] # Запускаем 2 параллельные задачи - завершение любой из них отменяет другую handler_task = asyncio.create_task(handler_func(*args, **kwargs)) cancellation_task = asyncio.create_task(listen_for_disconnect(request)) # Если мониторинг дисконнекта завершится первым - обработчик получит CancelledError done, pending = await asyncio.wait( [handler_task, cancellation_task], return_when=asyncio.FIRST_COMPLETED ) for task in pending: task.cancel() if handler_task in done: return handler_task.result() return None return wrapper
Примечание: Декоратор возвращает
Noneпри отключении клиента — ответ всё равно никуда не уйдёт. Отменённая задача-обработчик выполняет свою очистку в фоне. Если нужно, чтобыCancelledErrorпробросился дальше (для трекинга ошибок или middleware), можно вызватьhandler_task.result(), чтобы выбросить исключение.
Пример использования декоратора:
from fastapi import FastAPI, Request from pydantic import BaseModel import asyncio app = FastAPI() class JobRequest(BaseModel): job_id: str duration: int @app.post("/process") @with_cancellation async def process_job(job: JobRequest, raw_request: Request): try: # Эмулируем долгую операцию for i in range(job.duration): await asyncio.sleep(1) print(f"Processing job {job.job_id}: step {i+1}/{job.duration}") return {"status": "completed", "job_id": job.job_id} except asyncio.CancelledError: # Выполняем специфичную для обработки отмены логику (логи, метрики и тд) print(f"Job {job.job_id} was cancelled by client disconnect") # Пример: vLLM удаляет запрос из очереди и очищает GPU через вызов engine.abort(request_id) raise # Пробросить исключение дальше finally: # Код очистки — должен быть защищён от отмены через `shield` async def cleanup(): await asyncio.sleep(0.1) # Эмулируем асинхронные вызовы print(f"Cleanup completed for job {job.job_id}") # Необходимо хранить жёсткую ссылку на задачу - иначе garbage collector может удалить ее до завершения # https://docs.python.org/3/library/asyncio-task.html#shielding-from-cancellation cleanup_task = asyncio.create_task(cleanup()) try: await asyncio.shield(cleanup_task) except asyncio.CancelledError: # shield() был отменён, но задача продолжает работать # ждём завершения задачи и пробрасываем исключение дальше # https://github.com/python/cpython/issues/103486 await cleanup_task raise
Три важных момента:
except CancelledError: Здесь обрабатываем именно события отмены — логируем, отправляем метрики. После обработки обязательно пробрасываем исключение дальше.finally-блок: Выполняется всегда — неважно, успешно завершился запрос, упал с ошибкой или был отменён. Здесь реализуем очистку ресурсов.asyncio.shield(): Защищает асинхронный код вfinallyот прерывания. Без него код очистки ресурсов может быть прерван и мы получим неконсистентное состояние ресурсов.
Очистка внешних ресурсов: Отмена запроса не освобождает внешние ресурсы автоматически. Например, в vLLM движок генерации ловит CancelledError и вызывает await self.abort(request_id), чтобы прервать обработку запроса и освободить GPU. Тот же принцип применим к любым другим ресурсам.
Обработка дисконнектов через отдельный Event
Для сценариев, где большинство операций критичны (запись в базу, транзакции, вызовы внешних API), может понадобиться обратный подход: защищать всё, отменять выборочно. Вместо автоматической отмены всего подряд (с точечной защитой через shield()), даём коду возможность проверять сигнал отмены в нужных местах:
import asyncio from contextlib import asynccontextmanager, suppress from typing import AsyncIterator, Coroutine from fastapi import Request async def listen_for_http_disconnect(request: Request, event: asyncio.Event) -> None: """Ждём событие отмены http.disconnect""" while True: message = await request.receive() if message["type"] == "http.disconnect": event.set() break @asynccontextmanager async def detect_disconnect(request: Request) -> AsyncIterator[asyncio.Event]: """ После получения сигнала о дисконнекте устанавливаем событие `disconnect_event`. """ disconnect_event = asyncio.Event() disconnect_listener = asyncio.create_task( listen_for_http_disconnect(request, disconnect_event) ) try: yield disconnect_event finally: with suppress(asyncio.CancelledError): disconnect_listener.cancel() await disconnect_listener async def cancel_on_disconnect( work_coro: Coroutine, disconnect_event: asyncio.Event ): """ Возвращаем результат работы `work_coro` или выбрасываем CancelledError, если клиент отключился. """ work_task = asyncio.create_task(work_coro) disconnect_task = asyncio.create_task(disconnect_event.wait()) done, pending = await asyncio.wait( [work_task, disconnect_task], return_when=asyncio.FIRST_COMPLETED ) # Отменяем незавершённые задачи for task in pending: task.cancel() with suppress(asyncio.CancelledError): await task # Возвращаем ответ или пробрасываем CancelledError дальше (например для Sentry) return work_task.result()
Такой подход даёт более гранулярный контроль. Отменятся только те операции, которые явно обернуты в cancel_on_disconnect. Всё остальное выполнится до конца:
@app.post("/process") async def process_job(job: JobRequest, request: Request): async with detect_disconnect(request) as disconnect_event: # Защищено от отмены, выполнится в любом случае await db.log_request(job.job_id) # Эмулируем тяжёлую работу async def expensive_work(): for i in range(job.duration): await asyncio.sleep(1) print(f"Processing step {i+1}") return {"processed": job.duration} try: # Отменится в случае дисконнекта result = await cancel_on_disconnect(expensive_work(), disconnect_event) # Защищено от отмены, запускается только в случае успеха `expensive_work` await db.log_completion(job.job_id) return {"status": "completed", "result": result} except asyncio.CancelledError: # Защищено от отмены, выполнится в любом случае await db.log_cancellation(job.job_id) return Response(status_code=499)
Подход с декоратором отменяет эндпоинт целиком: вы защищаете (используя shield()) ключевые блоки кода. Подход, использующий Event, защищает всё по умолчанию — вы оборачиваете отменяемые операции в cancel_on_disconnect().
Если у вас мало критичных операций, используйте декоратор. Если ваш код допускает прерывание только в 1-2 местах, используйте подход с Event.
🎯 Главные выводы
Необходимо явно опрашивать сервер на наличие сигнала о дисконнекте — ваш код не отменяется автоматически.
Явно обрабатывайте
CancelledError.Используйте
finally-блоки для очистки, которая должна выполниться в любом случае.Оборачивайте критичные операции в
asyncio.shield(), чтобы предотвратить нежелательные прерывания.
🔬 Deep Dive: Почему request.receive(), а не request.is_disconnected()?
Описанные выше методы используют await request.receive() для обнаружения дисконнектов, а не метод Request.is_disconnected() из Starlette. Это может показаться нелогичным — зачем вызывать receive(), когда уже есть специальный метод для проверки статуса отключения?
Проблема с is_disconnected(): Он стабильно возвращает False при наличии BaseHTTPMiddleware — известный баг Starlette с версии 0.21.0. Это делает его ненадёжным в реальных проектах, где middleware (аутентификация, логирование, метрики) используется повсеместно.
Почему request.receive() безопасен?: Порядок выполнения в рамках FastAPI гарантирует это. Если посмотреть на fastapi/routing.py, обработчик запроса следует такой последовательности:
Парсинг тела запроса (routing.py:298:
body_bytes = await request.body()) — потребляет все сообщенияhttp.request.Обработка зависимостей Depends (routing.py:345:
await solve_dependencies()) — обработка зависимостей происходит после того, как тело запроса прочитано.Запуск обработчика эндпоинта (routing.py:355) — декораторы и код эндпоинта выполняются.
К моменту, когда выполняется любой декоратор или зависимость, тело запроса уже полностью прочитано. Вызов request.receive() получит только сообщения http.disconnect. Это работает для всех типов запросов FastAPI: JSON с Pydantic-моделями, Form-данные и даже загрузка файлов через UploadFile.
Когда это небезопасно: Единственный небезопасный случай — когда вы вручную стримите тело запроса через request.stream(), обходя парсинг со стороны FastAPI.
vLLM применяет этот подход даже для загрузки аудиофайлов (@with_cancellation + Form() + чтение файла). Более подробно, почему они отказались от идеи использования is_disconnected() — см. PR #11190.
Обработка дисконнектов в стриминговых эндпоинтах
Работа со стриминговыми ответами в FastAPI-приложениях состоит из двух этапов:
Шаг 1: Создаём асинхронный генератор, который yield'ит чанки с событиям��:
async def generate_events(): for i in range(100): await asyncio.sleep(0.1) yield {"event": "progress", "data": {"step": i}}
Шаг 2: Передаём генератор в класс стримингового ответа, который отправляет события клиенту:
from sse_starlette import EventSourceResponse @app.get("/stream") async def stream_endpoint(): return EventSourceResponse(generate_events())
Наиболее часто используются эти две основные реализации стриминговых ответов:
StreamingResponse— универсальный HTTP-стриминг с chunked transfer encoding. Стримит сырые байты или текст, формат вы контролируете сами.EventSourceResponse(SSE) — Server-Sent Events со структурированными сообщениями (поляevent,data,id).
Ключевое отличие: StreamingResponse — это низкоуровневый примитив для любого контента, а EventSourceResponse реализует стандарт протокола SSE. При этом логика отмены у них практически идентична: оба используют task group, чтобы параллельно стримить данные и слушать дисконнект. Оба пробрасывают CancelledError в ваш генератор.
Дальше будем говорить про EventSourceResponse — SSE это основной формат для стриминга при работе с LLM, его используют OpenAI, Anthropic и большинство inference-серверов. Но все паттерны ниже применимы и к StreamingResponse.
Как sse-starlette обрабатывает события отмены
Давайте посмотрим, как устроена библиотека sse-starlette:
Класс EventSourceResponse запускает три задачи в рамках task group (на самом деле их четыре — ещё одна для graceful shutdown, но сейчас она нам не важна):
async def __call__(self, scope, receive, send): async with anyio.create_task_group() as task_group: async def cancel_on_finish(coro: Callable[[], Awaitable[None]]): await coro() task_group.cancel_scope.cancel() # Task 1: Стриминг событий на клиент task_group.start_soon(cancel_on_finish, lambda: self._stream_response(send)) # Task 2: Отправка пинг-событий (по умолчанию каждые 15с) task_group.start_soon(cancel_on_finish, lambda: self._ping(send)) # Task 3: Ожидание сигнала завершения (http.disconnect или shutdown) task_group.start_soon(cancel_on_finish, self._listen_for_exit_signal)
Каждая задача обёрнута в cancel_on_finish. Если первой завершится _listen_for_exit_signal (получит http.disconnect), она отменит все задачи в task group — включая задачу стриминга.

Рис. 1: Координация задач в EventSourceResponse
💡 Send Timeout для медленных клиентов
Отдельный случай: клиент не отключился, но перестал читать данные из своего TCP-буфера. В таком случае сервер продолжает писать ответ в свой TCP-буфер. В какой-то момент буфер заполняется и последующие операции
send()блокируются. Формально соединение живое, ноhttp.disconnectне приходит.Параметр
send_timeoutрешает эту проблему: если запись очередного чанка в буфер не завершилась за N секунд (буфер переполнен) — закрываем соединение и прекращаем работу.Подробнее про эту проблему можно почитать в статье Cloudflare, а также в sse-starlette#89.
Обработка CancelledError — это ваша зона ответственности
При отключении клиента sse-starlette пробросит CancelledError в ваш генератор (точнее в asyncio-таску, в которой выполняется ген��ратор). Дальше — ваша задача: корректно завершить работу и освободить ресурсы.
Чтобы понять, почему это не так просто, вспомним, как работают асинхронные генераторы:
Создание —
gen = my_generator()создаёт объект, но код ещё не выполняетсяИтерация — каждый
await gen.__anext__()выполняет код до следующегоyield, затем приостанавливает выполнениеЗакрытие —
await gen.aclose()или выход из области видимости запускает блокиfinally
В случае отмены запроса CancelledError выбрасывается при взаимодействии с event loop. Это может быть await в основном коде генератора, а может быть await внутри блока finally — если там есть асинхронные операции очистки. Нужно учитывать оба случая.
Рекомендации по обработке событий отмены в асинхронных генераторах
Явно обрабатывайте
CancelledErrorИспользуйте блоки
finallyв ваших генераторах.Оборачивайте вложенные генераторы в
aclosing()— это гарантирует немедленное освобождение ресурсов (а не отложенное до сборки мусора) и очистку в том же async-контексте. Последнее критично для контекстных переменных. См. CPython issue #118944.Защитите критические операции (такие, как работа с БД) от прерывания в случае отмены запроса.
⚠️ Последствия: что случится без защиты критических операций
Зависшие транзакции: остаются открытыми бесконечно, видны только через мониторинг базы данных.
Повреждённые пулы соединений: соединения в невалидном состоянии возвращаются в пул, вызывая ошибки типа
The garbage collector is trying to clean up non-checked-in connection.Примечание: SQLAlchemy-сессии защищают свои блоки
__aexit__внутри, но не могут полностью защитить блокfinallyвашего генератора. Это именно та проблема, с которой мы столкнулись в продакшене. Вероятно, связано с внутренней механикой отмены вanyio.
Пример: стриминговый эндпоинт с корректной обработкой событий отмены
import asyncio import anyio from contextlib import aclosing from typing import AsyncIterator from dataclasses import dataclass from fastapi import FastAPI, HTTPException from sse_starlette import EventSourceResponse app = FastAPI() @dataclass class StreamingResult: exception: Exception | None = None cancelled: bool = False # Слой 1: Внутренний генератор - сырые токены от LLM async def generate_llm_chunks() -> AsyncIterator[str]: """Эмулируем стриминг от LLM-провайдера""" for i in range(10): await asyncio.sleep(0.5) yield f"token_{i}" # Слой 2: Бизнес логика async def process_stream() -> AsyncIterator[dict]: result = StreamingResult() try: # aclosing() гарантирует закрытие внутреннего генератора после выхода из контекстного менеджера # защищает от потери контекстных переменных, см. https://github.com/python/cpython/issues/118944 async with aclosing(generate_llm_chunks()) as gen: async for chunk in gen: # Бизнес логика: обработка ответа LLM processed = {"type": "chunk", "content": chunk} yield processed except asyncio.CancelledError: # Произошло отключение клиента result.cancelled = True raise except Exception as exc: result.exception = exc raise finally: # Защищаем код очистки от прерывания. Используем именно anyio.CancelScope, # потому что sse-starlette работает на anyio task groups — другие механизмы # защиты (например, asyncio.shield) могут работать некорректно. # См. https://anyio.readthedocs.io/en/stable/cancellation.html with anyio.CancelScope(shield=True): # Код очистки - не будет прерван await asyncio.sleep(0.1) # Эмулируем долгую работу if result.cancelled: pass # Логируем событие отмены при необходимости elif result.exception: pass # Логируем ошибку при необходимости # Слой 3: Сериализация в формат SSE async def serialize_to_sse(gen: AsyncIterator[dict]) -> AsyncIterator[dict]: """Преобразует события уровня приложения в SSE чанки""" async with aclosing(gen) as wrapped_stream: async for chunk in wrapped_stream: yield {"data": chunk} async def create_stream() -> AsyncIterator[dict]: # Инициализация генератора await asyncio.sleep(0) # Эмулируем асинхронные вызовы # Возвращаем собранный генератор return serialize_to_sse(process_stream()) @app.get("/stream") async def stream(): # Создаём генератор, обрабатываем ошибки до начала стриминга # Это позволяет вернуть корректный HTTP-код при ошибке инициализации try: sse_generator = await create_stream() except Exception as e: # Инициализация не удалась — возвращаем 500 до начала стриминга raise HTTPException(status_code=500, detail=str(e)) from e # Генератор создан — передаём в EventSourceResponse # sse-starlette сама обработает события отмены return EventSourceResponse(sse_generator)
🎯 Главные выводы
Явно обрабатывайте
CancelledErrorЗащищайте критичный код от отмены: блоки
finally, операции с БД и т.д.Оборачивайте вложенные генераторы в
aclosing()
Типичные ошибки и подводные камни
Пример из предыдущего раздела работает — в большинстве случаев. Но есть нюансы, о которых стоит знать.
Не создавайте два способа отмены для стриминговых задач
Этот антипаттерн встречается повсюду — в примерах из документации, в самописных решениях, в реализациях фреймворков. Вы наверняка видели или сами писали что-то подобное:
В документации:
async def monitored_stream(request): while events_sent < 100: if await request.is_disconnected(): break # Exit on disconnect yield {"data": f"Event {events_sent}"}
В адаптерах, оборачивающих генераторы:
async def sse_adapter(generator, request): async for item in generator: if await request.is_disconnected(): break # Exit on disconnect yield item
В кастомных streaming response (реальная реализация, с которой я столкнулся):
async def stream_response(self, send, stop_event): async for event in self.body_iterator: if stop_event.is_set(): break # Exit on disconnect await send({"type": "http.response.body", "body": event})
Подход выглядит разумно: проверяем наличие дисконнекта и завершаем работу при необходимости. Но это создаёт race condition, который обходит ваши обработчики исключений.
Проблема: Фреймворки вроде Starlette и sse-starlette уже мониторят дисконнекты и отменяют стриминговую задачу. Когда вы добавляете собственную проверку дисконнекта с break, вы создаёте два отдельных пути для завершения — отмена фреймворком и ваш ручной break. Эти пути конкурируют друг с другом, и путь через break обходит обработчики исключений:

Когда вы используете break, ваш внутренний генератор получает GeneratorExit вместо CancelledError. Блок except asyncio.CancelledError никогда не выполняется — он ловит только CancelledError, но не GeneratorExit. Блок finally всё ещё выполняется, но специфичная для отмены логика очистки пропускается.
⚙️ Механика Python:
break→GeneratorExit·task.cancel()→CancelledError
Правильный подход: Уберите проверку события из цикла. Пусть отмена задачи распространяется естественным путём. Когда клиент отключается, фреймворк вызывает task.cancel(), который выбрасывает CancelledError внутри цикла async for. Это исключение проходит через обработчики исключений вашего генератора, гарантируя правильную очистку.
Подробнее см. обсуждение в sse-starlette: Issue #28, PR #30.
"Ложные дисконнекты"
ASGI не даёт гарантий подтверждения доставки. Вы можете отправить данные, но не узнаете, получил ли их клиент. Задачи, которые выглядят успешно завершёнными с точки зрения сервера, всё равно могут не дойти до клиента.
⚙️ Поведение ASGI:
send()возвращается после записи вTCP send buffer, а не после того, как клиент получит данные. Из спецификации: "Protocol servers must flush any data passed to them into the send buffer before returning from a send call."
Это поведение буферизации означает:
✓ Вы знаете, что отправили
✗ Вы не знаете, получил ли это клиент
Но даже когда клиент всё получил:
HTTP chunked encoding требует завершающий чанк. На уровне ASGI это выражается сообщением с more_body: False:
{"type": "http.response.body", "body": b"", "more_body": False}
Но клиенты часто закрывают соединение сразу после получения последнего логического события, не дожидаясь этого чанка. Они получили все нужные данные — зачем ждать?
В результате:
Ваш генератор отдаёт последнее событие
Клиент получает его и сразу закрывает соединение
Сервер получает
TCP FIN→ ASGI создаётhttp.disconnectВаш код получает
CancelledError, скорее всего, уже в блокеfinallyвложенного генератора
Клиент получил всё, что хотел, но с точки зрения сервера произошёл дисконнект.
Практический подход:
Отслеживайте, успели ли вы закончить отправку до получения события отмены. Это даёт лучший статистический сигнал, учитывая, что 100% гарантии невозможны:
async def stream_events(): completed = False try: for event in events: yield {"data": event} # Ставим флаг ДО завершающего фрейма completed = True yield {"event": "done"} except asyncio.CancelledError: if completed: # Отправили всё — скорее всего клиент получил данные logger.info("Stream completed, client likely received all data") else: # Отмена посреди стрима — клиент точно что-то пропустил logger.warning("Stream cancelled mid-transmission") raise # Всегда пробрасываем дальше
Заключение
Всё началось с конкретной проблемы — зависших транзакций и алертов про сломанные подключения к БД. Простое желание понять, что делает request.receive() и почему я должен использовать это в своём коде, привело к разбору TCP-сигналов, ASGI-событий, asyncio-протоколов и семантики Python-генераторов.
Что мы сделали в итоге
Когда разобрались в механике, исправления оказались на удивление простыми.
Для стриминговых эндпоинтов:
Обернули вложенные генераторы в
aclosing()— гарантированное закрытие в нужном async-контекстеДобавили
CancelScope(shield=True)в cleanup-код — защита от прерывания посреди очисткиУбрали все
if disconnected: break— пусть фреймворк сам отменяет, а мы ловимCancelledError
Для обычных эндпоинтов:
Добавили обработку сигналов дисконнекта — теперь долгие операции прерываются, если клиент ушёл
Зависшие транзакции — пропали. Алерты — тоже. Ресурсы используются эффективнее.
Краткий чеклист
Для тех, кто пролистал в конец — вот минимум, который стоит проверить в своём коде:
Слушаете события
http.disconnectв не-стриминговых эндпоинтах?Защищаете критичные операции через
shield?Явно обрабатываете
CancelledError?Cleanup-код в
finallyзащищён от прерывания?Вложенные async-генераторы обёрнуты в
aclosing()?Нет конкурирующих путей отмены (
break+task.cancel())?
Если ответ на всё «да» — скорее всего, у вас всё в порядке.
Нужно ли было копать до уровня TCP? Честно — не уверен. Но когда понимаешь, как всё устроено под капотом, баги перестают быть магией. И следующий раз, когда что-то сломается, уже знаешь, где искать.
Спасибо за внимание — и удачи с обработкой дисконнектов 🙂
Автор
Станислав Шимоволос, бэкенд-инженер в команде LLM-Dev
