Я помню тот момент, когда подумал «Как же медленно всё работает, что если я распараллелю вызовы?», а спустя 3 дня, взглянув на код, ничего не мог понять в жуткой каше из потоков, синхронизаторов и функций обратного вызова.
Тогда я познакомился с asyncio, и всё изменилось.
Если кто не знает, asyncio — новый модуль для организации конкурентного программирования, который появился в Python 3.4. Он предназначен для упрощения использования корутин и футур в асинхронном коде — чтобы код выглядел как синхронный, без коллбэков.
Я помню, в то время было несколько похожих инструментов, и один из них выделялся — это библиотека gevent. Я советую всем прочитать прекрасное руководство gevent для практикующего python-разработчика, в котором описана не только работа с ней, но и что такое конкурентность в общем понимании. Мне настолько понравилось та статья, что я решил использовать её как шаблон для написания введения в asyncio.
Небольшой дисклеймер — это статья не gevent vs asyncio. Nathan Road уже сделал это за меня в своей заметке. Все примеры вы можете найти на GitHub.
Я знаю, вам уже не терпится писать код, но для начала я бы хотел рассмотреть несколько концепций, которые нам пригодятся в дальнейшем.
Потоки — наиболее распространённый инструмент. Думаю, вы слышали о нём и ранее, однако asyncio оперирует несколько другими понятиями: циклы событий, корутины и футуры.
Довольно просто? Поехали!
В видео "Конкурентность — это не параллелизм, это лучше" Роб Пайк обращает ваше внимание на ключевую вещь. Разбиение задач на конкурентные подзадачи возможно только при таком параллелизме, когда он же и управляет этими подзадачами.
Asyncio делает тоже самое — вы можете разбивать ваш код на процедуры, которые определять как корутины, что даёт возможность управлять ими как пожелаете, включая и одновременное выполнение. Корутины содержат операторы yield, с помощью которых мы определяем места, где можно переключиться на другие ожидающие выполнения задачи.
За переключение контекста в asyncio отвечает yield, который передаёт управление обратно в event loop, а тот в свою очередь — к другой корутине. Рассмотрим базовый пример:
* Сначала мы объявили пару простейших корутин, которые притворяются неблокирующими, используя sleep из asyncio
* Корутины могут быть запущены только из другой корутины, или обёрнуты в задачу с помощью create_task
* После того, как у нас оказались 2 задачи, объединим их, используя wait
* И, наконец, отправим на выполнение в цикл событий через run_until_complete
Используя await в какой-либо корутине, мы таким образом объявляем, что корутина может отдавать управление обратно в event loop, который, в свою очередь, запустит какую-либо следующую задачу: bar. В bar произойдёт тоже самое: на await asyncio.sleep управление будет передано обратно в цикл событий, который в нужное время вернётся к выполнению foo.
Представим 2 блокирующие задачи: gr1 и gr2, как будто они обращаются к неким сторонним сервисам, и, пока они ждут ответа, третья функция может работать асинхронно.
Обратите внимание как происходит работа с вводом-выводом и планированием выполнения, позволяя всё это уместить в один поток. Пока две задачи заблокированы ожиданием I/O, третья функция может занимать всё процессорное время.
В синхронном мире мы мыслим последовательно. Если у нас есть список задач, выполнение которых занимает разное время, то они завершатся в том же порядке, в котором поступили в обработку. Однако, в случае конкурентности нельзя быть в этом уверенным.
Разумеется, ваш результат будет иным, поскольку каждая задача будет засыпать на случайное время, но заметьте, что результат выполнения полностью отличается, хотя мы всегда ставим задачи в одном и том же порядке.
Также обратите внимание на корутину для нашей довольно простой задачи. Это важно для понимания, что в asyncio нет никакой магии при реализации неблокирующих задач. Во время реализации asyncio стоял отдельно в стандартной библиотеке, т.к. остальные модули предоставляли только блокирующую функциональность. Вы можете использовать модуль concurrent.futures для оборачивания блокирующих задач в потоки или процессы и получения футуры для использования в asyncio. Несколько таких примеров доступны на GitHub.
Это, наверно, главный недостаток сейчас при использовании asyncio, однако уже есть несколько библиотек, помогающих решить эту проблему.
Самая популярная блокирующая задача — получение данных по HTTP-запросу. Рассмотрим работу с великолепной библиотекой aiohttp на примере получения информации о публичных событиях на GitHub.
Тут стоит обратить внимание на пару моментов.
Во-первых, разница во времени — при использовании асинхронных вызовов мы запускаем запросы одновременно. Как говорилось ранее, каждый из них передавал управление следующему и возвращал результат по завершении. То есть скорость выполнения напрямую зависит от времени работы самого медленного запроса, который занял как раз 0.54 секунды. Круто, правда?
Во-вторых, насколько код похож на синхронный. Это же по сути одно и то же! Основные отличия связаны с реализацией библиотеки для выполнения запросов, созданием и ожиданием завершения задач.
До сих пор мы использовали единственный метод создания и получения результатов из корутин, создания набора задач и ожидания их завершения. Однако, корутины могут быть запланированы для запуска и получения результатов несколькими способами. Представьте ситуацию, когда нам надо обрабатывать результаты GET-запросов по мере их получения; на самом деле реализация очень похожа на предыдущую:
Посмотрите на отступы и тайминги — мы запустили все задачи одновременно, однако они обработаны в порядке завершения выполнения. Код в данном случае немного отличается: мы пакуем корутины, каждая из которых уже подготовлена для выполнения, в список. Функция as_completed возвращает итератор, который выдаёт результаты корутин по мере их выполнения. Круто же, правда?! Кстати, и as_completed, и wait — функции из пакета concurrent.futures.
Ещё один пример — что если вы хотите узнать свой IP адрес. Есть куча сервисов для этого, но вы не знаете какой из них будет доступен в момент работы программы. Вместо того, чтобы последовательно опрашивать каждый из списка, можно запустить все запросы конкурентно и выбрать первый успешный.
Что ж, для этого в нашей любимой функции wait есть специальный параметр return_when. До сих пор мы игнорировали то, что возвращает wait, т.к. только распараллеливали задачи. Но теперь нам надо получить результат из корутины, так что будем использовать набор футур done и pending.
Что же случилось? Первый сервис ответил успешно, но в логах какое-то предупреждение!
На самом деле мы запустили выполнение двух задач, но вышли из цикла уже после первого результата, в то время как вторая корутина ещё выполнялась. Asyncio подумал что это баг и предупредил нас. Наверно, стоит прибираться за собой и явно убивать ненужные задачи. Как? Рад, что вы спросили.
Всё настолько просто. Когда футура находится в состояние done, у неё можно получить результат выполнения. В состояниях pending и running такая операция приведёт к исключению InvalidStateError, а в случае canelled будет CancelledError, и наконец, если исключение произошло в самой корутине, оно будет сгенерировано снова (также, как это сделано при вызове exception). Но не верьте мне на слово.
Вы можете узнать состояние футуры с помощью методов done, cancelled или running, но не забывайте, что в случае done вызов result может вернуть как ожидаемый результат, так и исключение, которое возникло в процессе работы. Для отмены выполнения футуры есть метод cancel. Это подходит для исправления нашего примера.
Простой и аккуратный вывод — как раз то, что я люблю!
Если вам нужна некоторая дополнительная логика по обработке футур, то вы можете подключать коллбэки, которые будут вызваны при переходе в состояние done. Это может быть полезно для тестов, когда некоторые результаты надо переопределить какими-то своими значениями.
asyncio — это целиком про написание управляемого и читаемого конкурентного кода, что хорошо заметно при обработке исключений. Вернёмся к примеру, чтобы продемонстрировать.
Допустим, мы хотим убедиться, что все запросы к сервисам по определению IP вернули одинаковый результат. Однако, один из них может быть оффлайн и не ответить нам. Просто применим try...except как обычно:
Мы также можем обработать исключение, которое возникло в процессе выполнения корутины:
Точно также, как и запуск задачи без ожидания её завершения является ошибкой, так и получение неизвестных исключений оставляет свои следы в выводе:
Вывод выглядит также, как и в предыдущем примере за исключением укоризненного сообщения от asyncio.
А что, если информация о нашем IP не так уж важна? Это может быть хорошим дополнением к какому-то составному ответу, в котором эта часть будет опциональна. В таком случае не будем заставлять пользователя ждать. В идеале мы бы ставили таймаут на вычисление IP, после которого в любом случае отдавали ответ пользователю, даже без этой информации.
И снова у wait есть подходящий аргумент:
Я также добавил аргумент timeout к строке запуска скрипта, чтобы проверить что же произойдёт, если запросы успеют обработаться. Также я добавил случайные задержки, чтобы скрипт не завершался слишком быстро, и было время разобраться как именно он работает.
Asyncio укрепил мою и так уже большую любовь к python. Если честно, я влюбился в сопрограммы, ещё когда познакомился с ними в Tornado, но asyncio сумел взять всё лучшее из него и других библиотек по реализации конкурентности. Причём настолько, что были предприняты особые усилия, чтобы они могли использовать основной цикл ввода-вывода. Так что если вы используете Tornado или Twisted, то можете подключать код, предназначенный для asyncio!
Как я уже упоминал, основная проблема заключается в том, что стандартные библиотеки пока ещё не поддерживают неблокирующее поведение. Также и многие популярные библиотеки работают пока лишь в синхронном стиле, а те, что используют конкурентность, пока ещё молоды и экспериментальны. Однако, их число растёт.
Надеюсь, в этом уроке я показал, насколько приятно работать с asyncio, и эта технология подтолкнёт вас к переходу на python 3, если вы по какой-то причине застряли на python 2.7. Одно точно — будущее Python полностью изменилось.
Тогда я познакомился с asyncio, и всё изменилось.
Если кто не знает, asyncio — новый модуль для организации конкурентного программирования, который появился в Python 3.4. Он предназначен для упрощения использования корутин и футур в асинхронном коде — чтобы код выглядел как синхронный, без коллбэков.
Я помню, в то время было несколько похожих инструментов, и один из них выделялся — это библиотека gevent. Я советую всем прочитать прекрасное руководство gevent для практикующего python-разработчика, в котором описана не только работа с ней, но и что такое конкурентность в общем понимании. Мне настолько понравилось та статья, что я решил использовать её как шаблон для написания введения в asyncio.
Небольшой дисклеймер — это статья не gevent vs asyncio. Nathan Road уже сделал это за меня в своей заметке. Все примеры вы можете найти на GitHub.
Я знаю, вам уже не терпится писать код, но для начала я бы хотел рассмотреть несколько концепций, которые нам пригодятся в дальнейшем.
Потоки, циклы событий, корутины и футуры
Потоки — наиболее распространённый инструмент. Думаю, вы слышали о нём и ранее, однако asyncio оперирует несколько другими понятиями: циклы событий, корутины и футуры.
- цикл событий (event loop) по большей части всего лишь управляет выполнением различных задач: регистрирует поступление и запускает в подходящий момент
- корутины — специальные функции, похожие на генераторы python, от которых ожидают (await), что они будут отдавать управление обратно в цикл событий. Необходимо, чтобы они были запущены именно через цикл событий
- футуры — объекты, в которых хранится текущий результат выполнения какой-либо задачи. Это может быть информация о том, что задача ещё не обработана или уже полученный результат; а может быть вообще исключение
Довольно просто? Поехали!
Синхронное и асинхронное выполнение
В видео "Конкурентность — это не параллелизм, это лучше" Роб Пайк обращает ваше внимание на ключевую вещь. Разбиение задач на конкурентные подзадачи возможно только при таком параллелизме, когда он же и управляет этими подзадачами.
Asyncio делает тоже самое — вы можете разбивать ваш код на процедуры, которые определять как корутины, что даёт возможность управлять ими как пожелаете, включая и одновременное выполнение. Корутины содержат операторы yield, с помощью которых мы определяем места, где можно переключиться на другие ожидающие выполнения задачи.
За переключение контекста в asyncio отвечает yield, который передаёт управление обратно в event loop, а тот в свою очередь — к другой корутине. Рассмотрим базовый пример:
import asyncio
async def foo():
print('Running in foo')
await asyncio.sleep(0)
print('Explicit context switch to foo again')
async def bar():
print('Explicit context to bar')
await asyncio.sleep(0)
print('Implicit context switch back to bar')
ioloop = asyncio.get_event_loop()
tasks = [ioloop.create_task(foo()), ioloop.create_task(bar())]
wait_tasks = asyncio.wait(tasks)
ioloop.run_until_complete(wait_tasks)
ioloop.close()
$ python3 1-sync-async-execution-asyncio-await.py
Running in foo
Explicit context to bar
Explicit context switch to foo again
Implicit context switch back to bar
* Сначала мы объявили пару простейших корутин, которые притворяются неблокирующими, используя sleep из asyncio
* Корутины могут быть запущены только из другой корутины, или обёрнуты в задачу с помощью create_task
* После того, как у нас оказались 2 задачи, объединим их, используя wait
* И, наконец, отправим на выполнение в цикл событий через run_until_complete
Используя await в какой-либо корутине, мы таким образом объявляем, что корутина может отдавать управление обратно в event loop, который, в свою очередь, запустит какую-либо следующую задачу: bar. В bar произойдёт тоже самое: на await asyncio.sleep управление будет передано обратно в цикл событий, который в нужное время вернётся к выполнению foo.
Представим 2 блокирующие задачи: gr1 и gr2, как будто они обращаются к неким сторонним сервисам, и, пока они ждут ответа, третья функция может работать асинхронно.
import time
import asyncio
start = time.time()
def tic():
return 'at %1.1f seconds' % (time.time() - start)
async def gr1():
# Busy waits for a second, but we don't want to stick around...
print('gr1 started work: {}'.format(tic()))
await asyncio.sleep(2)
print('gr1 ended work: {}'.format(tic()))
async def gr2():
# Busy waits for a second, but we don't want to stick around...
print('gr2 started work: {}'.format(tic()))
await asyncio.sleep(2)
print('gr2 Ended work: {}'.format(tic()))
async def gr3():
print("Let's do some stuff while the coroutines are blocked, {}".format(tic()))
await asyncio.sleep(1)
print("Done!")
ioloop = asyncio.get_event_loop()
tasks = [
ioloop.create_task(gr1()),
ioloop.create_task(gr2()),
ioloop.create_task(gr3())
]
ioloop.run_until_complete(asyncio.wait(tasks))
ioloop.close()
$ python3 1b-cooperatively-scheduled-asyncio-await.py
gr1 started work: at 0.0 seconds
gr2 started work: at 0.0 seconds
Lets do some stuff while the coroutines are blocked, at 0.0 seconds
Done!
gr1 ended work: at 2.0 seconds
gr2 Ended work: at 2.0 seconds
Обратите внимание как происходит работа с вводом-выводом и планированием выполнения, позволяя всё это уместить в один поток. Пока две задачи заблокированы ожиданием I/O, третья функция может занимать всё процессорное время.
Порядок выполнения
В синхронном мире мы мыслим последовательно. Если у нас есть список задач, выполнение которых занимает разное время, то они завершатся в том же порядке, в котором поступили в обработку. Однако, в случае конкурентности нельзя быть в этом уверенным.
import random
from time import sleep
import asyncio
def task(pid):
"""Synchronous non-deterministic task.
"""
sleep(random.randint(0, 2) * 0.001)
print('Task %s done' % pid)
async def task_coro(pid):
"""Coroutine non-deterministic task
"""
await asyncio.sleep(random.randint(0, 2) * 0.001)
print('Task %s done' % pid)
def synchronous():
for i in range(1, 10):
task(i)
async def asynchronous():
tasks = [asyncio.ensure_future(task_coro(i)) for i in range(1, 10)]
await asyncio.wait(tasks)
print('Synchronous:')
synchronous()
ioloop = asyncio.get_event_loop()
print('Asynchronous:')
ioloop.run_until_complete(asynchronous())
ioloop.close()
$ python3 1c-determinism-sync-async-asyncio-await.py
Synchronous:
Task 1 done
Task 2 done
Task 3 done
Task 4 done
Task 5 done
Task 6 done
Task 7 done
Task 8 done
Task 9 done
Asynchronous:
Task 2 done
Task 5 done
Task 6 done
Task 8 done
Task 9 done
Task 1 done
Task 4 done
Task 3 done
Task 7 done
Разумеется, ваш результат будет иным, поскольку каждая задача будет засыпать на случайное время, но заметьте, что результат выполнения полностью отличается, хотя мы всегда ставим задачи в одном и том же порядке.
Также обратите внимание на корутину для нашей довольно простой задачи. Это важно для понимания, что в asyncio нет никакой магии при реализации неблокирующих задач. Во время реализации asyncio стоял отдельно в стандартной библиотеке, т.к. остальные модули предоставляли только блокирующую функциональность. Вы можете использовать модуль concurrent.futures для оборачивания блокирующих задач в потоки или процессы и получения футуры для использования в asyncio. Несколько таких примеров доступны на GitHub.
Это, наверно, главный недостаток сейчас при использовании asyncio, однако уже есть несколько библиотек, помогающих решить эту проблему.
Самая популярная блокирующая задача — получение данных по HTTP-запросу. Рассмотрим работу с великолепной библиотекой aiohttp на примере получения информации о публичных событиях на GitHub.
import time
import urllib.request
import asyncio
import aiohttp
URL = 'https://api.github.com/events'
MAX_CLIENTS = 3
def fetch_sync(pid):
print('Fetch sync process {} started'.format(pid))
start = time.time()
response = urllib.request.urlopen(URL)
datetime = response.getheader('Date')
print('Process {}: {}, took: {:.2f} seconds'.format(
pid, datetime, time.time() - start))
return datetime
async def fetch_async(pid):
print('Fetch async process {} started'.format(pid))
start = time.time()
response = await aiohttp.request('GET', URL)
datetime = response.headers.get('Date')
print('Process {}: {}, took: {:.2f} seconds'.format(
pid, datetime, time.time() - start))
response.close()
return datetime
def synchronous():
start = time.time()
for i in range(1, MAX_CLIENTS + 1):
fetch_sync(i)
print("Process took: {:.2f} seconds".format(time.time() - start))
async def asynchronous():
start = time.time()
tasks = [asyncio.ensure_future(
fetch_async(i)) for i in range(1, MAX_CLIENTS + 1)]
await asyncio.wait(tasks)
print("Process took: {:.2f} seconds".format(time.time() - start))
print('Synchronous:')
synchronous()
print('Asynchronous:')
ioloop = asyncio.get_event_loop()
ioloop.run_until_complete(asynchronous())
ioloop.close()
$ python3 1d-async-fetch-from-server-asyncio-await.py
Synchronous:
Fetch sync process 1 started
Process 1: Wed, 17 Feb 2016 13:10:11 GMT, took: 0.54 seconds
Fetch sync process 2 started
Process 2: Wed, 17 Feb 2016 13:10:11 GMT, took: 0.50 seconds
Fetch sync process 3 started
Process 3: Wed, 17 Feb 2016 13:10:12 GMT, took: 0.48 seconds
Process took: 1.54 seconds
Asynchronous:
Fetch async process 1 started
Fetch async process 2 started
Fetch async process 3 started
Process 3: Wed, 17 Feb 2016 13:10:12 GMT, took: 0.50 seconds
Process 2: Wed, 17 Feb 2016 13:10:12 GMT, took: 0.52 seconds
Process 1: Wed, 17 Feb 2016 13:10:12 GMT, took: 0.54 seconds
Process took: 0.54 seconds
Тут стоит обратить внимание на пару моментов.
Во-первых, разница во времени — при использовании асинхронных вызовов мы запускаем запросы одновременно. Как говорилось ранее, каждый из них передавал управление следующему и возвращал результат по завершении. То есть скорость выполнения напрямую зависит от времени работы самого медленного запроса, который занял как раз 0.54 секунды. Круто, правда?
Во-вторых, насколько код похож на синхронный. Это же по сути одно и то же! Основные отличия связаны с реализацией библиотеки для выполнения запросов, созданием и ожиданием завершения задач.
Создание конкурентности
До сих пор мы использовали единственный метод создания и получения результатов из корутин, создания набора задач и ожидания их завершения. Однако, корутины могут быть запланированы для запуска и получения результатов несколькими способами. Представьте ситуацию, когда нам надо обрабатывать результаты GET-запросов по мере их получения; на самом деле реализация очень похожа на предыдущую:
import time
import random
import asyncio
import aiohttp
URL = 'https://api.github.com/events'
MAX_CLIENTS = 3
async def fetch_async(pid):
start = time.time()
sleepy_time = random.randint(2, 5)
print('Fetch async process {} started, sleeping for {} seconds'.format(
pid, sleepy_time))
await asyncio.sleep(sleepy_time)
response = await aiohttp.request('GET', URL)
datetime = response.headers.get('Date')
response.close()
return 'Process {}: {}, took: {:.2f} seconds'.format(
pid, datetime, time.time() - start)
async def asynchronous():
start = time.time()
futures = [fetch_async(i) for i in range(1, MAX_CLIENTS + 1)]
for i, future in enumerate(asyncio.as_completed(futures)):
result = await future
print('{} {}'.format(">>" * (i + 1), result))
print("Process took: {:.2f} seconds".format(time.time() - start))
ioloop = asyncio.get_event_loop()
ioloop.run_until_complete(asynchronous())
ioloop.close()
$ python3 2a-async-fetch-from-server-as-completed-asyncio-await.py
Fetch async process 1 started, sleeping for 4 seconds
Fetch async process 3 started, sleeping for 5 seconds
Fetch async process 2 started, sleeping for 3 seconds
>> Process 2: Wed, 17 Feb 2016 13:55:19 GMT, took: 3.53 seconds
>>>> Process 1: Wed, 17 Feb 2016 13:55:20 GMT, took: 4.49 seconds
>>>>>> Process 3: Wed, 17 Feb 2016 13:55:21 GMT, took: 5.48 seconds
Process took: 5.48 seconds
Посмотрите на отступы и тайминги — мы запустили все задачи одновременно, однако они обработаны в порядке завершения выполнения. Код в данном случае немного отличается: мы пакуем корутины, каждая из которых уже подготовлена для выполнения, в список. Функция as_completed возвращает итератор, который выдаёт результаты корутин по мере их выполнения. Круто же, правда?! Кстати, и as_completed, и wait — функции из пакета concurrent.futures.
Ещё один пример — что если вы хотите узнать свой IP адрес. Есть куча сервисов для этого, но вы не знаете какой из них будет доступен в момент работы программы. Вместо того, чтобы последовательно опрашивать каждый из списка, можно запустить все запросы конкурентно и выбрать первый успешный.
Что ж, для этого в нашей любимой функции wait есть специальный параметр return_when. До сих пор мы игнорировали то, что возвращает wait, т.к. только распараллеливали задачи. Но теперь нам надо получить результат из корутины, так что будем использовать набор футур done и pending.
from collections import namedtuple
import time
import asyncio
from concurrent.futures import FIRST_COMPLETED
import aiohttp
Service = namedtuple('Service', ('name', 'url', 'ip_attr'))
SERVICES = (
Service('ipify', 'https://api.ipify.org?format=json', 'ip'),
Service('ip-api', 'http://ip-api.com/json', 'query')
)
async def fetch_ip(service):
start = time.time()
print('Fetching IP from {}'.format(service.name))
response = await aiohttp.request('GET', service.url)
json_response = await response.json()
ip = json_response[service.ip_attr]
response.close()
return '{} finished with result: {}, took: {:.2f} seconds'.format(
service.name, ip, time.time() - start)
async def asynchronous():
futures = [fetch_ip(service) for service in SERVICES]
done, pending = await asyncio.wait(
futures, return_when=FIRST_COMPLETED)
print(done.pop().result())
ioloop = asyncio.get_event_loop()
ioloop.run_until_complete(asynchronous())
ioloop.close()
$ python3 2c-fetch-first-ip-address-response-await.py
Fetching IP from ip-api
Fetching IP from ipify
ip-api finished with result: 82.34.76.170, took: 0.09 seconds
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x10f95c6d8>
Task was destroyed but it is pending!
task: <Task pending coro=<fetch_ip() running at 2c-fetch-first-ip-address-response.py:20> wait_for=<Future pending cb=[BaseSelectorEventLoop._sock_connect_done(10)(), Task._wakeup()]>>
Что же случилось? Первый сервис ответил успешно, но в логах какое-то предупреждение!
На самом деле мы запустили выполнение двух задач, но вышли из цикла уже после первого результата, в то время как вторая корутина ещё выполнялась. Asyncio подумал что это баг и предупредил нас. Наверно, стоит прибираться за собой и явно убивать ненужные задачи. Как? Рад, что вы спросили.
Состояния футур
- ожидание (pending)
- выполнение (running)
- выполнено (done)
- отменено (cancelled)
Всё настолько просто. Когда футура находится в состояние done, у неё можно получить результат выполнения. В состояниях pending и running такая операция приведёт к исключению InvalidStateError, а в случае canelled будет CancelledError, и наконец, если исключение произошло в самой корутине, оно будет сгенерировано снова (также, как это сделано при вызове exception). Но не верьте мне на слово.
Вы можете узнать состояние футуры с помощью методов done, cancelled или running, но не забывайте, что в случае done вызов result может вернуть как ожидаемый результат, так и исключение, которое возникло в процессе работы. Для отмены выполнения футуры есть метод cancel. Это подходит для исправления нашего примера.
from collections import namedtuple
import time
import asyncio
from concurrent.futures import FIRST_COMPLETED
import aiohttp
Service = namedtuple('Service', ('name', 'url', 'ip_attr'))
SERVICES = (
Service('ipify', 'https://api.ipify.org?format=json', 'ip'),
Service('ip-api', 'http://ip-api.com/json', 'query')
)
async def fetch_ip(service):
start = time.time()
print('Fetching IP from {}'.format(service.name))
response = await aiohttp.request('GET', service.url)
json_response = await response.json()
ip = json_response[service.ip_attr]
response.close()
return '{} finished with result: {}, took: {:.2f} seconds'.format(
service.name, ip, time.time() - start)
async def asynchronous():
futures = [fetch_ip(service) for service in SERVICES]
done, pending = await asyncio.wait(
futures, return_when=FIRST_COMPLETED)
print(done.pop().result())
for future in pending:
future.cancel()
ioloop = asyncio.get_event_loop()
ioloop.run_until_complete(asynchronous())
ioloop.close()
$ python3 2c-fetch-first-ip-address-response-no-warning-await.py
Fetching IP from ipify
Fetching IP from ip-api
ip-api finished with result: 82.34.76.170, took: 0.08 seconds
Простой и аккуратный вывод — как раз то, что я люблю!
Если вам нужна некоторая дополнительная логика по обработке футур, то вы можете подключать коллбэки, которые будут вызваны при переходе в состояние done. Это может быть полезно для тестов, когда некоторые результаты надо переопределить какими-то своими значениями.
Обработка исключений
asyncio — это целиком про написание управляемого и читаемого конкурентного кода, что хорошо заметно при обработке исключений. Вернёмся к примеру, чтобы продемонстрировать.
Допустим, мы хотим убедиться, что все запросы к сервисам по определению IP вернули одинаковый результат. Однако, один из них может быть оффлайн и не ответить нам. Просто применим try...except как обычно:
from collections import namedtuple
import time
import asyncio
import aiohttp
Service = namedtuple('Service', ('name', 'url', 'ip_attr'))
SERVICES = (
Service('ipify', 'https://api.ipify.org?format=json', 'ip'),
Service('ip-api', 'http://ip-api.com/json', 'query'),
Service('borken', 'http://no-way-this-is-going-to-work.com/json', 'ip')
)
async def fetch_ip(service):
start = time.time()
print('Fetching IP from {}'.format(service.name))
try:
response = await aiohttp.request('GET', service.url)
except:
return '{} is unresponsive'.format(service.name)
json_response = await response.json()
ip = json_response[service.ip_attr]
response.close()
return '{} finished with result: {}, took: {:.2f} seconds'.format(
service.name, ip, time.time() - start)
async def asynchronous():
futures = [fetch_ip(service) for service in SERVICES]
done, _ = await asyncio.wait(futures)
for future in done:
print(future.result())
ioloop = asyncio.get_event_loop()
ioloop.run_until_complete(asynchronous())
ioloop.close()
$ python3 3a-fetch-ip-addresses-fail-await.py
Fetching IP from ip-api
Fetching IP from borken
Fetching IP from ipify
ip-api finished with result: 85.133.69.250, took: 0.75 seconds
ipify finished with result: 85.133.69.250, took: 1.37 seconds
borken is unresponsive
Мы также можем обработать исключение, которое возникло в процессе выполнения корутины:
from collections import namedtuple
import time
import asyncio
import aiohttp
import traceback
Service = namedtuple('Service', ('name', 'url', 'ip_attr'))
SERVICES = (
Service('ipify', 'https://api.ipify.org?format=json', 'ip'),
Service('ip-api', 'http://ip-api.com/json', 'this-is-not-an-attr'),
Service('borken', 'http://no-way-this-is-going-to-work.com/json', 'ip')
)
async def fetch_ip(service):
start = time.time()
print('Fetching IP from {}'.format(service.name))
try:
response = await aiohttp.request('GET', service.url)
except:
return '{} is unresponsive'.format(service.name)
json_response = await response.json()
ip = json_response[service.ip_attr]
response.close()
return '{} finished with result: {}, took: {:.2f} seconds'.format(
service.name, ip, time.time() - start)
async def asynchronous():
futures = [fetch_ip(service) for service in SERVICES]
done, _ = await asyncio.wait(futures)
for future in done:
try:
print(future.result())
except:
print("Unexpected error: {}".format(traceback.format_exc()))
ioloop = asyncio.get_event_loop()
ioloop.run_until_complete(asynchronous())
ioloop.close()
$ python3 3b-fetch-ip-addresses-future-exceptions-await.py
Fetching IP from ipify
Fetching IP from borken
Fetching IP from ip-api
ipify finished with result: 85.133.69.250, took: 0.91 seconds
borken is unresponsive
Unexpected error: Traceback (most recent call last):
File “3b-fetch-ip-addresses-future-exceptions.py”, line 39, in asynchronous
print(future.result())
File “3b-fetch-ip-addresses-future-exceptions.py”, line 26, in fetch_ip
ip = json_response[service.ip_attr]
KeyError: ‘this-is-not-an-attr’
Точно также, как и запуск задачи без ожидания её завершения является ошибкой, так и получение неизвестных исключений оставляет свои следы в выводе:
from collections import namedtuple
import time
import asyncio
import aiohttp
Service = namedtuple('Service', ('name', 'url', 'ip_attr'))
SERVICES = (
Service('ipify', 'https://api.ipify.org?format=json', 'ip'),
Service('ip-api', 'http://ip-api.com/json', 'this-is-not-an-attr'),
Service('borken', 'http://no-way-this-is-going-to-work.com/json', 'ip')
)
async def fetch_ip(service):
start = time.time()
print('Fetching IP from {}'.format(service.name))
try:
response = await aiohttp.request('GET', service.url)
except:
print('{} is unresponsive'.format(service.name))
else:
json_response = await response.json()
ip = json_response[service.ip_attr]
response.close()
print('{} finished with result: {}, took: {:.2f} seconds'.format(
service.name, ip, time.time() - start))
async def asynchronous():
futures = [fetch_ip(service) for service in SERVICES]
await asyncio.wait(futures) # intentionally ignore results
ioloop = asyncio.get_event_loop()
ioloop.run_until_complete(asynchronous())
ioloop.close()
$ python3 3c-fetch-ip-addresses-ignore-exceptions-await.py
Fetching IP from ipify
Fetching IP from borken
Fetching IP from ip-api
borken is unresponsive
ipify finished with result: 85.133.69.250, took: 0.78 seconds
Task exception was never retrieved
future: <Task finished coro=<fetch_ip() done, defined at 3c-fetch-ip-addresses-ignore-exceptions.py:15> exception=KeyError(‘this-is-not-an-attr’,)>
Traceback (most recent call last):
File “3c-fetch-ip-addresses-ignore-exceptions.py”, line 25, in fetch_ip
ip = json_response[service.ip_attr]
KeyError: ‘this-is-not-an-attr’
Вывод выглядит также, как и в предыдущем примере за исключением укоризненного сообщения от asyncio.
Таймауты
А что, если информация о нашем IP не так уж важна? Это может быть хорошим дополнением к какому-то составному ответу, в котором эта часть будет опциональна. В таком случае не будем заставлять пользователя ждать. В идеале мы бы ставили таймаут на вычисление IP, после которого в любом случае отдавали ответ пользователю, даже без этой информации.
И снова у wait есть подходящий аргумент:
import time
import random
import asyncio
import aiohttp
import argparse
from collections import namedtuple
from concurrent.futures import FIRST_COMPLETED
Service = namedtuple('Service', ('name', 'url', 'ip_attr'))
SERVICES = (
Service('ipify', 'https://api.ipify.org?format=json', 'ip'),
Service('ip-api', 'http://ip-api.com/json', 'query'),
)
DEFAULT_TIMEOUT = 0.01
async def fetch_ip(service):
start = time.time()
print('Fetching IP from {}'.format(service.name))
await asyncio.sleep(random.randint(1, 3) * 0.1)
try:
response = await aiohttp.request('GET', service.url)
except:
return '{} is unresponsive'.format(service.name)
json_response = await response.json()
ip = json_response[service.ip_attr]
response.close()
print('{} finished with result: {}, took: {:.2f} seconds'.format(
service.name, ip, time.time() - start))
return ip
async def asynchronous(timeout):
response = {
"message": "Result from asynchronous.",
"ip": "not available"
}
futures = [fetch_ip(service) for service in SERVICES]
done, pending = await asyncio.wait(
futures, timeout=timeout, return_when=FIRST_COMPLETED)
for future in pending:
future.cancel()
for future in done:
response["ip"] = future.result()
print(response)
parser = argparse.ArgumentParser()
parser.add_argument(
'-t', '--timeout',
help='Timeout to use, defaults to {}'.format(DEFAULT_TIMEOUT),
default=DEFAULT_TIMEOUT, type=float)
args = parser.parse_args()
print("Using a {} timeout".format(args.timeout))
ioloop = asyncio.get_event_loop()
ioloop.run_until_complete(asynchronous(args.timeout))
ioloop.close()
Я также добавил аргумент timeout к строке запуска скрипта, чтобы проверить что же произойдёт, если запросы успеют обработаться. Также я добавил случайные задержки, чтобы скрипт не завершался слишком быстро, и было время разобраться как именно он работает.
$ python 4a-timeout-with-wait-kwarg-await.py
Using a 0.01 timeout
Fetching IP from ipify
Fetching IP from ip-api
{‘message’: ‘Result from asynchronous.’, ‘ip’: ‘not available’}
$ python 4a-timeout-with-wait-kwarg-await.py -t 5
Using a 5.0 timeout
Fetching IP from ip-api
Fetching IP from ipify
ipify finished with result: 82.34.76.170, took: 1.24 seconds
{'ip': '82.34.76.170', 'message': 'Result from asynchronous.'}
Заключение
Asyncio укрепил мою и так уже большую любовь к python. Если честно, я влюбился в сопрограммы, ещё когда познакомился с ними в Tornado, но asyncio сумел взять всё лучшее из него и других библиотек по реализации конкурентности. Причём настолько, что были предприняты особые усилия, чтобы они могли использовать основной цикл ввода-вывода. Так что если вы используете Tornado или Twisted, то можете подключать код, предназначенный для asyncio!
Как я уже упоминал, основная проблема заключается в том, что стандартные библиотеки пока ещё не поддерживают неблокирующее поведение. Также и многие популярные библиотеки работают пока лишь в синхронном стиле, а те, что используют конкурентность, пока ещё молоды и экспериментальны. Однако, их число растёт.
Надеюсь, в этом уроке я показал, насколько приятно работать с asyncio, и эта технология подтолкнёт вас к переходу на python 3, если вы по какой-то причине застряли на python 2.7. Одно точно — будущее Python полностью изменилось.
От переводчика:
Оригинальная статья была опубликована 20 февраля 2016, за это время многое произошло. Вышел Python 3.6, в котором помимо оптимизаций была улучшена работа asyncio, API переведено в стабильное состояние. Были выпущены библиотеки для работы с Postgres, Redis, Elasticsearch и пр. в неблокирующем режиме. Даже новый фреймворк — Sanic, который напоминает Flask, но работает в асинхронном режиме. В конце концов даже event loop был оптимизирован и переписан на Cython, что получилось раза в 2 быстрее. Так что я не вижу причин игнорировать эту технологию!
Только зарегистрированные пользователи могут участвовать в опросе. Войдите, пожалуйста.
Используете ли вы asyncio?
30.75% да, и мне нравится234
7.49% да, но получаю кучу проблем57
55.19% нет, но хотелось бы420
1.71% нет, даже и не думаю13
4.86% я на python 2.737
Проголосовал 761 пользователь. Воздержались 135 пользователей.