Как стать автором
Обновить
73.11
Wunder Fund
Мы занимаемся высокочастотной торговлей на бирже

Полное руководство по модулю asyncio в Python. Часть 4

Время на прочтение12 мин
Количество просмотров17K
Автор оригинала: Jason Brownlee

Сегодня, в четвёртой части (перваявтораятретья) перевода учебного руководства по модулю asyncio в Python, представляем вашему вниманию разделы оригинала №8 и 9.

8. Работа с задачами и получение сведений о них

Задачи — это основная валюта asyncio-программ.

В этом разделе мы поближе познакомимся с тем, как взаимодействовать с ними в наших проектах.

8.1. Жизненный цикл задачи

С asyncio-объектом Task связано такое понятие, как жизненный цикл.

Для начала отметим, что задачи создают из корутин. Затем планируют их независимое выполнение в цикле событий. В результате, в некий момент времени, запланированная задача выполняется. В процессе её выполнения она может быть приостановлена, например — при ожидании выполнения другой корутины или задачи. Её работа может завершиться нормально и возвратить результат. Возможно и такое, что её работа завершится неудачно, с выдачей исключения.

Какая-нибудь корутина может вмешаться в работу задачи и отменить её выполнение.

В любом случае — задача будет завершена и её нельзя будет запустить снова.

Жизненный цикл задачи можно представить себе так:

  1. Создана (Created).

  2. Запланирована (Scheduled).

  3. Выполняется (Running).

  4. Завершена (Done).

Обратите внимание на то, что такие пункты нашего списка, как «Приостановлена», «Вернула результат», «Выдала исключение» и «Отменена» — это, сами по себе, не состояния задачи. Это — важные переходные моменты жизненного цикла выполняющейся задачи.

Следующая схема иллюстрирует жизненный цикл задачи, показывая переходы между его фазами.

Жизненный цикл asyncio-объекта Task
Жизненный цикл asyncio-объекта Task

Подробности о жизненном цикле asyncio-задач можно найти в этом руководстве.

Теперь, когда мы, в общих чертах, ознакомились с жизненным циклом задачи — приглядимся к каждой из его фаз.

8.2. Как проверить состояние задачи

После того, как объект Task создан — можно проверить состояние задачи. В состоянии задачи есть два интересующих нас значения, дающих следующие сведения:

  • Завершено ли выполнение задачи.

  • Отменено ли выполнение задачи.

Разберёмся с каждым из них.

Проверка на завершение выполнения задачи

Узнать о том, завершено ли выполнение задачи, можно, воспользовавшись её методом done().

Этот метод возвращает True в том случае, если выполнение задачи завершено. В противном случае он возвращает False:

...
проверка на завершение выполнения задачи
if task.done():
    # ...

Выполнение задачи будет завершено в том случае, если у неё была возможность выполниться, после чего она больше не выполняется.

Задача, выполнение которой лишь запланировано, завершённой не считается.

Выполнение задачи будет завершено в следующих случаях:

  • Выполнение корутины завершилось нормально.

  • Корутина явным образом вернула результат.

  • Корутина выдала неожиданную ошибку или неожиданное исключение.

  • Выполнение задачи отменено.

Проверка на отмену выполнения задачи

Проверить то, отменено ли выполнение задачи, можно с помощью её метода cancelled().

Этот метод возвращает True в том случае, если выполнение задачи было отменено, и False — в других случаях:

...
проверка на отмену выполнения задачи
if task.cancelled():
    # ...

Задача будет отменена в том случае, если был вызван её метод cancel(), и если он был успешно завершён. То есть — если этот метод вернул True.

Задача не считается отменённой в том случае, если не был вызван метод cancel(), или если этот метод был вызван, но не смог отменить задачу.

8.3. Как получить результат выполнения задачи

Получить результат выполнения задачи можно с помощью метода result().

Этот метод выдаёт возвращаемое значение корутины, обёрнутой объектом Task, или None — если корутина не возвращает значение явным образом:

...
получение возвращённого значения из обёрнутой корутины
value = task.result()

Если корутина выдаёт необработанную ошибку — она выдаётся снова при вызове result() и её может понадобиться обработать. То же касается и исключений:

...
try:
    # получение возвращённого значения из обёрнутой корутины
    value = task.result()
except Exception:
    # задача завершилась неудачно, результата нет

Если задача была отменена — при вызове метода result() выдаётся исключение CancelledError. Это исключение может нуждаться в обработке:

...
try:
    # получение возвращённого значения из обёрнутой корутины
    value = task.result()
except asyncio.CancelledError:
    # задача была отменена

В результате перед получением значения задачи есть смысл сначала проверять то, не была ли она отменена:

...
проверка того, не была ли отменена задача
if not task.cancelled():
    # получение возвращённого значения из обёрнутой корутины
    value = task.result()
else:
    # задача была отменена

Если задача ещё не завершила выполнение — при вызове метода result() выдаётся исключение InvalidStateError. Оно тоже может нуждаться в обработке:

...
try:
    # получение возвращённого значения из обёрнутой корутины
    value = task.result()
except asyncio.InvalidStateError:
    # выполнение задачи ещё не завершено

В результате перед получением из задачи значения неплохо будет сначала проверить — завершено ли её выполнение:

...
проверка того, завершено ли выполнение задачи
if not task.done():
    await task
получение возвращённого значения из обёрнутой корутины
value = task.result()

8.4. Как получить исключение, выданное задачей

Корутина, обёрнутая задачей, может вызвать исключение, которое окажется необработанным.

Это приведёт к отмене задачи.

Необработанное исключение можно получить, воспользовавшись методом задачи exception():

...
получение исключения, вызванного задачей
exception = task.exception()

Если необработанное исключение не было вызвано в обёрнутой корутине — тогда этот метод возвратит значение None.

Если задача была отменена — при вызове метода exception() будет выдано исключение CancelledError. Его может понадобиться обработать:

...
try:
    # получение исключения, вызванного задачей
    exception = task.exception()
except asyncio.CancelledError:
    # задача была отменена

В результате — неплохо будет сначала проверить, не была ли отменена задача:

...
проверка того, не была ли отменена задача
if not task.cancelled():
    # получение исключения, вызванного задачей
    exception = task.exception()
else:
    # задача была отменена

Если выполнение задачи ещё не завершилось — при вызове метода exception() будет выдано исключение InvalidStateError. Оно может нуждаться в обработке:

...
try:
    # получение исключения, вызванного задачей
    exception = task.exception()
except asyncio.InvalidStateError:
    # выполнение задачи ещё не завершено

В результате — в таких ситуациях рекомендуется сначала проверить, было ли завершено выполнение задачи:

...
проверка того, не было ли завершено выполнение задачи
if not task.done():
    await task
получение исключения, выданного задачей
exception = task.exception()

8.5. Как отменить выполнение задачи

Отменить выполнение запланированной задачи можно, воспользовавшись её методом cancel().

Он вернёт True в том случае, если ему удалось отменить задачу, а в противном случае он вернёт False:

...
отмена задачи
was_cancelled = task.cancel()

Если выполнение задачи уже завершено — её нельзя отменить. В такой ситуации метод cancel() вернёт False, а задаче не будет назначено состояние, указывающее на то, что она была отменена.

В следующий раз, когда у задачи будет возможность выполниться, она вызовет исключение CancelledError.

Если исключение CancelledError не будет обработано в обёрнутой задачей корутине — выполнение задачи будет отменено.

В противном случае, если это исключение будет обработано в корутине — задача отменена не будет.

Метод cancel() может принимать аргумент, содержащий сообщение, которое будет использовано в качестве содержимого исключения CancelledError.

8.6. Как использовать коллбэки при работе с задачами

Задачу можно оснастить коллбэком, вызываемым при её завершении, воспользовавшись методом add_done_callback().

Этот метод принимает имя функции, которую нужно вызвать после завершения выполнения задачи.

Функция-коллбэк должна принимать в качестве аргумента экземпляр класса Task:

# функция-коллбэк, вызываемая после завершения работы задачи
def handle(task):
    print(task)
 
...
регистрация коллбэка
task.add_done_callback(handle)

Вспомните о том, что задача может быть завершена в том случае, если обёрнутая ей корутина нормально завершила работу и вернула значение, когда произошло необработанное исключение, или когда задача была отменена.

Метод add_done_callback() может использоваться для добавления к задаче или регистрации любого необходимого количества функций-коллбэков.

Удалять или отменять регистрацию коллбэков можно, воспользовавшись методом remove_done_callback():

...
удаление коллбэка, вызываемого при завершении задачи
task.remove_done_callback(handle)

8.7. Как назначить задаче имя

У задач могут быть имена.

Эти имена могут пригодиться в том случае, если на основе одной и той же корутины создаются несколько задач и при этом нужен какой-то способ программно различать эти задачи.

Имя можно задать при создании задачи из корутины, воспользовавшись аргументом name:

...
создание задачи из корутины
task = asyncio.create_task(task_coroutine(), name='MyTask')

Имя задаче можно назначить и с помощью метода set_name():

...
назначение задаче имени
task.set_name('MyTask')

Узнать имя задачи можно, воспользовавшись методом get_name():

...
получение имени задачи
name = task.get_name()

Подробности о проверке состояния задач можно узнать из этого руководства.

9. Текущие и выполняющиеся задачи

Можно узнавать о том, какие задачи выполняются в цикле событий.

Сделать это можно, получив объект asyncio.Task для текущей выполняемой задачи и для всех остальных выполняющихся задач.

9.1. Как получить объект текущей задачи

Текущую выполняемую задачу можно получить посредством метода asyncio.current_task().

Эта функция возвращает объект Task для задачи, которая выполняется в текущий момент:

...
получение текущей задачи
task = asyncio.current_task()

Данная функция возвращает объект текущей задачи.

Вот что это может быть:

  • Главная корутина, переданная asyncio.run().

  • Задача, которая была создана и выполнение которой было запланировано в asyncio-программе посредством asyncio.create_task().

Задача может создавать и запускать другие корутины (например — не обёрнутые в объекты задач). Получение текущей задачи из корутины приведёт к возврату объекта Task для работающей задачи, но не корутины, которая выполняется в данный момент.

Получение текущей задачи может пригодиться в том случае, если корутине или задаче нужны подробные сведения о самой себе. Например — это может быть имя задачи, которое требуется для логирования.

Мы можем исследовать то, как получить экземпляр Task для главной корутины, используемой для запуска asyncio-программы.

В нижеприведённом примере определяется корутина, которая используется в качестве точки входа в программу. Она выводит сообщение, затем получает текущую задачу и сообщает сведения о ней.

Это — важный пример, который мы рассматриваем первым, так как он позволяет увидеть то, что в цикле событий ко всем корутинам можно обращаться как к задачам.

Вот полный текст этого примера:

# SuperFastPython.com
пример получения текущей задачи из главной корутины
import asyncio
определение главной корутины
async def main():
    # вывод сообщения
    print('main coroutine started')
    # получение текущей задачи
    task = asyncio.current_task()
    # вывод сведений о ней
    print(task)
запуск asyncio-программы
asyncio.run(main())

После запуска этого примера сначала создаётся главная корутина, которая используется для запуска asyncio-программы.

Корутина main() запускается и первым делом выводит сообщение.

Затем она получает текущую задачу — объект Task, представляющий саму эту корутину, которая выполняется в данный момент.

Потом программа выводит сведения о текущей выполняемой задаче.

Можно видеть, что первой задаче по умолчанию назначается имя Task-1, и то, что выполняется корутина main() — то есть корутина, которая работает в текущий момент.

Это указывает на то, что мы можем использовать функцию asyncio.current_task() для доступа к объекту Task корутины, выполняющейся в текущий момент, которая автоматически обёрнута в объект Task.

main coroutine started
<Task pending name='Task-1' coro=<main() running at ...> cb=[_run_until_complete_cb() at ...]>

Тема текущих выполняемых задач подробнее раскрыта здесь.

9.2. Как получить объекты всех задач

Может понадобиться получить доступ ко всем задачам в asyncio-программе.

На это может быть множество причин:

  • Для того чтобы проанализировать текущее состояние или сложность программы.

  • Для того чтобы залогировать сведения обо всех выполняющихся задачах.

  • Для того чтобы найти задачу, к которой можно обратиться, или которую можно отменить.

Можно получить набор объектов всех запланированных и выполняющихся (ещё не завершённых) задач asyncio-программы с помощью функции asyncio.all_tasks():

...
получение всех задач
tasks = asyncio.all_tasks()

После вызова этой функции в нашем распоряжении будет набор всех задач asyncio-программы.

Он оформлен в виде множества, поэтому каждая задача представлена в нём в единственном экземпляре.

Задача попадёт в это множество в следующих случаях:

  • Задача была запланирована, но ещё не выполняется.

  • Задача выполняется в настоящий момент (например — выполняется, но приостановлена).

В возвращённый набор задач будет включён и объект текущей выполняемой задачи — то есть той задачи, которая выполняет корутину, вызвавшую функцию asyncio.all_tasks().

Кроме того — вспомните о том, что метод asyncio.run(), используемый для запуска asyncio-программы, обернёт предоставленную ему корутину в задачу. Это значит, что набор всех задач будет включать в себя задачу, представляющую собой точку входа в программу.

Можно исследовать ситуацию, когда есть asyncio-программа, в которой имеется несколько задач. Нужно получить множество, содержащее объекты этих задач.

В данном примере мы сначала создаём 10 задач, каждая из которых оборачивает и выполняет одну и ту же корутину.

Потом главная корутина запрашивает получение множества, содержащее все эти задачи — запланированные или выполняющиеся — и выводит сведения о них.

Вот полный код примера:

# SuperFastPython.com
пример, где запускают множество задач, а после этого получают к ним доступ
import asyncio
корутина для задач
async def task_coroutine(value):
    # вывод сообщения
    print(f'task {value} is running')
    # краткая блокировка
    await asyncio.sleep(1)
определение главной корутины
async def main():
    # вывод сообщения
    print('main coroutine started')
    # запуск нескольких задач
    started_tasks = [asyncio.create_task(task_coroutine(i)) for i in range(10)]
    # выделение времени, необходимого на то, чтобы некоторые из задач запустились
    await asyncio.sleep(0.1)
    # получение всех задач
    tasks = asyncio.all_tasks()
    # вывод сведений обо всех задачах
    for task in tasks:
        print(f'> {task.get_name()}, {task.get_coro()}')
    # ждём завершения всех задач
    for task in started_tasks:
        await task
запуск asyncio-программы
asyncio.run(main())

При запуске этого примера сначала создаётся главная корутина, которая используется для запуска asyncio-программы.

Корутина main() начинает работу и, в первую очередь, выводит сообщения.

Затем она создаёт 10 задач, оборачивающих специально подготовленную для них корутину, и планирует выполнение этих задач.

После этого корутина main() блокируется на некоторое время, что позволяет задачам начать выполняться.

Задачи начинают выполняться, каждая из них выводит сообщение, после чего «засыпает».

Работа корутины main() возобновляется, она получает список всех задач в программе.

Далее — она выводит для каждой задачи её имя и сведения о связанной с ней корутине.

И наконец — она перебирает список созданных задач и ожидает завершения работы каждой из них, позволяя им нормально отработать.

Этот пример демонстрирует возможность получения множества всех задач asyncio-программы, в которое входят и созданные задачи, и задача, которая представляет собой точку входа в программу.

main coroutine started
task 0 is running
task 1 is running
task 2 is running
task 3 is running
task 4 is running
task 5 is running
task 6 is running
task 7 is running
task 8 is running
task 9 is running
> Task-9, <coroutine object task_coroutine at 0x10e186e30>
> Task-2, <coroutine object task_coroutine at 0x10e184e40>
> Task-11, <coroutine object task_coroutine at 0x10e186f10>
> Task-7, <coroutine object task_coroutine at 0x10e186d50>
> Task-4, <coroutine object task_coroutine at 0x10e185700>
> Task-10, <coroutine object task_coroutine at 0x10e186ea0>
> Task-8, <coroutine object task_coroutine at 0x10e186dc0>
> Task-5, <coroutine object task_coroutine at 0x10e186ab0>
> Task-1, <coroutine object main at 0x10e1847b0>
> Task-3, <coroutine object task_coroutine at 0x10e184f90>

Подробности о работе с наборами задач можно узнать здесь.

Наша следующая тема — конкурентное выполнение нескольких корутин.

О, а приходите к нам работать? ? ?

Мы в wunderfund.io занимаемся высокочастотной алготорговлей с 2014 года. Высокочастотная торговля — это непрерывное соревнование лучших программистов и математиков всего мира. Присоединившись к нам, вы станете частью этой увлекательной схватки.

Мы предлагаем интересные и сложные задачи по анализу данных и low latency разработке для увлеченных исследователей и программистов. Гибкий график и никакой бюрократии, решения быстро принимаются и воплощаются в жизнь.

Сейчас мы ищем плюсовиков, питонистов, дата-инженеров и мл-рисерчеров.

Присоединяйтесь к нашей команде.

Теги:
Хабы:
Всего голосов 20: ↑20 и ↓0+20
Комментарии0

Публикации

Информация

Сайт
wunderfund.io
Дата регистрации
Дата основания
Численность
11–30 человек
Местоположение
Россия
Представитель
xopxe

Истории