В прошлой статье мы разобрали механику событийного цикла asyncio изнутри.

В этот раз поговорим о задачах, объектах класса asyncio.Task (они же по-простому "таски"). Тема важная, потому что по сути вся работа событийного цикла сводится к постоянному жонглированию задачами: запустить, приостановить, разбудить, завершить. Если понять, как устроена таска изнутри, изрядная доля магии asyncio (как и нелюбви к нему) исчезнет.

Но прежде чем нырять в исходники asyncio.Task, нам придется начать издалека, а именно с концепции "фьючеров" и класса concurrent.futures.Future, появившегося в уже далеком 2011 году. asyncio.Task является наследником asyncio.Future, который в свою очередь вдохновлен concurrent.futures.Future. Без понимания этой цепочки картина будет неполной.

В итоге мы разберём:

  • что такое Future и зачем он появился в Python

  • чем asyncio.Future отличается от своего многопоточного предшественника

  • как Task использует Future и почему он одновременно является и исполнителем, и контейнером результата

  • как именно под капотом организовано выполнение корутины, которая обернута в таску

  • и на десерт шок-контент: почему await далеко не всегда является точкой, где управление возвращается событийному циклу, чтобы он мог переключиться на другую задачу

Как появился Future

Концепция фьючеров была реализована в Python 3.2 в 2011 году вместе с модулем concurrent.futures (PEP 3148).

Сам по себе объект Future — это "пассивный" контейнер с минимальным API. Он ничего не вычисляет и ни за чем не следит сам. Его задача лишь в том, чтобы хранить статус выполнения любой Python-функции, результат ее вызова, а также при необходимости вызвать колбэки, подписанные на завершение выполнения целевой функции. Управляет статусом фьючера, передает и забирает результаты выполнения целевой функции какой-то внешний по отношению к фьючеру объект, например, пул потоков. Далее этот внешний по отношению к фьючеру объект я буду называть "объект-пользователь".

У Future может быть 4 состояния:

  • pending ("ожидающий") — такой статус возникает при создании фьючера

  • cancelled ("отменен") — объект-пользователь (или любой другой внешний объект, у которого есть ссылка на данный фьючер) вызвал у фьючера метод cancel(). Отмена сработает, если фьючер не успел выйти из состояния pending

  • running ("запущен") — объект-пользователь вызвал у фьючера метод set_running_or_notify_cancel()

  • finished ("завершен") — объект-пользователь вызвал set_result(), чтобы положить в фьючер результат выполнения функции, или set_exception(), чтобы положить в фьючер пойманное исключение. Какую именно функцию выполнять, решает сам объект-пользователь, Future об этом ничего не знает.

Переходы между состояниями односторонние. 2 варианта развития состояний:

  • PENDING -> RUNNING -> FINISHED

  • PENDING -> CANCELLED

Результат выполнения функции (или исключение, которое оно выбросит) во фьючер помещает объект-пользователь. Чтобы достать этот результат из фьючера, у него есть методы:

  • result()

  • exception()

from concurrent.futures import Future

# Создали фьючер
f = Future()

print(f.done()) # False, еще не завершен (состояние фьючера PENDING или RUNNING)
print(f.running()) # False, еще не выполняется (состояние фьючера точно PENDING)
print(f.cancelled()) # False, не был отменен

# "Запускаем" фьючер
f.set_running_or_notify_cancel()
print(f.running()) # True, фьючер перешел в состояние RUNNING

# Объект-пользователь кладет во фьючер результат выполнения какой-то функции
# Допустим, эта функция вернула число 1000
f.set_result(1000)

print(f.done()) # True, фьючер в состоянии FINISHED (т.е. хранит или результат, или исключение)
print(f.result()) # 1000, достаем переданный во фьючер на хранение результат выполнения функции

Использование Future в многопоточном коде

Если устройство самого фьючера более-менее понятно, то его реальное применение пока не очень ясно. Одним из таких применений является многопоточный код. Разберём, кто и зачем здесь создаёт фьючеры и кто ими пользуется.

ThreadPoolExecutor — это пул потоков из стандартной библиотеки Python. Его задача в том, чтобы принимать функции на выполнение и распределять их между заранее созданными рабочими потоками. Функции в пул помещаются методом submit(). В ответ submit() возвращает... да-да, тот самый объект Future, "квитанцию", с помощью которой можно следить за ходом выполнения функции и забрать ее результат:

from concurrent.futures import ThreadPoolExecutor

def fetch_data():
    return 1000

with ThreadPoolExecutor() as executor:
    future = executor.submit(fetch_data)
    print(future.result()) # 1000

Но что происходит под капотом submit? Как фьючер связывается с функцией, чьи результаты будет хранить?

При вызове submit под капотом происходит следующее:

  • создаётся пустой Future

  • создаётся объект _WorkItem, который хранит ссылку и на этот фьючер, и на функцию, которая будет выполняться в потоке

def submit(self, fn, /, *args, **kwargs):
    f = _base.Future()
    w = _WorkItem(f, fn, args, kwargs)
    self._work_queue.put(w) # уходит в очередь
    return f # фьючер возвращается вызывающему коду

Именно _WorkItem является тем самым "объектом-пользователем" фьючера — он управляет его состоянием на протяжении всего жизненного цикла. Рабочие потоки пула в бесконечном цикле забирают задачи из этой очереди и вызывают _WorkItem.run():

def run(self):
    if not self.future.set_running_or_notify_cancel():
        return # фьючер уже отменили снаружи, работу не начинаем

    try:
        # Запуск функции в поток��
        result = self.fn(*self.args, **self.kwargs)
    except BaseException as exc:
        # Передача во фьючер исключения
        self.future.set_exception(exc)
    else:
        # Передача во фьючер результата выполнения функции
        self.future.set_result(result)

Здесь разворачивается вся механика фьючера:

  • если его успели отменить снаружи, работа просто не начинается

  • если нет, функция выполняется, и результат или пойманное исключение передается в фьючер.

Таким образом, в многопоточном коде задействованы 3 участника с четкими ролями:

  • Future остается пассивным контейнером хранения результата и статусов

  • _WorkItem выступает объектом-пользователем фьючера, управляет его статусами, запускает функцию в потоке и записывает ее результат в фьючер

  • ThreadPoolExecutor играет роль организатора, создавая пары Future + _WorkItem и распределяя работу между потоками.

В чем польза Future в многопоточном коде

Может возникнуть вопрос: а зачем вообще нужен Future внешнему пользователю пула потоков? Результат можно было бы передать и без него, положить в переменную, очередь или колбэк.

Дело в том, что фьючер дает сразу несколько удобных вещей в одном объекте.

Параллельный запуск нескольких задач. Можно отправить сразу несколько функций в пул и получать результаты по мере их готовности, а не запускать каждую функцию строго по очереди и ждать завершения каждой:

from concurrent.futures import as_completed

# все задачи уже выполняются параллельно в потоках
futures = [executor.submit(fetch_data, url) for url in urls]

# получаем результаты по мере завершения
for f in as_completed(futures):
    print(f.result())

��ледить за статусом. Без фьючера мы не знаем, задача еще выполняется, уже готова или упала с ошибкой. Фьючер позволяет наблюдать за этим с помощью методов done(), running(), cancelled().

Не потерять исключение. Если функция в потоке упала, без Future исключение останется внутри рабочего потока и не будет доставлено в вызывающий код. Future выступает стандартным каналом доставки как результата, так и ошибки.

Отменить задачу. Если результат больше не нужен, можно вызвать cancel() пока задача еще в очереди.

Подписаться на завершение. Вместо того чтобы периодически проверять done() в цикле, можно зарегистрировать колбэк — функцию, которая автоматически вызовется, когда задача завершится. Для добавления колбэков в API фьючера есть метод add_done_callback.

def on_done(future):
    print(f"Готово! Результат: {future.result()}")

future = executor.submit(fetch_data)
# вызовется автоматически по завершении
future.add_done_callback(on_done)

Колбэков можно добавить несколько и тогда вызовутся все по очереди. Это удобно, когда нескольким частям программы нужно отреагировать на завершение одной задачи.

Future в asyncio

asyncio.Future появился в Python 3.4 (2014 год) вместе с модулем asyncio и представляет собой адаптацию той же идеи, пассивного контейнера для результата какой-то операции, но для однопоточного асинхронного кода. В докстринге класса asyncio.Future читаем:

"""This class is *almost* compatible with concurrent.futures.Future."""

API asyncio.Future намеренно похож на concurrent.futures.Future:

  • почти те же состояния (об отличиях ниже)

  • те же методы set_result(), set_exception(), cancel(), add_done_callback()

Это сделано для того, чтобы код, работающий с фьючерами, выглядел узнаваемо вне зависимости от того, многопоточный он или асинхронный в одном потоке.

Есть принципиальные отличия, которые продиктованы самой природой asyncio.

Во-первых, нет состояния RUNNING и метода set_running_or_notify_cancel().

Во-вторых, вызов метода result() не блокирует поток. В concurrent.futures.Future вызов result() усыплял поток до готовности результата. В asyncio усыплять единственный поток нельзя. Поэтому если вызвать result() на незавершённом asyncio.Future, он немедленно выбросит исключение InvalidStateError.

import asyncio

f = asyncio.get_event_loop().create_future()
f.result()

...приведет к

    f.result()  # InvalidStateError: Result is not ready.
    ^^^^^^^^^^
asyncio.exceptions.InvalidStateError: Result is not set.

В-третьих, колбэки вызываются через цикл событий. В concurrent.futures.Future колбэки из add_done_callback() вызывались напрямую из потока, завершившего задачу. В asyncio.Future всегда через метод call_soon() событийного цикла. Это гарантирует, что колбэки выполнятся в правильном потоке и в правильный момент итерации цикла.

Список колбэков хранится внутри самого фьючера в атрибуте _сallbacks. Любой желающий может подписаться на завершение фьючера через add_done_callback(), его колбэк просто добавится в этот список:

def add_done_callback(self, fn, *, context=None):
    if self._state != _PENDING:
        # Фьючер уже завершен, планируем вызов немедленно
        self._loop.call_soon(fn, self, context=context)
    else:
        # Фьючер еще не завершен — добавляем в список
        self._callbacks.append((fn, context))

Когда фьючер завершается через set_result(), set_exception() или cancel(), он сам обходит этот список и планирует вызов каждого колбэка через метод call_soon() событийного цикла.

От asyncio.Future к asyncio.Task

Мы выяснили, что asyncio.Future — это пассивный контейнер. Кто-то снаружи должен вызвать у него set_result() или set_exception(), чтобы он перешёл в завершенное состояние. В многопоточном коде этим "кем-то" был объект WorkItem потока, он выполнял функцию и передавал результат в Future. Но в asyncio нет ни потоков, ни WorkItem. Кто тогда будет двигать работу вперёд?

Ответ — Task. Это наследник Future, который перестает быть пассивным. Его задача в том, чтобы обернуть корутину и самостоятельно, шаг за шагом (от одного await к другому внутри корутины), продвигать её выполнение. Когда корутина завершается, Task сам забирает и сохраняет результат. Он одновременно и исполнитель, и контейнер результата.

asyncio.Task под капотом: создание

class Task(futures._PyFuture):
    def __init__(self, coro, *, loop=None, name=None, context=None, eager_start=False):
        # инициализируем фьючер
        super().__init__(loop=loop)
        # ...
        # сохраняем корутину
        self._coro = coro
        # за каким фьючером сейчас наблюдаем
        self._fut_waiter = None
        # ...
        # Планируем первый шаг
        self._loop.call_soon(self.__step, context=self._context)

Самое интересное тут вызов self._loop.call_soon. Прямо в конструкторе Task говорит циклу событий: "при первой возможности вызови мой метод __step". Не сейчас, а в следующей итерации цикла. То есть Task при создании не выполняет корутину, он лишь ставит себя в очередь на первый шаг.

Небольшая оговорка: в современном Python (начиная с 3.6) по умолчанию используются C-реализации _asyncio.Task и _asyncio.Future. В этой статье для иллюстрции используются Python-реализации PyTask и PyFuture, которые предусмотрены в исходниках на случай, если C-расширение недоступно. C-реализация делает абсолютно то же самое, но на уровне C-API. Для понимания логики это не имеет никакого значения.

asyncio.Task под капотом: выполнение корутины до ближайшего await

Итак, цикл событий вызвал __step. Посмотрим, что происходит внутри:

# asyncio/tasks.py

def __step(self, exc=None):
    self._fut_waiter = None
    _enter_task(self._loop, self)
    try:
        self.__step_run_and_handle_result(exc)
    finally:
        _leave_task(self._loop, self)

Метод __step регистрирует таску как "текущую выполняемую" в цикле событий и передает управление в step_run_and_handle_result, где и происходит вся настоящая работа:

# asyncio/tasks.py

def __step_run_and_handle_result(self, exc):
    coro = self._coro
    try:
        if exc is None:
            # Продвигаем корутину "на один шаг" (до ближайшего await)
            result = coro.send(None)
        else:
            # Или бросаем в неё исключение
            result = coro.throw(exc)
    except StopIteration as exc:
        # Корутина завершилась (больше await нет), извлекаем результат в фьючер
        super().set_result(exc.value)
    except CancelledError as exc:
        super().cancel() # упрощенно; реальная логика отмены сложнее
    except BaseException as exc:
        super().set_exception(exc)
    else:
        # корутина остановилась на новом await (об этом расскажем отдельно)
        # ...

Task вызывает coro.send(None), чтобы продвинуть исполнение кода корутины до ближайшего await в ней.

Дальше возможны 2 исхода:

  • корутина завершилась, т.е. бросила StopIteration с результатом внутри. Task перехватывает его и вызывает super().set_result(), сохраняя результат (эта возможность есть у таски, как наследника фьючера). Таска перешла в состояние FINISHED.

  • корутина приостановилась, т.е. встретила новый await и вернула управление обратно в __step через yield. Таска запоминает, чего ждёт, и подписывается на пробуждение.

asyncio.Task под капотом: ожидание и пробуждение

Итак, корутина приостановилась на await и вернула управление в __step. Что происходит дальше?

else:
    blocking = getattr(result, '_asyncio_future_blocking', None)
    if blocking:
        result._asyncio_future_blocking = False
        result.add_done_callback(self.__wakeup, context=self._context)
        self._fut_waiter = result # запоминаем, чего ждём

result — это тот Future, который вернула корутина, остановившись на новом await. Таска подписывается на завершение этого фьючера через add_done_callback(self.__wakeup) и сохраняет ссылку в self._fut_waiter. После этого __step завершается и управление возвращается циклу событий, а таска просто ждёт.

Когда фьючер завершается через set_result(), set_exception() или cancel(), он сам обходит этот список и планирует вызов каждого колбэка, вызывая метод call_soon() событийного цикла. Именно на этом механизме держится пробуждение таски: когда таска ждёт какой-то Future, она регистрирует свой __wakeup в его _callbacks. Как только этот фьючер завершится — он сам поставит вызов _wakeup таски в очередь цикла событий через call_soon(). Цикл соб��тий не отслеживает связи между тасками и фьючерами, он лишь вызывает то, что уже поставлено в его очередь.

# asyncio/tasks.py

def __wakeup(self, future):
    try:
        future.result()
    except BaseException as exc:
        # Будим таску с исключением
        self.__step(exc)
    else:
        # Будим таску без исключения
        self.__step()

__wakeup проверяет результат завершившегося Future и снова вызывает __step с исключением или без. Таска просыпается и продвигает корутину дальше до следующего await или до завершения.

Кстати, на счет __wakeup. Если когда-нибудь при отладке asyncio-приложения вы использовали метод print_stack задачи, то наверняка в описании статуса задачи видели такое

wait_for=<Future pending cb=[Task.task_wakeup()]>

Это буквально то, о чем мы только что говорили:

  • таска ждет конкретный фьючер (wait_for=...)

  • в списке колбэков этого фьючера уже зарегистрирован __wakeup (cb=[Task.task_wakeup()]).

Как только этот Future завершится, он вызовет task_wakeup() (C-имя для __wakeup), и таска проснется.

Как в итоге устроено выполнение asyncio.Task

Когда мы пишем asyncio.create_task(coro), происходит следующее:

  • создается объект Task ("таска")

  • функция-корутина сохраняется внутри него

  • прямо в конструкторе таска говорит циклу событий: "при первой возможности вызови мой метод __step".

На этом создание заканчивается, корутина еще не выполнила ни одной строки своего кода.

В следующей итерации цикл событий вызывает __step, и начинается первый шаг: таска вызывает coro.send(None), что буквально означает "запусти корутину и выполняй ее до первого await". Корутина начинает работать, выполняет код, вычисляет значения, и в какой-то момент встречает await some_future.

Здесь корутина приостанавливается и возвращает управление обратно в __step, передав ему тот Future, которого она ждет. Таска смотрит на этот Future и говорит ему: "когда завершишься, передай мой wakeup в работу событийному циклу". После этого __step завершается, и управление возвращается событийному циклу. Таска молчит и ждет.

Тем временем событийный цикл занимается своими делами: обрабатывает другие таски, следит за таймерами, получает уведомления о событиях на сокетах. В какой-то момент ожидаемый Future завершается и закидывает в событийный цикл задачу на wakeup таски. Таска забирает результат завершившегося Future и снова вызывает __step, корутина просыпается и продолжает выполнение с того места, где остановилась.

Этот танец повторяется снова и снова:

  • __step продвигает корутину до следующего await

  • таска засыпает

  • просыпается через __wakeup

  • новый __step.

Все продолжается до тех пор, пока корутина не дойдет до конца и не бросит StopIteration со своим возвращаемым значением внутри. Тогда таска перехватывает его и вызывает super().set_result(), записывает результат в себя как в Future, переходит в состояние FINISHED, и все, кто подписался на ее завершение, получают уведомление.

Task — это не магия. Это просто цикл вызовов send() на корутине, организованный через колбэки событийного цикла.

Шок-контент: не всякий await отдает управление циклу событий

Когда читаешь про asyncio, складывается впечатление, что любой await в Python-коде — это точка, в которой цикл событий получает управление и может заняться другими тасками. Это большое заблуждение!

Передача управления событийному циклу происходит только при соблюдении 2 условий:

  • происходит await фьючера (и таски как наследницы фьючера)

  • фьючер еще не готов (если фьючер уже готов, yield не происходит, и выполнение продолжается немедленно без какого-либо переключения).

await чистой корутины (т.е. async def-функции, не обернутой в таску) НЕ отдает управление событийному циклу.

Почему такая разница? Чтобы понять это, нужно вспомнить, как работает __step: он вызывает coro.send(None) и ждёт, пока корутина не вернёт управление через инструкцию yield. Именно yield является точкой выхода из корутины обратно в step, и дальше в цикл событий.

Так вот, yield в цепочке await производит только один объект — asyncio.Future в своём методе await:

# asyncio/futures.py

def __await__(self):
    if not self.done():
        self._asyncio_future_blocking = True
        yield self  # Вот она, точка переключения!
    return self.result()

Когда мы пишем await asyncio.sleep(1), внутри sleep создается настоящий Future, и в итоге происходит yield self. Этот yield пробивается сквозь всю цепочку await наверх до __step таски и вот тут цикл событий наконец получает управление.

А когда мы пишем await my_coroutine(), то просто входим в другую корутину и начинаем ее исполнение, не отдавая управление событийному циклу. Если внутри нее нет ни одного await future (авайта именно фьючера, а не чистой корутины), то никакого yield не произойдет. __step не получит управление обратно, пока вся корутина не выполнится до конца. Цикл событий в это время не может обработать ни одну другую таску.

Управление возвращается циклу событий только тогда, когда где-то в глубине цепочки await происходит настоящий yield из Future.__await__.

Заключение

Мы прошли долгий путь: от concurrent.futures.Future, появившегося в 2011 году, через asyncio.Future до внутреннего устройства asyncio.Task.

Task является наследником Future, который взял на себя роль WorkItem из многопоточного мира. Только вместо того чтобы выполнять обычную функцию в потоке, объект задачи шаг за шагом продвигает выполнение кода корутины через цикл событий, используя coro.send(), колбэки и _wakeup.

И самое важное, что стоит унести из этой статьи: await — это не волшебный переключатель. Управление возвращается циклу событий только тогда, когда где-то в глубине цепочки вызовов происходит yield из Future.__await__ (иными словами, авайтится фьючер-подобный объект, например, таска). Авайты чистых корутин просто запускают их последовательное выполнение, не отдавая управление событийному циклу до тех пор, пока внутри их кода не произойдет авайт фьючера/таски (с уточнением: этот фьючер/таска еще не готова отдать результат). Авайт уже готовых фьючеров не отдает управление событийному циклу (yield в этом случае просто не происходит).

Понимание этого механизма меняет то, как читаешь асинхронный код: становится видно, где реальные точки переключения, почему конкретный участок кода блокирует весь событийный цикл и как правильно строить асинхронные программы.