В прошлой статье мы разобрали механику событийного цикла asyncio изнутри.
В этот раз поговорим о задачах, объектах класса asyncio.Task (они же по-простому "таски"). Тема важная, потому что по сути вся работа событийного цикла сводится к постоянному жонглированию задачами: запустить, приостановить, разбудить, завершить. Если понять, как устроена таска изнутри, изрядная доля магии asyncio (как и нелюбви к нему) исчезнет.
Но прежде чем нырять в исходники asyncio.Task, нам придется начать издалека, а именно с концепции "фьючеров" и класса concurrent.futures.Future, появившегося в уже далеком 2011 году. asyncio.Task является наследником asyncio.Future, который в свою очередь вдохновлен concurrent.futures.Future. Без понимания этой цепочки картина будет неполной.
В итоге мы разберём:
что такое
Futureи зачем он появился в Pythonчем
asyncio.Futureотличается от своего многопоточного предшественникакак
TaskиспользуетFutureи почему он одновременно является и исполнителем, и контейнером результатакак именно под капотом организовано выполнение корутины, которая обернута в таску
и на десерт шок-контент: почему
awaitдалеко не всегда является точкой, где управление возвращается событийному циклу, чтобы он мог переключиться на другую задачу
Как появился Future
Концепция фьючеров была реализована в Python 3.2 в 2011 году вместе с модулем concurrent.futures (PEP 3148).
Сам по себе объект Future — это "пассивный" контейнер с минимальным API. Он ничего не вычисляет и ни за чем не следит сам. Его задача лишь в том, чтобы хранить статус выполнения любой Python-функции, результат ее вызова, а также при необходимости вызвать колбэки, подписанные на завершение выполнения целевой функции. Управляет статусом фьючера, передает и забирает результаты выполнения целевой функции какой-то внешний по отношению к фьючеру объект, например, пул потоков. Далее этот внешний по отношению к фьючеру объект я буду называть "объект-пользователь".
У Future может быть 4 состояния:
pending ("ожидающий") — такой статус возникает при создании фьючера
cancelled ("отменен") — объект-пользователь (или любой другой внешний объект, у которого есть ссылка на данный фьючер) вызвал у фьючера метод
cancel(). Отмена сработает, если фьючер не успел выйти из состояния pendingrunning ("запущен") — объект-пользователь вызвал у фьючера метод
set_running_or_notify_cancel()finished ("завершен") — объект-пользователь вызвал
set_result(), чтобы положить в фьючер результат выполнения функции, илиset_exception(), чтобы положить в фьючер пойманное исключение. Какую именно функцию выполнять, решает сам объект-пользователь,Futureоб этом ничего не знает.
Переходы между состояниями односторонние. 2 варианта развития состояний:
PENDING -> RUNNING -> FINISHED
PENDING -> CANCELLED
Результат выполнения функции (или исключение, которое оно выбросит) во фьючер помещает объект-пользователь. Чтобы достать этот результат из фьючера, у него есть методы:
result()exception()
from concurrent.futures import Future # Создали фьючер f = Future() print(f.done()) # False, еще не завершен (состояние фьючера PENDING или RUNNING) print(f.running()) # False, еще не выполняется (состояние фьючера точно PENDING) print(f.cancelled()) # False, не был отменен # "Запускаем" фьючер f.set_running_or_notify_cancel() print(f.running()) # True, фьючер перешел в состояние RUNNING # Объект-пользователь кладет во фьючер результат выполнения какой-то функции # Допустим, эта функция вернула число 1000 f.set_result(1000) print(f.done()) # True, фьючер в состоянии FINISHED (т.е. хранит или результат, или исключение) print(f.result()) # 1000, достаем переданный во фьючер на хранение результат выполнения функции
Использование Future в многопоточном коде
Если устройство самого фьючера более-менее понятно, то его реальное применение пока не очень ясно. Одним из таких применений является многопоточный код. Разберём, кто и зачем здесь создаёт фьючеры и кто ими пользуется.
ThreadPoolExecutor — это пул потоков из стандартной библиотеки Python. Его задача в том, чтобы принимать функции на выполнение и распределять их между заранее созданными рабочими потоками. Функции в пул помещаются методом submit(). В ответ submit() возвращает... да-да, тот самый объект Future, "квитанцию", с помощью которой можно следить за ходом выполнения функции и забрать ее результат:
from concurrent.futures import ThreadPoolExecutor def fetch_data(): return 1000 with ThreadPoolExecutor() as executor: future = executor.submit(fetch_data) print(future.result()) # 1000
Но что происходит под капотом submit? Как фьючер связывается с функцией, чьи результаты будет хранить?
При вызове submit под капотом происходит следующее:
создаётся пустой
Futureсоздаётся объект
_WorkItem, который хранит ссылку и на этот фьючер, и на функцию, которая будет выполняться в потоке
def submit(self, fn, /, *args, **kwargs): f = _base.Future() w = _WorkItem(f, fn, args, kwargs) self._work_queue.put(w) # уходит в очередь return f # фьючер возвращается вызывающему коду
Именно _WorkItem является тем самым "объектом-пользователем" фьючера — он управляет его состоянием на протяжении всего жизненного цикла. Рабочие потоки пула в бесконечном цикле забирают задачи из этой очереди и вызывают _WorkItem.run():
def run(self): if not self.future.set_running_or_notify_cancel(): return # фьючер уже отменили снаружи, работу не начинаем try: # Запуск функции в поток�� result = self.fn(*self.args, **self.kwargs) except BaseException as exc: # Передача во фьючер исключения self.future.set_exception(exc) else: # Передача во фьючер результата выполнения функции self.future.set_result(result)
Здесь разворачивается вся механика фьючера:
если его успели отменить снаружи, работа просто не начинается
если нет, функция выполняется, и результат или пойманное исключение передается в фьючер.
Таким образом, в многопоточном коде задействованы 3 участника с четкими ролями:
Futureостается пассивным контейнером хранения результата и статусов_WorkItemвыступает объектом-пользователем фьючера, управляет его статусами, запускает функцию в потоке и записывает ее результат в фьючерThreadPoolExecutorиграет роль организатора, создавая парыFuture + _WorkItemи распределяя работу между потоками.
В чем польза Future в многопоточном коде
Может возникнуть вопрос: а зачем вообще нужен Future внешнему пользователю пула потоков? Результат можно было бы передать и без него, положить в переменную, очередь или колбэк.
Дело в том, что фьючер дает сразу несколько удобных вещей в одном объекте.
Параллельный запуск нескольких задач. Можно отправить сразу несколько функций в пул и получать результаты по мере их готовности, а не запускать каждую функцию строго по очереди и ждать завершения каждой:
from concurrent.futures import as_completed # все задачи уже выполняются параллельно в потоках futures = [executor.submit(fetch_data, url) for url in urls] # получаем результаты по мере завершения for f in as_completed(futures): print(f.result())
��ледить за статусом. Без фьючера мы не знаем, задача еще выполняется, уже готова или упала с ошибкой. Фьючер позволяет наблюдать за этим с помощью методов done(), running(), cancelled().
Не потерять исключение. Если функция в потоке упала, без Future исключение останется внутри рабочего потока и не будет доставлено в вызывающий код. Future выступает стандартным каналом доставки как результата, так и ошибки.
Отменить задачу. Если результат больше не нужен, можно вызвать cancel() пока задача еще в очереди.
Подписаться на завершение. Вместо того чтобы периодически проверять done() в цикле, можно зарегистрировать колбэк — функцию, которая автоматически вызовется, когда задача завершится. Для добавления колбэков в API фьючера есть метод add_done_callback.
def on_done(future): print(f"Готово! Результат: {future.result()}") future = executor.submit(fetch_data) # вызовется автоматически по завершении future.add_done_callback(on_done)
Колбэков можно добавить несколько и тогда вызовутся все по очереди. Это удобно, когда нескольким частям программы нужно отреагировать на завершение одной задачи.
Future в asyncio
asyncio.Future появился в Python 3.4 (2014 год) вместе с модулем asyncio и представляет собой адаптацию той же идеи, пассивного контейнера для результата какой-то операции, но для однопоточного асинхронного кода. В докстринге класса asyncio.Future читаем:
"""This class is *almost* compatible with concurrent.futures.Future."""
API asyncio.Future намеренно похож на concurrent.futures.Future:
почти те же состояния (об отличиях ниже)
те же методы
set_result(),set_exception(),cancel(),add_done_callback()
Это сделано для того, чтобы код, работающий с фьючерами, выглядел узнаваемо вне зависимости от того, многопоточный он или асинхронный в одном потоке.
Есть принципиальные отличия, которые продиктованы самой природой asyncio.
Во-первых, нет состояния RUNNING и метода set_running_or_notify_cancel().
Во-вторых, вызов метода result() не блокирует поток. В concurrent.futures.Future вызов result() усыплял поток до готовности результата. В asyncio усыплять единственный поток нельзя. Поэтому если вызвать result() на незавершённом asyncio.Future, он немедленно выбросит исключение InvalidStateError.
import asyncio f = asyncio.get_event_loop().create_future() f.result()
...приведет к
f.result() # InvalidStateError: Result is not ready. ^^^^^^^^^^ asyncio.exceptions.InvalidStateError: Result is not set.
В-третьих, колбэки вызываются через цикл событий. В concurrent.futures.Future колбэки из add_done_callback() вызывались напрямую из потока, завершившего задачу. В asyncio.Future всегда через метод call_soon() событийного цикла. Это гарантирует, что колбэки выполнятся в правильном потоке и в правильный момент итерации цикла.
Список колбэков хранится внутри самого фьючера в атрибуте _сallbacks. Любой желающий может подписаться на завершение фьючера через add_done_callback(), его колбэк просто добавится в этот список:
def add_done_callback(self, fn, *, context=None): if self._state != _PENDING: # Фьючер уже завершен, планируем вызов немедленно self._loop.call_soon(fn, self, context=context) else: # Фьючер еще не завершен — добавляем в список self._callbacks.append((fn, context))
Когда фьючер завершается через set_result(), set_exception() или cancel(), он сам обходит этот список и планирует вызов каждого колбэка через метод call_soon() событийного цикла.
От asyncio.Future к asyncio.Task
Мы выяснили, что asyncio.Future — это пассивный контейнер. Кто-то снаружи должен вызвать у него set_result() или set_exception(), чтобы он перешёл в завершенное состояние. В многопоточном коде этим "кем-то" был объект WorkItem потока, он выполнял функцию и передавал результат в Future. Но в asyncio нет ни потоков, ни WorkItem. Кто тогда будет двигать работу вперёд?
Ответ — Task. Это наследник Future, который перестает быть пассивным. Его задача в том, чтобы обернуть корутину и самостоятельно, шаг за шагом (от одного await к другому внутри корутины), продвигать её выполнение. Когда корутина завершается, Task сам забирает и сохраняет результат. Он одновременно и исполнитель, и контейнер результата.
asyncio.Task под капотом: создание
class Task(futures._PyFuture): def __init__(self, coro, *, loop=None, name=None, context=None, eager_start=False): # инициализируем фьючер super().__init__(loop=loop) # ... # сохраняем корутину self._coro = coro # за каким фьючером сейчас наблюдаем self._fut_waiter = None # ... # Планируем первый шаг self._loop.call_soon(self.__step, context=self._context)
Самое интересное тут вызов self._loop.call_soon. Прямо в конструкторе Task говорит циклу событий: "при первой возможности вызови мой метод __step". Не сейчас, а в следующей итерации цикла. То есть Task при создании не выполняет корутину, он лишь ставит себя в очередь на первый шаг.
Небольшая оговорка: в современном Python (начиная с 3.6) по умолчанию используются C-реализации _asyncio.Task и _asyncio.Future. В этой статье для иллюстрции используются Python-реализации PyTask и PyFuture, которые предусмотрены в исходниках на случай, если C-расширение недоступно. C-реализация делает абсолютно то же самое, но на уровне C-API. Для понимания логики это не имеет никакого значения.
asyncio.Task под капотом: выполнение корутины до ближайшего await
Итак, цикл событий вызвал __step. Посмотрим, что происходит внутри:
# asyncio/tasks.py def __step(self, exc=None): self._fut_waiter = None _enter_task(self._loop, self) try: self.__step_run_and_handle_result(exc) finally: _leave_task(self._loop, self)
Метод __step регистрирует таску как "текущую выполняемую" в цикле событий и передает управление в step_run_and_handle_result, где и происходит вся настоящая работа:
# asyncio/tasks.py def __step_run_and_handle_result(self, exc): coro = self._coro try: if exc is None: # Продвигаем корутину "на один шаг" (до ближайшего await) result = coro.send(None) else: # Или бросаем в неё исключение result = coro.throw(exc) except StopIteration as exc: # Корутина завершилась (больше await нет), извлекаем результат в фьючер super().set_result(exc.value) except CancelledError as exc: super().cancel() # упрощенно; реальная логика отмены сложнее except BaseException as exc: super().set_exception(exc) else: # корутина остановилась на новом await (об этом расскажем отдельно) # ...
Task вызывает coro.send(None), чтобы продвинуть исполнение кода корутины до ближайшего await в ней.
Дальше возможны 2 исхода:
корутина завершилась, т.е. бросила
StopIterationс результатом внутри.Taskперехватывает его и вызываетsuper().set_result(), сохраняя результат (эта возможность есть у таски, как наследника фьючера). Таска перешла в состояниеFINISHED.корутина приостановилась, т.е. встретила новый
awaitи вернула управление обратно в__stepчерезyield. Таска запоминает, чего ждёт, и подписывается на пробуждение.
asyncio.Task под капотом: ожидание и пробуждение
Итак, корутина приостановилась на await и вернула управление в __step. Что происходит дальше?
else: blocking = getattr(result, '_asyncio_future_blocking', None) if blocking: result._asyncio_future_blocking = False result.add_done_callback(self.__wakeup, context=self._context) self._fut_waiter = result # запоминаем, чего ждём
result — это тот Future, который вернула корутина, остановившись на новом await. Таска подписывается на завершение этого фьючера через add_done_callback(self.__wakeup) и сохраняет ссылку в self._fut_waiter. После этого __step завершается и управление возвращается циклу событий, а таска просто ждёт.
Когда фьючер завершается через set_result(), set_exception() или cancel(), он сам обходит этот список и планирует вызов каждого колбэка, вызывая метод call_soon() событийного цикла. Именно на этом механизме держится пробуждение таски: когда таска ждёт какой-то Future, она регистрирует свой __wakeup в его _callbacks. Как только этот фьючер завершится — он сам поставит вызов _wakeup таски в очередь цикла событий через call_soon(). Цикл соб��тий не отслеживает связи между тасками и фьючерами, он лишь вызывает то, что уже поставлено в его очередь.
# asyncio/tasks.py def __wakeup(self, future): try: future.result() except BaseException as exc: # Будим таску с исключением self.__step(exc) else: # Будим таску без исключения self.__step()
__wakeup проверяет результат завершившегося Future и снова вызывает __step с исключением или без. Таска просыпается и продвигает корутину дальше до следующего await или до завершения.
Кстати, на счет __wakeup. Если когда-нибудь при отладке asyncio-приложения вы использовали метод print_stack задачи, то наверняка в описании статуса задачи видели такое
wait_for=<Future pending cb=[Task.task_wakeup()]>
Это буквально то, о чем мы только что говорили:
таска ждет конкретный фьючер (wait_for=...)
в списке колбэков этого фьючера уже зарегистрирован
__wakeup(cb=[Task.task_wakeup()]).
Как только этот Future завершится, он вызовет task_wakeup() (C-имя для __wakeup), и таска проснется.
Как в итоге устроено выполнение asyncio.Task
Когда мы пишем asyncio.create_task(coro), происходит следующее:
создается объект
Task("таска")функция-корутина сохраняется внутри него
прямо в конструкторе таска говорит циклу событий: "при первой возможности вызови мой метод
__step".
На этом создание заканчивается, корутина еще не выполнила ни одной строки своего кода.
В следующей итерации цикл событий вызывает __step, и начинается первый шаг: таска вызывает coro.send(None), что буквально означает "запусти корутину и выполняй ее до первого await". Корутина начинает работать, выполняет код, вычисляет значения, и в какой-то момент встречает await some_future.
Здесь корутина приостанавливается и возвращает управление обратно в __step, передав ему тот Future, которого она ждет. Таска смотрит на этот Future и говорит ему: "когда завершишься, передай мой wakeup в работу событийному циклу". После этого __step завершается, и управление возвращается событийному циклу. Таска молчит и ждет.
Тем временем событийный цикл занимается своими делами: обрабатывает другие таски, следит за таймерами, получает уведомления о событиях на сокетах. В какой-то момент ожидаемый Future завершается и закидывает в событийный цикл задачу на wakeup таски. Таска забирает результат завершившегося Future и снова вызывает __step, корутина просыпается и продолжает выполнение с того места, где остановилась.
Этот танец повторяется снова и снова:
__stepпродвигает корутину до следующегоawaitтаска засыпает
просыпается через
__wakeupновый
__step.
Все продолжается до тех пор, пока корутина не дойдет до конца и не бросит StopIteration со своим возвращаемым значением внутри. Тогда таска перехватывает его и вызывает super().set_result(), записывает результат в себя как в Future, переходит в состояние FINISHED, и все, кто подписался на ее завершение, получают уведомление.
Task — это не магия. Это просто цикл вызовов send() на корутине, организованный через колбэки событийного цикла.
Шок-контент: не всякий await отдает управление циклу событий
Когда читаешь про asyncio, складывается впечатление, что любой await в Python-коде — это точка, в которой цикл событий получает управление и может заняться другими тасками. Это большое заблуждение!
Передача управления событийному циклу происходит только при соблюдении 2 условий:
происходит
awaitфьючера (и таски как наследницы фьючера)фьючер еще не готов (если фьючер уже готов,
yieldне происходит, и выполнение продолжается немедленно без какого-либо переключения).
await чистой корутины (т.е. async def-функции, не обернутой в таску) НЕ отдает управление событийному циклу.
Почему такая разница? Чтобы понять это, нужно вспомнить, как работает __step: он вызывает coro.send(None) и ждёт, пока корутина не вернёт управление через инструкцию yield. Именно yield является точкой выхода из корутины обратно в step, и дальше в цикл событий.
Так вот, yield в цепочке await производит только один объект — asyncio.Future в своём методе await:
# asyncio/futures.py def __await__(self): if not self.done(): self._asyncio_future_blocking = True yield self # Вот она, точка переключения! return self.result()
Когда мы пишем await asyncio.sleep(1), внутри sleep создается настоящий Future, и в итоге происходит yield self. Этот yield пробивается сквозь всю цепочку await наверх до __step таски и вот тут цикл событий наконец получает управление.
А когда мы пишем await my_coroutine(), то просто входим в другую корутину и начинаем ее исполнение, не отдавая управление событийному циклу. Если внутри нее нет ни одного await future (авайта именно фьючера, а не чистой корутины), то никакого yield не произойдет. __step не получит управление обратно, пока вся корутина не выполнится до конца. Цикл событий в это время не может обработать ни одну другую таску.
Управление возвращается циклу событий только тогда, когда где-то в глубине цепочки await происходит настоящий yield из Future.__await__.
Заключение
Мы прошли долгий путь: от concurrent.futures.Future, появившегося в 2011 году, через asyncio.Future до внутреннего устройства asyncio.Task.
Task является наследником Future, который взял на себя роль WorkItem из многопоточного мира. Только вместо того чтобы выполнять обычную функцию в потоке, объект задачи шаг за шагом продвигает выполнение кода корутины через цикл событий, используя coro.send(), колбэки и _wakeup.
И самое важное, что стоит унести из этой статьи: await — это не волшебный переключатель. Управление возвращается циклу событий только тогда, когда где-то в глубине цепочки вызовов происходит yield из Future.__await__ (иными словами, авайтится фьючер-подобный объект, например, таска). Авайты чистых корутин просто запускают их последовательное выполнение, не отдавая управление событийному циклу до тех пор, пока внутри их кода не произойдет авайт фьючера/таски (с уточнением: этот фьючер/таска еще не готова отдать результат). Авайт уже готовых фьючеров не отдает управление событийному циклу (yield в этом случае просто не происходит).
Понимание этого механизма меняет то, как читаешь асинхронный код: становится видно, где реальные точки переключения, почему конкретный участок кода блокирует весь событийный цикл и как правильно строить асинхронные программы.
