Я помню тот момент, когда подумал «Как же медленно всё работает, что если я распараллелю вызовы?», а спустя 3 дня, взглянув на код, ничего не мог понять в жуткой каше из потоков, синхронизаторов и функций обратного вызова.
Тогда я познакомился с asyncio, и всё изменилось.
Если кто не знает, asyncio — новый модуль для организации конкурентного программирования, который появился в Python 3.4. Он предназначен для упрощения использования корутин и футур в асинхронном коде — чтобы код выглядел как синхронный, без коллбэков.
Я помню, в то время было несколько похожих инструментов, и один из них выделялся — это библиотека gevent. Я советую всем прочитать прекрасное руководство gevent для практикующего python-разработчика, в котором описана не только работа с ней, но и что такое конкурентность в общем понимании. Мне настолько понравилось та статья, что я решил использовать её как шаблон для написания введения в asyncio.
Небольшой дисклеймер — это статья не gevent vs asyncio. Nathan Road уже сделал это за меня в своей заметке. Все примеры вы можете найти на GitHub.
Я знаю, вам уже не терпится писать код, но для начала я бы хотел рассмотреть несколько концепций, которые нам пригодятся в дальнейшем.
Потоки — наиболее распространённый инструмент. Думаю, вы слышали о нём и ранее, однако asyncio оперирует несколько другими понятиями: циклы событий, корутины и футуры.
Довольно просто? Поехали!
В видео "Конкурентность — это не параллелизм, это лучше" Роб Пайк обращает ваше внима��ие на ключевую вещь. Разбиение задач на конкурентные подзадачи возможно только при таком параллелизме, когда он же и управляет этими подзадачами.
Asyncio делает тоже самое — вы можете разбивать ваш код на процедуры, которые определять как корутины, что даёт возможность управлять ими как пожелаете, включая и одновременное выполнение. Корутины содержат операторы yield, с помощью которых мы определяем места, где можно переключиться на другие ожидающие выполнения задачи.
За переключение контекста в asyncio отвечает yield, который передаёт управление обратно в event loop, а тот в свою очередь — к другой корутине. Рассмотрим базовый пример:
* Сначала мы объявили пару простейших корутин, которые притворяются неблокирующими, используя sleep из asyncio
* Корутины могут быть запущены только из другой корутины, или обёрнуты в задачу с помощью create_task
* После того, как у нас оказались 2 задачи, объединим их, используя wait
* И, наконец, отправим на выполнение в цикл событий через run_until_complete
Используя await в какой-либо корутине, мы таким образом объявляем, что корутина может отдавать управление обратно в event loop, который, в свою очередь, запустит какую-либо следующую задачу: bar. В bar произойдёт тоже самое: на await asyncio.sleep управление будет передано обратно в цикл событий, который в нужное время вернётся к выполнению foo.
Представим 2 блокирующие задачи: gr1 и gr2, как будто они обращаются к неким сторонним сервисам, и, пока они ждут ответа, третья функция может работать асинхронно.
Обратите внимание как происходит работа с вводом-выводом и планированием выполнения, позволяя всё это уместить в один поток. Пока две задачи заблокированы ожиданием I/O, третья функция может занимать всё процессорное время.
В синхронном мире мы мыслим последовательно. Если у нас есть список задач, выполнение которых занимает разное время, то они завершатся в том же порядке, в котором поступили в обработку. Однако, в случае конкурентности нельзя быть в этом уверенным.
Разумеется, ваш результат будет иным, поскольку каждая задача будет засыпать на случайное время, но заметьте, что результат выполнения полностью отличается, хотя мы всегда ставим задачи в одном и том же порядке.
Также обратите внимание на корутину для нашей довольно простой задачи. Это важно для понимания, что в asyncio нет никакой магии при реализации неблокирующих задач. Во время реализации asyncio стоял отдельно в стандартной библиотеке, т.к. остальные модули предоставляли только блокирующую функциональность. Вы можете использовать модуль concurrent.futures для оборачивания блокирующих задач в потоки или процессы и получения футуры для использования в asyncio. Несколько таких примеров доступны на GitHub.
Это, наверно, главный недостаток сейчас при использовании asyncio, однако уже есть несколько библиотек, помогающих решить эту проблему.
Самая популярная блокирующая задача — получение данных по HTTP-запросу. Рассмотрим работу с великолепной библиотекой aiohttp на примере получения информации о публичных событиях на GitHub.
Тут стоит обратить внимание на пару моментов.
Во-первых, разница во времени — при использовании асинхронных вызовов мы запускаем запросы одновременно. Как говорилось ранее, каждый из них передавал управление следующему и возвращал результат по завершении. То есть скорость выполнения напрямую зависит от времени работы самого медленного запроса, который занял как раз 0.54 секунды. Круто, правда?
Во-вторых, насколько код похож на синхронный. Это же по сути одно и то же! Основные отличия связаны с реализацией библиотеки для выполнения запросов, созданием и ожиданием завершения задач.
До сих пор мы использовали единственный метод создания и получения результатов из корутин, создания набора задач и ожидания их завершения. Однако, корутины могут быть запланированы для запуска и получения результатов несколькими способами. Представьте ситуацию, когда нам надо обрабатывать результаты GET-запросов по мере их получения; на самом деле реализация очень похожа на предыдущую:
Посмотрите на отступы и тайминги — мы запустили все задачи одновременно, однако они обработаны в порядке завершения выполнения. Код в данном случае немного отличается: мы пакуем корутины, каждая из которых уже подготовлена для выполнения, в список. Функция as_completed возвращает итератор, который выдаёт результаты корутин по мере их выполнения. Круто же, правда?! Кстати, и as_completed, и wait — функции из пакета concurrent.futures.
Ещё один пример — что если вы хотите узнать свой IP адрес. Есть куча сервисов для этого, но вы не знаете какой из них будет доступен в момент работы программы. Вместо того, чтобы последовательно опрашивать каждый из списка, можно запустить все запросы конкурентно и выбрать первый успешный.
Что ж, для этого в нашей любимой функции wait есть специальный параметр return_when. До сих пор мы игнорировали то, что возвращает wait, т.к. только распараллеливали задачи. Но теперь нам надо получить результат из корутины, так что будем использовать набор футур done и pending.
Что же случилось? Первый сервис ответил успешно, но в логах какое-то предупреждение!
На самом деле мы запустили выполнение двух задач, но вышли из цикла уже после первого результата, в то время как вторая корутина ещё выполнялась. Asyncio подумал что это баг и предупредил нас. Наверно, стоит прибираться за собой и явно убивать ненужные задачи. Как? Рад, что вы спросили.
Всё настолько просто. Когда футура находится в состояние done, у неё можно получить результат выполнения. В состояниях pending и running такая операция приведёт к исключению InvalidStateError, а в случае canelled будет CancelledError, и наконец, если исключение произошло в самой корутине, оно будет сгенерировано снова (также, как это сделано при вызове exception). Но не верьте мне на слово.
Вы можете узнать состояние футуры с помощью методов done, cancelled или running, но не забывайте, что в случае done вызов result может вернуть как ожидаемый результат, так и исключение, которое возникло в процессе работы. Для отмены выполнения футуры есть метод cancel. Это подходит для исправления нашего примера.
Простой и аккуратный вывод — как раз то, что я люблю!
Если вам нужна некоторая дополнительная логика по обработке футур, то вы можете подключать коллбэки, которые будут вызваны при переходе в состояние done. Это может быть полезно для тестов, когда некоторые результаты надо переопределить какими-то своими значениями.
asyncio — это целиком про написание управляемого и читаемого конкурентного кода, что хорошо заметно при обработке исключений. Вернёмся к примеру, чтобы продемонстрировать.
Допустим, мы хотим убедиться, что все запросы к сервисам по определению IP вернули одинаковый результат. Однако, один из них может быть оффлайн и не ответить нам. Просто применим try...except как обычно:
Мы также можем обработать исключение, которое возникло в процессе выполнения корутины:
Точно также, как и запуск задачи без ожидания её завершения является ошибкой, так и получение неизвестных исключений оставляет свои следы в выводе:
Вывод выглядит также, как и в предыдущем примере за исключением укоризненного сообщения от asyncio.
А что, если информация о нашем IP не так уж важна? Это может быть хорошим дополнением к какому-то составному ответу, в котором эта часть будет опциональна. В таком случае не будем заставлять пользователя ждать. В идеале мы бы ставили таймаут на вычисление IP, после которого в любом случае отдавали ответ пользователю, даже без этой информации.
И снова у wait есть подходящий аргумент:
Я также добавил аргумент timeout к строке запуска скрипта, чтобы проверить что же произойдёт, если запросы успеют обработаться. Также я добавил случайные задержки, чтобы скрипт не завершался слишком быстро, и было время разобраться как именно он работает.
Asyncio укрепил мою и так уже большую любовь к python. Если честно, я влюбился в сопрограммы, ещё когда познакомился с ними в Tornado, но asyncio сумел взять всё лучшее из него и других библиотек по реализации конкурентности. Причём настолько, что были предприняты особые усилия, чтобы они могли использовать основной цикл ввода-вывода. Так что если вы используете Tornado или Twisted, то можете подключать код, предназначенный для asyncio!
Как я уже упоминал, основная проблема заключается в том, что стандартные библиотеки пока ещё не поддерживают неблокирующее поведение. Также и многие популярные библиотеки работают пока лишь в синхронном стиле, а те, что используют конкурентность, пока ещё молоды и экспериментальны. Однако, их число растёт.
Надеюсь, в этом уроке я показал, насколько приятно работать с asyncio, и эта технология подтолкнёт вас к переходу на python 3, если вы по какой-то причине застряли на python 2.7. Одно точно — будущее Python полностью изменилось.
Тогда я познакомился с asyncio, и всё изменилось.
Если кто не знает, asyncio — новый модуль для организации конкурентного программирования, который появился в Python 3.4. Он предназначен для упрощения использования корутин и футур в асинхронном коде — чтобы код выглядел как синхронный, без коллбэков.
Я помню, в то время было несколько похожих инструментов, и один из них выделялся — это библиотека gevent. Я советую всем прочитать прекрасное руководство gevent для практикующего python-разработчика, в котором описана не только работа с ней, но и что такое конкурентность в общем понимании. Мне настолько понравилось та статья, что я решил использовать её как шаблон для написания введения в asyncio.
Небольшой дисклеймер — это статья не gevent vs asyncio. Nathan Road уже сделал это за меня в своей заметке. Все примеры вы можете найти на GitHub.
Я знаю, вам уже не терпится писать код, но для начала я бы хотел рассмотреть несколько концепций, которые нам пригодятся в дальнейшем.
Потоки, циклы событий, корутины и футуры
Потоки — наиболее распространённый инструмент. Думаю, вы слышали о нём и ранее, однако asyncio оперирует несколько другими понятиями: циклы событий, корутины и футуры.
- цикл событий (event loop) по большей части всего лишь управляет выполнением различных задач: регистрирует поступление и запускает в подходящий момент
- корутины — специальные функции, похожие на генераторы python, от которых ожидают (await), что они будут отдавать управление обратно в цикл событий. Необходимо, чтобы они были запущены именно через цикл событий
- футуры — объекты, в которых хранится текущий результат выполнения какой-либо задачи. Это может быть информация о том, что задача ещё не обработана или уже полученный результат; а может быть вообще исключение
Довольно просто? Поехали!
Синхронное и асинхронное выполнение
В видео "Конкурентность — это не параллелизм, это лучше" Роб Пайк обращает ваше внима��ие на ключевую вещь. Разбиение задач на конкурентные подзадачи возможно только при таком параллелизме, когда он же и управляет этими подзадачами.
Asyncio делает тоже самое — вы можете разбивать ваш код на процедуры, которые определять как корутины, что даёт возможность управлять ими как пожелаете, включая и одновременное выполнение. Корутины содержат операторы yield, с помощью которых мы определяем места, где можно переключиться на другие ожидающие выполнения задачи.
За переключение контекста в asyncio отвечает yield, который передаёт управление обратно в event loop, а тот в свою очередь — к другой корутине. Рассмотрим базовый пример:
import asyncio async def foo(): print('Running in foo') await asyncio.sleep(0) print('Explicit context switch to foo again') async def bar(): print('Explicit context to bar') await asyncio.sleep(0) print('Implicit context switch back to bar') ioloop = asyncio.get_event_loop() tasks = [ioloop.create_task(foo()), ioloop.create_task(bar())] wait_tasks = asyncio.wait(tasks) ioloop.run_until_complete(wait_tasks) ioloop.close()
$ python3 1-sync-async-execution-asyncio-await.py Running in foo Explicit context to bar Explicit context switch to foo again Implicit context switch back to bar
* Сначала мы объявили пару простейших корутин, которые притворяются неблокирующими, используя sleep из asyncio
* Корутины могут быть запущены только из другой корутины, или обёрнуты в задачу с помощью create_task
* После того, как у нас оказались 2 задачи, объединим их, используя wait
* И, наконец, отправим на выполнение в цикл событий через run_until_complete
Используя await в какой-либо корутине, мы таким образом объявляем, что корутина может отдавать управление обратно в event loop, который, в свою очередь, запустит какую-либо следующую задачу: bar. В bar произойдёт тоже самое: на await asyncio.sleep управление будет передано обратно в цикл событий, который в нужное время вернётся к выполнению foo.
Представим 2 блокирующие задачи: gr1 и gr2, как будто они обращаются к неким сторонним сервисам, и, пока они ждут ответа, третья функция может работать асинхронно.
import time import asyncio start = time.time() def tic(): return 'at %1.1f seconds' % (time.time() - start) async def gr1(): # Busy waits for a second, but we don't want to stick around... print('gr1 started work: {}'.format(tic())) await asyncio.sleep(2) print('gr1 ended work: {}'.format(tic())) async def gr2(): # Busy waits for a second, but we don't want to stick around... print('gr2 started work: {}'.format(tic())) await asyncio.sleep(2) print('gr2 Ended work: {}'.format(tic())) async def gr3(): print("Let's do some stuff while the coroutines are blocked, {}".format(tic())) await asyncio.sleep(1) print("Done!") ioloop = asyncio.get_event_loop() tasks = [ ioloop.create_task(gr1()), ioloop.create_task(gr2()), ioloop.create_task(gr3()) ] ioloop.run_until_complete(asyncio.wait(tasks)) ioloop.close()
$ python3 1b-cooperatively-scheduled-asyncio-await.py gr1 started work: at 0.0 seconds gr2 started work: at 0.0 seconds Lets do some stuff while the coroutines are blocked, at 0.0 seconds Done! gr1 ended work: at 2.0 seconds gr2 Ended work: at 2.0 seconds
Обратите внимание как происходит работа с вводом-выводом и планированием выполнения, позволяя всё это уместить в один поток. Пока две задачи заблокированы ожиданием I/O, третья функция может занимать всё процессорное время.
Порядок выполнения
В синхронном мире мы мыслим последовательно. Если у нас есть список задач, выполнение которых занимает разное время, то они завершатся в том же порядке, в котором поступили в обработку. Однако, в случае конкурентности нельзя быть в этом уверенным.
import random from time import sleep import asyncio def task(pid): """Synchronous non-deterministic task. """ sleep(random.randint(0, 2) * 0.001) print('Task %s done' % pid) async def task_coro(pid): """Coroutine non-deterministic task """ await asyncio.sleep(random.randint(0, 2) * 0.001) print('Task %s done' % pid) def synchronous(): for i in range(1, 10): task(i) async def asynchronous(): tasks = [asyncio.ensure_future(task_coro(i)) for i in range(1, 10)] await asyncio.wait(tasks) print('Synchronous:') synchronous() ioloop = asyncio.get_event_loop() print('Asynchronous:') ioloop.run_until_complete(asynchronous()) ioloop.close()
$ python3 1c-determinism-sync-async-asyncio-await.py Synchronous: Task 1 done Task 2 done Task 3 done Task 4 done Task 5 done Task 6 done Task 7 done Task 8 done Task 9 done Asynchronous: Task 2 done Task 5 done Task 6 done Task 8 done Task 9 done Task 1 done Task 4 done Task 3 done Task 7 done
Разумеется, ваш результат будет иным, поскольку каждая задача будет засыпать на случайное время, но заметьте, что результат выполнения полностью отличается, хотя мы всегда ставим задачи в одном и том же порядке.
Также обратите внимание на корутину для нашей довольно простой задачи. Это важно для понимания, что в asyncio нет никакой магии при реализации неблокирующих задач. Во время реализации asyncio стоял отдельно в стандартной библиотеке, т.к. остальные модули предоставляли только блокирующую функциональность. Вы можете использовать модуль concurrent.futures для оборачивания блокирующих задач в потоки или процессы и получения футуры для использования в asyncio. Несколько таких примеров доступны на GitHub.
Это, наверно, главный недостаток сейчас при использовании asyncio, однако уже есть несколько библиотек, помогающих решить эту проблему.
Самая популярная блокирующая задача — получение данных по HTTP-запросу. Рассмотрим работу с великолепной библиотекой aiohttp на примере получения информации о публичных событиях на GitHub.
import time import urllib.request import asyncio import aiohttp URL = 'https://api.github.com/events' MAX_CLIENTS = 3 def fetch_sync(pid): print('Fetch sync process {} started'.format(pid)) start = time.time() response = urllib.request.urlopen(URL) datetime = response.getheader('Date') print('Process {}: {}, took: {:.2f} seconds'.format( pid, datetime, time.time() - start)) return datetime async def fetch_async(pid): print('Fetch async process {} started'.format(pid)) start = time.time() response = await aiohttp.request('GET', URL) datetime = response.headers.get('Date') print('Process {}: {}, took: {:.2f} seconds'.format( pid, datetime, time.time() - start)) response.close() return datetime def synchronous(): start = time.time() for i in range(1, MAX_CLIENTS + 1): fetch_sync(i) print("Process took: {:.2f} seconds".format(time.time() - start)) async def asynchronous(): start = time.time() tasks = [asyncio.ensure_future( fetch_async(i)) for i in range(1, MAX_CLIENTS + 1)] await asyncio.wait(tasks) print("Process took: {:.2f} seconds".format(time.time() - start)) print('Synchronous:') synchronous() print('Asynchronous:') ioloop = asyncio.get_event_loop() ioloop.run_until_complete(asynchronous()) ioloop.close()
$ python3 1d-async-fetch-from-server-asyncio-await.py Synchronous: Fetch sync process 1 started Process 1: Wed, 17 Feb 2016 13:10:11 GMT, took: 0.54 seconds Fetch sync process 2 started Process 2: Wed, 17 Feb 2016 13:10:11 GMT, took: 0.50 seconds Fetch sync process 3 started Process 3: Wed, 17 Feb 2016 13:10:12 GMT, took: 0.48 seconds Process took: 1.54 seconds Asynchronous: Fetch async process 1 started Fetch async process 2 started Fetch async process 3 started Process 3: Wed, 17 Feb 2016 13:10:12 GMT, took: 0.50 seconds Process 2: Wed, 17 Feb 2016 13:10:12 GMT, took: 0.52 seconds Process 1: Wed, 17 Feb 2016 13:10:12 GMT, took: 0.54 seconds Process took: 0.54 seconds
Тут стоит обратить внимание на пару моментов.
Во-первых, разница во времени — при использовании асинхронных вызовов мы запускаем запросы одновременно. Как говорилось ранее, каждый из них передавал управление следующему и возвращал результат по завершении. То есть скорость выполнения напрямую зависит от времени работы самого медленного запроса, который занял как раз 0.54 секунды. Круто, правда?
Во-вторых, насколько код похож на синхронный. Это же по сути одно и то же! Основные отличия связаны с реализацией библиотеки для выполнения запросов, созданием и ожиданием завершения задач.
Создание конкурентности
До сих пор мы использовали единственный метод создания и получения результатов из корутин, создания набора задач и ожидания их завершения. Однако, корутины могут быть запланированы для запуска и получения результатов несколькими способами. Представьте ситуацию, когда нам надо обрабатывать результаты GET-запросов по мере их получения; на самом деле реализация очень похожа на предыдущую:
import time import random import asyncio import aiohttp URL = 'https://api.github.com/events' MAX_CLIENTS = 3 async def fetch_async(pid): start = time.time() sleepy_time = random.randint(2, 5) print('Fetch async process {} started, sleeping for {} seconds'.format( pid, sleepy_time)) await asyncio.sleep(sleepy_time) response = await aiohttp.request('GET', URL) datetime = response.headers.get('Date') response.close() return 'Process {}: {}, took: {:.2f} seconds'.format( pid, datetime, time.time() - start) async def asynchronous(): start = time.time() futures = [fetch_async(i) for i in range(1, MAX_CLIENTS + 1)] for i, future in enumerate(asyncio.as_completed(futures)): result = await future print('{} {}'.format(">>" * (i + 1), result)) print("Process took: {:.2f} seconds".format(time.time() - start)) ioloop = asyncio.get_event_loop() ioloop.run_until_complete(asynchronous()) ioloop.close()
$ python3 2a-async-fetch-from-server-as-completed-asyncio-await.py Fetch async process 1 started, sleeping for 4 seconds Fetch async process 3 started, sleeping for 5 seconds Fetch async process 2 started, sleeping for 3 seconds >> Process 2: Wed, 17 Feb 2016 13:55:19 GMT, took: 3.53 seconds >>>> Process 1: Wed, 17 Feb 2016 13:55:20 GMT, took: 4.49 seconds >>>>>> Process 3: Wed, 17 Feb 2016 13:55:21 GMT, took: 5.48 seconds Process took: 5.48 seconds
Посмотрите на отступы и тайминги — мы запустили все задачи одновременно, однако они обработаны в порядке завершения выполнения. Код в данном случае немного отличается: мы пакуем корутины, каждая из которых уже подготовлена для выполнения, в список. Функция as_completed возвращает итератор, который выдаёт результаты корутин по мере их выполнения. Круто же, правда?! Кстати, и as_completed, и wait — функции из пакета concurrent.futures.
Ещё один пример — что если вы хотите узнать свой IP адрес. Есть куча сервисов для этого, но вы не знаете какой из них будет доступен в момент работы программы. Вместо того, чтобы последовательно опрашивать каждый из списка, можно запустить все запросы конкурентно и выбрать первый успешный.
Что ж, для этого в нашей любимой функции wait есть специальный параметр return_when. До сих пор мы игнорировали то, что возвращает wait, т.к. только распараллеливали задачи. Но теперь нам надо получить результат из корутины, так что будем использовать набор футур done и pending.
from collections import namedtuple import time import asyncio from concurrent.futures import FIRST_COMPLETED import aiohttp Service = namedtuple('Service', ('name', 'url', 'ip_attr')) SERVICES = ( Service('ipify', 'https://api.ipify.org?format=json', 'ip'), Service('ip-api', 'http://ip-api.com/json', 'query') ) async def fetch_ip(service): start = time.time() print('Fetching IP from {}'.format(service.name)) response = await aiohttp.request('GET', service.url) json_response = await response.json() ip = json_response[service.ip_attr] response.close() return '{} finished with result: {}, took: {:.2f} seconds'.format( service.name, ip, time.time() - start) async def asynchronous(): futures = [fetch_ip(service) for service in SERVICES] done, pending = await asyncio.wait( futures, return_when=FIRST_COMPLETED) print(done.pop().result()) ioloop = asyncio.get_event_loop() ioloop.run_until_complete(asynchronous()) ioloop.close()
$ python3 2c-fetch-first-ip-address-response-await.py Fetching IP from ip-api Fetching IP from ipify ip-api finished with result: 82.34.76.170, took: 0.09 seconds Unclosed client session client_session: <aiohttp.client.ClientSession object at 0x10f95c6d8> Task was destroyed but it is pending! task: <Task pending coro=<fetch_ip() running at 2c-fetch-first-ip-address-response.py:20> wait_for=<Future pending cb=[BaseSelectorEventLoop._sock_connect_done(10)(), Task._wakeup()]>>
Что же случилось? Первый сервис ответил успешно, но в логах какое-то предупреждение!
На самом деле мы запустили выполнение двух задач, но вышли из цикла уже после первого результата, в то время как вторая корутина ещё выполнялась. Asyncio подумал что это баг и предупредил нас. Наверно, стоит прибираться за собой и явно убивать ненужные задачи. Как? Рад, что вы спросили.
Состояния футур
- ожидание (pending)
- выполнение (running)
- выполнено (done)
- отменено (cancelled)
Всё настолько просто. Когда футура находится в состояние done, у неё можно получить результат выполнения. В состояниях pending и running такая операция приведёт к исключению InvalidStateError, а в случае canelled будет CancelledError, и наконец, если исключение произошло в самой корутине, оно будет сгенерировано снова (также, как это сделано при вызове exception). Но не верьте мне на слово.
Вы можете узнать состояние футуры с помощью методов done, cancelled или running, но не забывайте, что в случае done вызов result может вернуть как ожидаемый результат, так и исключение, которое возникло в процессе работы. Для отмены выполнения футуры есть метод cancel. Это подходит для исправления нашего примера.
from collections import namedtuple import time import asyncio from concurrent.futures import FIRST_COMPLETED import aiohttp Service = namedtuple('Service', ('name', 'url', 'ip_attr')) SERVICES = ( Service('ipify', 'https://api.ipify.org?format=json', 'ip'), Service('ip-api', 'http://ip-api.com/json', 'query') ) async def fetch_ip(service): start = time.time() print('Fetching IP from {}'.format(service.name)) response = await aiohttp.request('GET', service.url) json_response = await response.json() ip = json_response[service.ip_attr] response.close() return '{} finished with result: {}, took: {:.2f} seconds'.format( service.name, ip, time.time() - start) async def asynchronous(): futures = [fetch_ip(service) for service in SERVICES] done, pending = await asyncio.wait( futures, return_when=FIRST_COMPLETED) print(done.pop().result()) for future in pending: future.cancel() ioloop = asyncio.get_event_loop() ioloop.run_until_complete(asynchronous()) ioloop.close()
$ python3 2c-fetch-first-ip-address-response-no-warning-await.py Fetching IP from ipify Fetching IP from ip-api ip-api finished with result: 82.34.76.170, took: 0.08 seconds
Простой и аккуратный вывод — как раз то, что я люблю!
Если вам нужна некоторая дополнительная логика по обработке футур, то вы можете подключать коллбэки, которые будут вызваны при переходе в состояние done. Это может быть полезно для тестов, когда некоторые результаты надо переопределить какими-то своими значениями.
Обработка исключений
asyncio — это целиком про написание управляемого и читаемого конкурентного кода, что хорошо заметно при обработке исключений. Вернёмся к примеру, чтобы продемонстрировать.
Допустим, мы хотим убедиться, что все запросы к сервисам по определению IP вернули одинаковый результат. Однако, один из них может быть оффлайн и не ответить нам. Просто применим try...except как обычно:
from collections import namedtuple import time import asyncio import aiohttp Service = namedtuple('Service', ('name', 'url', 'ip_attr')) SERVICES = ( Service('ipify', 'https://api.ipify.org?format=json', 'ip'), Service('ip-api', 'http://ip-api.com/json', 'query'), Service('borken', 'http://no-way-this-is-going-to-work.com/json', 'ip') ) async def fetch_ip(service): start = time.time() print('Fetching IP from {}'.format(service.name)) try: response = await aiohttp.request('GET', service.url) except: return '{} is unresponsive'.format(service.name) json_response = await response.json() ip = json_response[service.ip_attr] response.close() return '{} finished with result: {}, took: {:.2f} seconds'.format( service.name, ip, time.time() - start) async def asynchronous(): futures = [fetch_ip(service) for service in SERVICES] done, _ = await asyncio.wait(futures) for future in done: print(future.result()) ioloop = asyncio.get_event_loop() ioloop.run_until_complete(asynchronous()) ioloop.close()
$ python3 3a-fetch-ip-addresses-fail-await.py Fetching IP from ip-api Fetching IP from borken Fetching IP from ipify ip-api finished with result: 85.133.69.250, took: 0.75 seconds ipify finished with result: 85.133.69.250, took: 1.37 seconds borken is unresponsive
Мы также можем обработать исключение, которое возникло в процессе выполнения корутины:
from collections import namedtuple import time import asyncio import aiohttp import traceback Service = namedtuple('Service', ('name', 'url', 'ip_attr')) SERVICES = ( Service('ipify', 'https://api.ipify.org?format=json', 'ip'), Service('ip-api', 'http://ip-api.com/json', 'this-is-not-an-attr'), Service('borken', 'http://no-way-this-is-going-to-work.com/json', 'ip') ) async def fetch_ip(service): start = time.time() print('Fetching IP from {}'.format(service.name)) try: response = await aiohttp.request('GET', service.url) except: return '{} is unresponsive'.format(service.name) json_response = await response.json() ip = json_response[service.ip_attr] response.close() return '{} finished with result: {}, took: {:.2f} seconds'.format( service.name, ip, time.time() - start) async def asynchronous(): futures = [fetch_ip(service) for service in SERVICES] done, _ = await asyncio.wait(futures) for future in done: try: print(future.result()) except: print("Unexpected error: {}".format(traceback.format_exc())) ioloop = asyncio.get_event_loop() ioloop.run_until_complete(asynchronous()) ioloop.close()
$ python3 3b-fetch-ip-addresses-future-exceptions-await.py Fetching IP from ipify Fetching IP from borken Fetching IP from ip-api ipify finished with result: 85.133.69.250, took: 0.91 seconds borken is unresponsive Unexpected error: Traceback (most recent call last): File “3b-fetch-ip-addresses-future-exceptions.py”, line 39, in asynchronous print(future.result()) File “3b-fetch-ip-addresses-future-exceptions.py”, line 26, in fetch_ip ip = json_response[service.ip_attr] KeyError: ‘this-is-not-an-attr’
Точно также, как и запуск задачи без ожидания её завершения является ошибкой, так и получение неизвестных исключений оставляет свои следы в выводе:
from collections import namedtuple import time import asyncio import aiohttp Service = namedtuple('Service', ('name', 'url', 'ip_attr')) SERVICES = ( Service('ipify', 'https://api.ipify.org?format=json', 'ip'), Service('ip-api', 'http://ip-api.com/json', 'this-is-not-an-attr'), Service('borken', 'http://no-way-this-is-going-to-work.com/json', 'ip') ) async def fetch_ip(service): start = time.time() print('Fetching IP from {}'.format(service.name)) try: response = await aiohttp.request('GET', service.url) except: print('{} is unresponsive'.format(service.name)) else: json_response = await response.json() ip = json_response[service.ip_attr] response.close() print('{} finished with result: {}, took: {:.2f} seconds'.format( service.name, ip, time.time() - start)) async def asynchronous(): futures = [fetch_ip(service) for service in SERVICES] await asyncio.wait(futures) # intentionally ignore results ioloop = asyncio.get_event_loop() ioloop.run_until_complete(asynchronous()) ioloop.close()
$ python3 3c-fetch-ip-addresses-ignore-exceptions-await.py Fetching IP from ipify Fetching IP from borken Fetching IP from ip-api borken is unresponsive ipify finished with result: 85.133.69.250, took: 0.78 seconds Task exception was never retrieved future: <Task finished coro=<fetch_ip() done, defined at 3c-fetch-ip-addresses-ignore-exceptions.py:15> exception=KeyError(‘this-is-not-an-attr’,)> Traceback (most recent call last): File “3c-fetch-ip-addresses-ignore-exceptions.py”, line 25, in fetch_ip ip = json_response[service.ip_attr] KeyError: ‘this-is-not-an-attr’
Вывод выглядит также, как и в предыдущем примере за исключением укоризненного сообщения от asyncio.
Таймауты
А что, если информация о нашем IP не так уж важна? Это может быть хорошим дополнением к какому-то составному ответу, в котором эта часть будет опциональна. В таком случае не будем заставлять пользователя ждать. В идеале мы бы ставили таймаут на вычисление IP, после которого в любом случае отдавали ответ пользователю, даже без этой информации.
И снова у wait есть подходящий аргумент:
import time import random import asyncio import aiohttp import argparse from collections import namedtuple from concurrent.futures import FIRST_COMPLETED Service = namedtuple('Service', ('name', 'url', 'ip_attr')) SERVICES = ( Service('ipify', 'https://api.ipify.org?format=json', 'ip'), Service('ip-api', 'http://ip-api.com/json', 'query'), ) DEFAULT_TIMEOUT = 0.01 async def fetch_ip(service): start = time.time() print('Fetching IP from {}'.format(service.name)) await asyncio.sleep(random.randint(1, 3) * 0.1) try: response = await aiohttp.request('GET', service.url) except: return '{} is unresponsive'.format(service.name) json_response = await response.json() ip = json_response[service.ip_attr] response.close() print('{} finished with result: {}, took: {:.2f} seconds'.format( service.name, ip, time.time() - start)) return ip async def asynchronous(timeout): response = { "message": "Result from asynchronous.", "ip": "not available" } futures = [fetch_ip(service) for service in SERVICES] done, pending = await asyncio.wait( futures, timeout=timeout, return_when=FIRST_COMPLETED) for future in pending: future.cancel() for future in done: response["ip"] = future.result() print(response) parser = argparse.ArgumentParser() parser.add_argument( '-t', '--timeout', help='Timeout to use, defaults to {}'.format(DEFAULT_TIMEOUT), default=DEFAULT_TIMEOUT, type=float) args = parser.parse_args() print("Using a {} timeout".format(args.timeout)) ioloop = asyncio.get_event_loop() ioloop.run_until_complete(asynchronous(args.timeout)) ioloop.close()
Я также добавил аргумент timeout к строке запуска скрипта, чтобы проверить что же произойдёт, если запросы успеют обработаться. Также я добавил случайные задержки, чтобы скрипт не завершался слишком быстро, и было время разобраться как именно он работает.
$ python 4a-timeout-with-wait-kwarg-await.py Using a 0.01 timeout Fetching IP from ipify Fetching IP from ip-api {‘message’: ‘Result from asynchronous.’, ‘ip’: ‘not available’}
$ python 4a-timeout-with-wait-kwarg-await.py -t 5 Using a 5.0 timeout Fetching IP from ip-api Fetching IP from ipify ipify finished with result: 82.34.76.170, took: 1.24 seconds {'ip': '82.34.76.170', 'message': 'Result from asynchronous.'}
Заключение
Asyncio укрепил мою и так уже большую любовь к python. Если честно, я влюбился в сопрограммы, ещё когда познакомился с ними в Tornado, но asyncio сумел взять всё лучшее из него и других библиотек по реализации конкурентности. Причём настолько, что были предприняты особые усилия, чтобы они могли использовать основной цикл ввода-вывода. Так что если вы используете Tornado или Twisted, то можете подключать код, предназначенный для asyncio!
Как я уже упоминал, основная проблема заключается в том, что стандартные библиотеки пока ещё не поддерживают неблокирующее поведение. Также и многие популярные библиотеки работают пока лишь в синхронном стиле, а те, что используют конкурентность, пока ещё молоды и экспериментальны. Однако, их число растёт.
Надеюсь, в этом уроке я показал, насколько приятно работать с asyncio, и эта технология подтолкнёт вас к переходу на python 3, если вы по какой-то причине застряли на python 2.7. Одно точно — будущее Python полностью изменилось.
От переводчика:
Оригинальная статья была опубликована 20 февраля 2016, за это время многое произошло. Вышел Python 3.6, в котором помимо оптимизаций была улучшена работа asyncio, API переведено в стабильное состояние. Были выпущены библиотеки для работы с Postgres, Redis, Elasticsearch и пр. в неблокирующем режиме. Даже новый фреймворк — Sanic, который напоминает Flask, но работает в асинхронном режиме. В конце концов даже event loop был оптимизирован и переписан на Cython, что получилось раза в 2 быстрее. Так что я не вижу причин игнорировать эту технологию!
Только зарегистрированные пользователи могут участвовать в опросе. Войдите, пожалуйста.
Используете ли вы asyncio?
31.31%да, и мне нравится242
7.5%да, но получаю кучу проблем58
54.59%нет, но хотелось бы422
1.68%нет, даже и не думаю13
4.92%я на python 2.738
Проголосовали 773 пользователя. Воздержались 135 пользователей.
