Перевод главы 13 Параллелизм
из книги ‘Expert Python Programming’,
Second Edition
Michał Jaworski & Tarek Ziadé, 2016
В последние годы асинхронное программирование приобрело большую популярность. Python 3.5 наконец-то получил некоторые синтаксические функции, закрепляющие концепции асинхронных решений. Но это не значит, что асинхронное программирование стало возможным только начиная с Python 3.5. Многие библиотеки и фреймворки были предоставлены намного раньше, и большинство из них имеют происхождение в старых версиях Python 2. Существует даже целая альтернативная реализация Python, называемая Stackless (см. Главу 1 «Текущее состояние Python»), которая сосредоточена на этом едином подходе программирования. Для некоторых решений, таких как Twisted, Tornado или Eventlet, до сих пор существуют активные сообщества, и их действительно стоит знать. В любом случае, начиная с Python 3.5, асинхронное программирование стало проще, чем когда-либо прежде. Таким образом, ожидается, что его встроенные асинхронные функции заменят большую часть старых инструментов, или внешние проекты постепенно превратятся в своего рода высокоуровневые фреймворки, основанные на встроенных в Python.
При попытке объяснить, что такое асинхронное программирование, проще всего думать об этом подходе как о чем-то похожем на потоки, но без системного планированщика. Это означает, что асинхронная программа может одновременно обрабатывать задачи, но ее контекст переключается внутри, а не системным планировщиком.
Но, конечно, мы не используем потоки для параллельной обработки задач в асинхронной программе. Большинство решений используют разные концепции и, в зависимости от реализации, называются по-разному. Некоторые примеры имен, используемых для описания таких параллельных программных объектов:
По понятным причинам в этом разделе мы сосредоточимся только на сопрограммах, которые изначально поддерживаются Python, начиная с версии 3.5.
Совместная многозадачность является ядром асинхронного программирования. В этом смысле многозадачность в операционной системе не обязана инициировать переключение контекста (другому процессу или потоку), а вместо этого каждый процесс добровольно освобождает управление, когда находится в режиме ожидания, чтобы обеспечить одновременное выполнение нескольких программ. Вот почему это называется совместным. Все процессы должны сотрудничать для того, чтобы многозадачность осуществлялась успешно.
Модель многозадачности иногда использовалась в операционных системах, но сейчас ее вряд ли можно найти в качестве решения системного уровня. Это связано с тем, что существует риск того, что одна плохо спроектированная служба может легко нарушить стабильность всей системы. Планирование потоков и процессов с помощью переключателей контекста, управляемых непосредственно операционной системой, в настоящее время является доминирующим подходом для параллелизма на системном уровне. Но совместная многозадачность все еще является отличным инструментом параллелизма на уровне приложений.
Говоря о совместной многозадачности на уровне приложений, мы не имеем дело с потоками или процессами, которым необходимо освободить управление, поскольку все выполнение содержится в одном процессе и потоке. Вместо этого у нас есть несколько задач (сопрограммы, tasklets и зеленые потоки), которые передают управление одной функции, управляющей координацией задач. Эта функция обычно является своего рода циклом событий.
Чтобы избежать путаницы (из-за терминологии Python), теперь мы будем называть такие параллельные задачи сопрограммами. Самая важная проблема в совместной многозадачности — когда передать контроль. В большинстве асинхронных приложений управление передается планировщику или циклу событий при операциях ввода-вывода. Независимо от того, читает ли программа данные из файловой системы или осуществляет связь через сокет, такая операция ввода-вывода всегда связана с некоторым временем ожидания, когда процесс становится бездействующим. Время ожидания зависит от внешнего ресурса, поэтому это хорошая возможность освободить управление, чтобы другие сопрограммы могли выполнять свою работу, пока им тоже не придется ждать, что такой подход несколько похожим по поведению на то, как многопоточность реализована в Python. Мы знаем, что GIL сериализует потоки Python, но он также освобождается при каждой операции ввода-вывода. Основное различие заключается в том, что потоки в Python реализованы как потоки системного уровня, поэтому операционная система может в любой момент выгрузить текущий запущенный поток и передать управление другому.
В асинхронном программировании задачи никогда не прерываются главным циклом событий. Вот почему этот стиль многозадачности также называется не приоритетной многозадачностью.
Конечно, каждое приложение Python работает в операционной системе, где есть другие процессы, конкурирующие за ресурсы. Это означает, что операционная система всегда имеет право выгрузить весь процесс и передать управление другому. Но когда наше асинхронное приложение запускается обратно, оно продолжается с того же места, где оно было приостановлено, когда системный планировщик вмешался. Именно поэтому сопрограммы в данном контексте считаются не вытесняющими.
Ключевые слова async и await являются основными строительными блоками в асинхронном программировании Python.
Ключевое слово async, используемое перед оператором def, определяет новую сопрограмму. Выполнение функции сопрограммы может быть приостановлено и возобновлено в строго определенных обстоятельствах. Его синтаксис и поведение очень похожи на генераторы (см. Главу 2 «Рекомендации по синтаксису» ниже уровня класса). Фактически, генераторы должны использоваться в более старых версиях Python для реализации сопрограмм. Вот пример объявления функции, которая использует ключевое слово async:
Функции, определенные с помощью ключевого слова async, являются специальными. При вызове они не выполняют код внутри, а вместо этого возвращают объект сопрограммы:
Объект сопрограммы (coroutine object) ничего не делает, пока его выполнение не запланировано в цикле событий. Модуль asyncio доступен для предоставления базовой реализации цикла событий, а также множества других асинхронных утилит:
Естественно, создавая только одну простую сопрограмму, в нашей программе мы не осуществляем параллелизма. Чтобы увидеть что-то действительно параллельное, нам нужно создать больше задач, которые будут выполняться циклом событий.
Новые задачи можно добавить в цикл, вызвав метод loop.create_task () или предоставив другой объект для ожидания использования функции asyncio.wait (). Мы будем использовать последний подход и попытаемся асинхронно напечатать последовательность чисел, сгенерированных с помощью функции range ():
Функция asyncio.wait () принимает список объектов сопрограмм и немедленно возвращается. Результатом является генератор, который выдает объекты, представляющие будущие результаты (фьючерсы). Как следует из названия, он используется для ожидания завершения всех предоставленных сопрограмм. Причина, по которой он возвращает генератор вместо объекта сопрограммы, заключается в обратной совместимости с предыдущими версиями Python, что будет объяснено позже. Результат выполнения этого скрипта может быть следующим:
Как мы видим, числа печатаются не в том порядке, в котором мы создали наши сопрограммы. Но это именно то, чего мы хотели достичь.
Второе важное ключевое слово, добавленное в Python 3.5, await. Он используется для ожидания результатов сопрограммы или будущего события (поясняется позже) и освобождения контроля над выполнением в цикле событий. Чтобы лучше понять, как это работает, нам нужно рассмотреть более сложный пример кода.
Допустим, мы хотим создать две сопрограммы, которые будут выполнять некоторые простые задачи в цикле:
При выполнении в терминале (с помощью команды времени для измерения времени) можно увидеть:
Как мы видим, обе сопрограммы завершили свое выполнение, но не асинхронно. Причина в том, что они обе используют функцию time.sleep (), которая блокирует, но не освобождает элемент управления в цикле событий. Это будет работать лучше в многопоточной установке, но мы не хотим сейчас использовать потоки. Итак, как мы можем это исправить?
Ответ заключается в том, чтобы использовать asyncio.sleep (), которая является асинхронной версией time.sleep (), и ожидать результата с помощью ключевого слова await. Мы уже использовали это утверждение в первой версии функции main (), но это было только для улучшения ясности кода. Это явно не делало нашу реализацию более параллельной. Давайте посмотрим на улучшенную версию сопрограммы waiter (), которая использует await asyncio.sleep():
Запустив обновленный скрипт, мы увидим, как выходные данные двух функций чередуются друг с другом:
Дополнительным преимуществом этого простого улучшения является то, что код работает быстрее. Общее время выполнения было меньше, чем сумма всех времен сна, потому что сопрограммы поочередно освобождали контроль.
Модуль asyncio появился в Python 3.4. Так что это единственная версия Python, которая имеет серьезную поддержку асинхронного программирования до Python 3.5. К сожалению, похоже, что этих двух последующих версий достаточно, чтобы представить проблемы совместимости.
Как ни крути, ядро асинхронного программирования в Python было введено раньше, чем элементы синтаксиса, поддерживающие этот шаблон. Лучше поздно, чем никогда, но это создало ситуацию, когда есть два синтаксиса для работы с сопрограммами.
Начиная с Python 3.5, вы можете использовать async и await:
Однако в Python 3.4, прийдется дополнительно применить asyncio.coroutine декоратор и yield в тексте сопрограммы:
Другим полезным фактом является то, что оператор yield from был введен в Python 3.3, а в PyPI имеется асинхронный бэкпорт. Это означает, что вы также можете использовать эту реализацию совместной многозадачности с Python 3.3.
Как уже неоднократно упоминалось в этой главе, асинхронное программирование является отличным инструментом для обработки операций ввода-вывода. Настало время создать что-то более практичное, чем простая печать последовательностей или асинхронное ожидание.
В целях обеспечения согласованности мы попытаемся решить ту же проблему, которую решили с помощью многопоточности и многопроцессорности. Поэтому мы попытаемся асинхронно извлечь некоторые данные из внешних ресурсов через сетевое соединение. Было бы здорово, если бы мы могли использовать тот же пакет python-gmaps, что и в предыдущих разделах. К сожалению, мы не можем.
Создатель python-gmaps был немного ленив и взял лишь название. Чтобы упростить разработку, он выбрал пакет запросов в качестве своей клиентской библиотеки HTTP. К сожалению, запросы не поддерживают асинхронный ввод-вывод с async и await. Есть некоторые другие проекты, которые нацелены на обеспечение некоторого параллелизма для проекта запросов, но они либо полагаются на Gevent (grequests, см. Https://github.com/ kennethreitz / grequests), либо на выполнение пула потоков / процессов (запросы-futures, обратитесь к github.com/ross/requests-futures). Ни один из них не решает нашу проблему.
Давайте предположим, что этот код хранится в модуле с именем asyncgmaps, который мы будем использовать позже. Теперь мы готовы переписать пример, используемый при обсуждении многопоточности и многопроцессорности. Ранее мы использовали для разделения всей операции на два отдельных этапа:
Но поскольку совместная многозадачность совершенно отличается от использования нескольких процессов или потоков, мы можем немного изменить наш подход. Большинство проблем, поднятых в разделе Использование одного потока на элемент, больше не являются нашей заботой.
Сопрограммы не являются вытесняющими, поэтому мы можем легко отображать результаты сразу после получения ответов HTTP. Это упростит наш код и сделает его более понятным:
Асинхронное программирование отлично подходит для бэкэнд-разработчиков, заинтересованных в создании масштабируемых приложений. На практике это один из наиболее важных инструментов для создания высококонкурентных серверов.
Но реальность печальна. Многие популярные пакеты, которые имеют дело с проблемами ввода-вывода, не предназначены для использования с асинхронным кодом. Основными причинами этого являются:
Это означает, что очень часто миграция существующих синхронных многопоточных приложений и пакетов либо невозможна (из-за архитектурных ограничений), либо слишком дорога. Многие проекты могли бы извлечь большую пользу от внедрения асинхронного стиля многозадачности, но только некоторые из них в конечном итоге сделают это. Это означает, что прямо сейчас вы будете испытывать много трудностей при попытке создать асинхронные приложения с самого начала. В большинстве случаев это будет похоже на проблему, упомянутую в разделе «Практический пример асинхронного программирования» — несовместимые интерфейсы и несинхронная блокировка операций ввода-вывода. Конечно, иногда вы можете отказаться от ожидания, когда вы испытываете такую несовместимость, и просто синхронно получать необходимые ресурсы. Но это будет блокировать выполнение каждой другой сопрограммой своего кода, пока вы ждете результатов. Технически это работает, но также разрушает все преимущества асинхронного программирования. Таким образом, в конце концов, объединение асинхронного ввода-вывода с синхронным вводом-выводом не вариант. Это игра типа «все или ничего».
Другая проблема — длительные операции с привязкой к процессору. Когда вы выполняете операцию ввода-вывода, не проблема выпустить управление из сопрограммы. При записи / чтении из файловой системы или сокета вы, в конце концов, будете ждать, так что вызов с использованием await — лучшее, что вы можете сделать. Но что делать, если вам нужно что-то вычислить, и вы знаете, что это займет некоторое время? Конечно, вы можете разделить проблему на части и отменить контроль каждый раз, когда вы немного продвигаете работу. Но вскоре вы обнаружите, что это не очень хорошая модель. Такая вещь может сделать код беспорядочным, а также не гарантирует хорошие результаты.
Временная привязка должна быть ответственностью интерпретатора или операционной системы.
Так что делать, если у вас есть код, который выполняет длинные синхронные операции ввода-вывода, которые вы не можете или не хотите переписывать. Или что делать, когда вам приходится выполнять некоторые тяжелые операции с процессором в приложении, разработанном в основном с учетом асинхронного ввода-вывода? Ну… тебе нужно найти обходной путь. И под этим я подразумеваю многопоточность или многопроцессорность.
Это может звучать не очень хорошо, но иногда лучшим решением может быть то, от чего мы пытались убежать. Параллельная обработка ресурсоемких задач в Python всегда выполняется лучше благодаря многопроцессорности. И многопоточность может справляться с операциями ввода-вывода одинаково хорошо (быстро и без больших затрат ресурсов), как асинхронное и ожидающее, если правильно настроена и обрабатывается с осторожностью.
Поэтому иногда, когда вы не знаете, что делать, когда что-то просто не подходит вашему асинхронному приложению, используйте фрагмент кода, который откладывает его на отдельный поток или процесс. Вы можете сделать вид, что это сопрограмма, освободить управление для цикла событий и в конечном итоге обработать результаты, когда они будут готовы.
К счастью для нас, стандартная библиотека Python предоставляет модуль concurrent.futures, который также интегрирован с модулем asyncio. Эти два модуля вместе позволяют вам планировать функции блокировки, выполняемые в потоках или дополнительных процессах, как если бы это были асинхронные неблокирующие сопрограммы.
Прежде чем мы увидим, как внедрить потоки или процессы в асинхронный цикл обработки событий, мы подробнее рассмотрим модуль concurrent.futures, который позже станет основным компонентом нашего так называемого обходного пути.
Наиболее важными классами в модуле concurrent.futures являются Executor и Future.
Executor представляет собой пул ресурсов, которые могут обрабатывать рабочие элементы параллельно. Это может показаться очень похожим по назначению на классы из многопроцессорного модуля — Pool и dummy.Pool — но имеет совершенно другой интерфейс и семантику. Это базовый класс, не предназначенный для реализации и имеющий две конкретные реализации:
Каждый executor представляет три метода:
Наиболее интересен метод — submit () из-за объекта Future, который он возвращает. Он представляет асинхронное выполнение вызываемого и только косвенно представляет его результат. Чтобы получить фактическое возвращаемое значение отправляемого вызываемого объекта, необходимо вызвать метод Future.result (). И если вызываемый объект уже завершен, метод result () не заблокирует его и просто вернет выходные данные функции. Если это не так, он заблокирует его, пока результат не будет готов. Относитесь к нему как к обещанию результата (на самом деле это та же концепция, что и обещание в JavaScript). Вам не нужно распаковывать его сразу же после его получения (с помощью метода result ()), но если вы попытаетесь это сделать, это гарантированно в конечном итоге что-то вернет:
Если вы хотите использовать метод Executor.map (), он не отличается по использованию от метода Pool.map () класса Pool многопроцессорного модуля:
Экземпляры класса Future, возвращаемые методом Executor.submit (), концептуально очень близки к сопрограммам, используемым в асинхронном программировании. Вот почему мы можем использовать исполнителей, чтобы сделать гибрид между совместной многозадачностью и многопроцессорностью или многопоточностью.
Ядром этого обходного пути является метод BaseEventLoop.run_in_executor (executor, func, * args) класса цикла событий. Это позволяет планировать выполнение функции func в процессе или пуле потоков, представленном аргументом executor. Самым важным в этом методе является то, что он возвращает новый ожидаемый объект (объект, который можно ожидать с помощью оператора await). Таким образом, благодаря этому вы можете выполнить блокирующую функцию, которая не является сопрограммой в точности как сопрограмма, и она не будет блокировать, независимо от того, сколько времени потребуется, чтобы закончить. Он остановит только функцию, ожидающую результатов от такого вызова, но весь цикл событий будет продолжаться.
И полезным фактом является то, что вам даже не нужно создавать свой экземпляр executor. Если вы передадите None в качестве аргумента executor, класс ThreadPoolExecutor будет использоваться с числом потоков по умолчанию (для Python 3.5 это число процессоров, умноженное на 5).
Итак, давайте предположим, что мы не хотели переписывать проблемную часть пакета python-gmaps, которая была причиной нашей головной боли. Мы можем легко отложить блокирующий вызов до отдельного потока с помощью вызова loop.run_in_executor (), при этом оставляя функцию fetch_place () в качестве ожидаемой сопрограммы:
Такое решение хуже, чем наличие полностью асинхронной библиотеки для выполнения этой работы, но вы знаете, что хоть что-то лучше, чем ничего.
После объяснения того, что такое параллелизм на самом деле, мы приступили к действиям и проанализировали одну из типичных параллельных проблем с помощью многопоточности. Выявив основные недостатки нашего кода и исправив их, мы обратились к многопроцессорной обработке, чтобы посмотреть, как она будет работать в нашем случае.
После этого мы обнаружили, что с многопроцессорным модулем использовать несколько процессов намного проще, чем базовые потоки с многопоточностью. Но только после этого мы поняли, что можем использовать один и тот же API с потоками, благодаря multiprocessing.dummy. Таким образом, выбор между многопроцессорностью и многопоточностью теперь зависит только от того, какое решение лучше соответствует проблеме, а не какое решение имеет лучший интерфейс.
Говоря о подгонке проблемы, мы наконец-то попробовали асинхронное программирование, которое должно быть лучшим решением для приложений, связанных с вводом / выводом, только чтобы понять, что мы не можем полностью забыть о потоках и процессах. Таким образом, мы сделали круг, обратно в то место, где мы начали!
И это приводит нас к окончательному выводу этой главы. Нет решения, устраивающего всех. Есть несколько подходов, которые вы можете предпочесть или любить больше. Есть некоторые подходы, которые лучше подходят для данного набора проблем, но вам нужно знать их все, чтобы быть успешным. В реалистичных сценариях вы можете использовать весь арсенал инструментов и стилей параллелизма в одном приложении, и это не редкость.
Предыдущий вывод — отличное введение в тему следующей главы, Глава 14 «Полезные шаблоны проектирования». Так как нет единого шаблона, который решит все ваши проблемы. Вы должны знать как можно больше, потому что в конечном итоге вы будете использовать их каждый день.
из книги ‘Expert Python Programming’,
Second Edition
Michał Jaworski & Tarek Ziadé, 2016
Асинхронное программирование
В последние годы асинхронное программирование приобрело большую популярность. Python 3.5 наконец-то получил некоторые синтаксические функции, закрепляющие концепции асинхронных решений. Но это не значит, что асинхронное программирование стало возможным только начиная с Python 3.5. Многие библиотеки и фреймворки были предоставлены намного раньше, и большинство из них имеют происхождение в старых версиях Python 2. Существует даже целая альтернативная реализация Python, называемая Stackless (см. Главу 1 «Текущее состояние Python»), которая сосредоточена на этом едином подходе программирования. Для некоторых решений, таких как Twisted, Tornado или Eventlet, до сих пор существуют активные сообщества, и их действительно стоит знать. В любом случае, начиная с Python 3.5, асинхронное программирование стало проще, чем когда-либо прежде. Таким образом, ожидается, что его встроенные асинхронные функции заменят большую часть старых инструментов, или внешние проекты постепенно превратятся в своего рода высокоуровневые фреймворки, основанные на встроенных в Python.
При попытке объяснить, что такое асинхронное программирование, проще всего думать об этом подходе как о чем-то похожем на потоки, но без системного планированщика. Это означает, что асинхронная программа может одновременно обрабатывать задачи, но ее контекст переключается внутри, а не системным планировщиком.
Но, конечно, мы не используем потоки для параллельной обработки задач в асинхронной программе. Большинство решений используют разные концепции и, в зависимости от реализации, называются по-разному. Некоторые примеры имен, используемых для описания таких параллельных программных объектов:
- Green threads — Зеленые потоки (greenlet, gevent или eventlet проекты)
- Coroutines — сопрограммы (чистое асинхронное программирование на Python 3.5)
- Tasklets (Stackless Python) Это, в основном, те же понятия, но часто реализуемые немного по-другому.
По понятным причинам в этом разделе мы сосредоточимся только на сопрограммах, которые изначально поддерживаются Python, начиная с версии 3.5.
Совместная многозадачность и асинхронный ввод / вывод
Совместная многозадачность является ядром асинхронного программирования. В этом смысле многозадачность в операционной системе не обязана инициировать переключение контекста (другому процессу или потоку), а вместо этого каждый процесс добровольно освобождает управление, когда находится в режиме ожидания, чтобы обеспечить одновременное выполнение нескольких программ. Вот почему это называется совместным. Все процессы должны сотрудничать для того, чтобы многозадачность осуществлялась успешно.
Модель многозадачности иногда использовалась в операционных системах, но сейчас ее вряд ли можно найти в качестве решения системного уровня. Это связано с тем, что существует риск того, что одна плохо спроектированная служба может легко нарушить стабильность всей системы. Планирование потоков и процессов с помощью переключателей контекста, управляемых непосредственно операционной системой, в настоящее время является доминирующим подходом для параллелизма на системном уровне. Но совместная многозадачность все еще является отличным инструментом параллелизма на уровне приложений.
Говоря о совместной многозадачности на уровне приложений, мы не имеем дело с потоками или процессами, которым необходимо освободить управление, поскольку все выполнение содержится в одном процессе и потоке. Вместо этого у нас есть несколько задач (сопрограммы, tasklets и зеленые потоки), которые передают управление одной функции, управляющей координацией задач. Эта функция обычно является своего рода циклом событий.
Чтобы избежать путаницы (из-за терминологии Python), теперь мы будем называть такие параллельные задачи сопрограммами. Самая важная проблема в совместной многозадачности — когда передать контроль. В большинстве асинхронных приложений управление передается планировщику или циклу событий при операциях ввода-вывода. Независимо от того, читает ли программа данные из файловой системы или осуществляет связь через сокет, такая операция ввода-вывода всегда связана с некоторым временем ожидания, когда процесс становится бездействующим. Время ожидания зависит от внешнего ресурса, поэтому это хорошая возможность освободить управление, чтобы другие сопрограммы могли выполнять свою работу, пока им тоже не придется ждать, что такой подход несколько похожим по поведению на то, как многопоточность реализована в Python. Мы знаем, что GIL сериализует потоки Python, но он также освобождается при каждой операции ввода-вывода. Основное различие заключается в том, что потоки в Python реализованы как потоки системного уровня, поэтому операционная система может в любой момент выгрузить текущий запущенный поток и передать управление другому.
В асинхронном программировании задачи никогда не прерываются главным циклом событий. Вот почему этот стиль многозадачности также называется не приоритетной многозадачностью.
Конечно, каждое приложение Python работает в операционной системе, где есть другие процессы, конкурирующие за ресурсы. Это означает, что операционная система всегда имеет право выгрузить весь процесс и передать управление другому. Но когда наше асинхронное приложение запускается обратно, оно продолжается с того же места, где оно было приостановлено, когда системный планировщик вмешался. Именно поэтому сопрограммы в данном контексте считаются не вытесняющими.
Ключевые слова async и await в Python
Ключевые слова async и await являются основными строительными блоками в асинхронном программировании Python.
Ключевое слово async, используемое перед оператором def, определяет новую сопрограмму. Выполнение функции сопрограммы может быть приостановлено и возобновлено в строго определенных обстоятельствах. Его синтаксис и поведение очень похожи на генераторы (см. Главу 2 «Рекомендации по синтаксису» ниже уровня класса). Фактически, генераторы должны использоваться в более старых версиях Python для реализации сопрограмм. Вот пример объявления функции, которая использует ключевое слово async:
async def async_hello():
print("hello, world!")
Функции, определенные с помощью ключевого слова async, являются специальными. При вызове они не выполняют код внутри, а вместо этого возвращают объект сопрограммы:
>>>> async def async_hello():
... print("hello, world!")
...
>>> async_hello()
<coroutine object async_hello at 0x1014129e8>
Объект сопрограммы (coroutine object) ничего не делает, пока его выполнение не запланировано в цикле событий. Модуль asyncio доступен для предоставления базовой реализации цикла событий, а также множества других асинхронных утилит:
>>> import asyncio
>>> async def async_hello():
... print("hello, world!")
...
>>> loop = asyncio.get_event_loop()
>>> loop.run_until_complete(async_hello())
hello, world!
>>> loop.close()
Естественно, создавая только одну простую сопрограмму, в нашей программе мы не осуществляем параллелизма. Чтобы увидеть что-то действительно параллельное, нам нужно создать больше задач, которые будут выполняться циклом событий.
Новые задачи можно добавить в цикл, вызвав метод loop.create_task () или предоставив другой объект для ожидания использования функции asyncio.wait (). Мы будем использовать последний подход и попытаемся асинхронно напечатать последовательность чисел, сгенерированных с помощью функции range ():
import asyncio
async def print_number(number):
print(number)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(
asyncio.wait([
print_number(number)
for number in range(10)
])
)
loop.close()
Функция asyncio.wait () принимает список объектов сопрограмм и немедленно возвращается. Результатом является генератор, который выдает объекты, представляющие будущие результаты (фьючерсы). Как следует из названия, он используется для ожидания завершения всех предоставленных сопрограмм. Причина, по которой он возвращает генератор вместо объекта сопрограммы, заключается в обратной совместимости с предыдущими версиями Python, что будет объяснено позже. Результат выполнения этого скрипта может быть следующим:
$ python asyncprint.py
0
7
8
3 9 4 1 5 2 6
Как мы видим, числа печатаются не в том порядке, в котором мы создали наши сопрограммы. Но это именно то, чего мы хотели достичь.
Второе важное ключевое слово, добавленное в Python 3.5, await. Он используется для ожидания результатов сопрограммы или будущего события (поясняется позже) и освобождения контроля над выполнением в цикле событий. Чтобы лучше понять, как это работает, нам нужно рассмотреть более сложный пример кода.
Допустим, мы хотим создать две сопрограммы, которые будут выполнять некоторые простые задачи в цикле:
- Подождать случайное количество секунд
- Распечатать некоторый текст, предоставленный в качестве аргумента, и количество времени, проведенного в ожидании. Начнем с простой реализации, в которой есть некоторые проблемы параллелизма, которые мы позже попытаемся улучшить с помощью дополнительного использования await:
import time import random import asyncio async def waiter(name): for _ in range(4): time_to_sleep = random.randint(1, 3) / 4 time.sleep(time_to_sleep) print( "{} waited {} seconds" "".format(name, time_to_sleep) ) async def main(): await asyncio.wait([waiter("foo"), waiter("bar")]) if __name__ == "__main__": loop = asyncio.get_event_loop() loop.run_until_complete(main()) loop.close()
При выполнении в терминале (с помощью команды времени для измерения времени) можно увидеть:
$ time python corowait.py
bar waited 0.25 seconds
bar waited 0.25 seconds
bar waited 0.5 seconds
bar waited 0.5 seconds
foo waited 0.75 seconds
foo waited 0.75 seconds
foo waited 0.25 seconds
foo waited 0.25 seconds
real 0m3.734s user 0m0.153s sys 0m0.028s
Как мы видим, обе сопрограммы завершили свое выполнение, но не асинхронно. Причина в том, что они обе используют функцию time.sleep (), которая блокирует, но не освобождает элемент управления в цикле событий. Это будет работать лучше в многопоточной установке, но мы не хотим сейчас использовать потоки. Итак, как мы можем это исправить?
Ответ заключается в том, чтобы использовать asyncio.sleep (), которая является асинхронной версией time.sleep (), и ожидать результата с помощью ключевого слова await. Мы уже использовали это утверждение в первой версии функции main (), но это было только для улучшения ясности кода. Это явно не делало нашу реализацию более параллельной. Давайте посмотрим на улучшенную версию сопрограммы waiter (), которая использует await asyncio.sleep():
async def waiter(name):
for _ in range(4):
time_to_sleep = random.randint(1, 3) / 4
await asyncio.sleep(time_to_sleep)
print(
"{} waited {} seconds"
"".format(name, time_to_sleep)
)
Запустив обновленный скрипт, мы увидим, как выходные данные двух функций чередуются друг с другом:
$ time python corowait_improved.py
bar waited 0.25 seconds
foo waited 0.25 seconds
bar waited 0.25 seconds
foo waited 0.5 seconds
foo waited 0.25 seconds
bar waited 0.75 seconds
foo waited 0.25 seconds
bar waited 0.5 seconds
real 0m1.953s
user 0m0.149s
sys 0m0.026s
Дополнительным преимуществом этого простого улучшения является то, что код работает быстрее. Общее время выполнения было меньше, чем сумма всех времен сна, потому что сопрограммы поочередно освобождали контроль.
Asyncio в предыдущих версиях Python
Модуль asyncio появился в Python 3.4. Так что это единственная версия Python, которая имеет серьезную поддержку асинхронного программирования до Python 3.5. К сожалению, похоже, что этих двух последующих версий достаточно, чтобы представить проблемы совместимости.
Как ни крути, ядро асинхронного программирования в Python было введено раньше, чем элементы синтаксиса, поддерживающие этот шаблон. Лучше поздно, чем никогда, но это создало ситуацию, когда есть два синтаксиса для работы с сопрограммами.
Начиная с Python 3.5, вы можете использовать async и await:
async def main ():
await asyncio.sleep(0)
Однако в Python 3.4, прийдется дополнительно применить asyncio.coroutine декоратор и yield в тексте сопрограммы:
@asyncio.couroutine
def main():
yield from asyncio.sleep(0)
Другим полезным фактом является то, что оператор yield from был введен в Python 3.3, а в PyPI имеется асинхронный бэкпорт. Это означает, что вы также можете использовать эту реализацию совместной многозадачности с Python 3.3.
Практический пример асинхронного программирования
Как уже неоднократно упоминалось в этой главе, асинхронное программирование является отличным инструментом для обработки операций ввода-вывода. Настало время создать что-то более практичное, чем простая печать последовательностей или асинхронное ожидание.
В целях обеспечения согласованности мы попытаемся решить ту же проблему, которую решили с помощью многопоточности и многопроцессорности. Поэтому мы попытаемся асинхронно извлечь некоторые данные из внешних ресурсов через сетевое соединение. Было бы здорово, если бы мы могли использовать тот же пакет python-gmaps, что и в предыдущих разделах. К сожалению, мы не можем.
Создатель python-gmaps был немного ленив и взял лишь название. Чтобы упростить разработку, он выбрал пакет запросов в качестве своей клиентской библиотеки HTTP. К сожалению, запросы не поддерживают асинхронный ввод-вывод с async и await. Есть некоторые другие проекты, которые нацелены на обеспечение некоторого параллелизма для проекта запросов, но они либо полагаются на Gevent (grequests, см. Https://github.com/ kennethreitz / grequests), либо на выполнение пула потоков / процессов (запросы-futures, обратитесь к github.com/ross/requests-futures). Ни один из них не решает нашу проблему.
Прежде чем уперкать, что я ругаю невинного разработчика с открытым исходным кодом, успокойся. Человек, стоящий за пакетом python-gmaps, это я. Плохой выбор зависимостей является одной из проблем этого проекта. Мне просто нравится время от времени публично критиковать себя. Для меня это будет горьким уроком, так как python-gmaps в его последней версии (0.3.1 на момент написания этой книги) не может быть легко интегрирован с асинхронным вводом-выводом Python. В любом случае, это может измениться в будущем, поэтому ничего не потеряно.Зная об ограничениях библиотеки, которую было так легко использовать в предыдущих примерах, нам нужно создать что-то, что заполнит этот пробел. Google MapsAPI действительно прост в использовании, поэтому мы создадим на скорую руку асинхронную утилиту только для иллюстрации. В стандартной библиотеке Python версии 3.5 по-прежнему отсутствует библиотека, которая бы выполняла асинхронные HTTP-запросы так же просто, как вызов urllib.urlopen (). Мы определенно не хотим создавать полную поддержку протокола с нуля, поэтому мы будем использовать небольшую справку из пакета aiohttp, доступного в PyPI. Это действительно многообещающая библиотека, которая добавляет как клиентские, так и серверные реализации для асинхронного HTTP. Вот небольшой модуль, построенный поверх aiohttp, который создает одну вспомогательную функцию geocode (), которая выполняет запросы геокодирования к службе Google Maps API:
import aiohttp
session = aiohttp.ClientSession()
async def geocode(place):
params = {
'sensor': 'false',
'address': place
}
async with session.get(
'https://maps.googleapis.com/maps/api/geocode/json',
params=params
) as response:
result = await response.json()
return result['results']
Давайте предположим, что этот код хранится в модуле с именем asyncgmaps, который мы будем использовать позже. Теперь мы готовы переписать пример, используемый при обсуждении многопоточности и многопроцессорности. Ранее мы использовали для разделения всей операции на два отдельных этапа:
- Выполните все запросы к внешней службе параллельно, используя функцию fetch_place ().
- Отобразите все результаты в цикле, используя функцию present_result ().
Но поскольку совместная многозадачность совершенно отличается от использования нескольких процессов или потоков, мы можем немного изменить наш подход. Большинство проблем, поднятых в разделе Использование одного потока на элемент, больше не являются нашей заботой.
Сопрограммы не являются вытесняющими, поэтому мы можем легко отображать результаты сразу после получения ответов HTTP. Это упростит наш код и сделает его более понятным:
import asyncio
# note: local module introduced earlier
from asyncgmaps import geocode, session
PLACES = (
'Reykjavik', 'Vien', 'Zadar', 'Venice',
'Wrocław', 'Bolognia', 'Berlin', 'Słubice',
'New York', 'Dehli',
)
async def fetch_place(place):
return (await geocode(place))[0]
async def present_result(result):
geocoded = await result
print("{:>25s}, {:6.2f}, {:6.2f}".format(
geocoded['formatted_address'],
geocoded['geometry']['location']['lat'],
geocoded['geometry']['location']['lng'],
))
async def main():
await asyncio.wait([
present_result(fetch_place(place))
for place in PLACES
])
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
# aiohttp will raise issue about unclosed
# ClientSession so we perform cleanup manually
loop.run_until_complete(session.close())
loop.close()
Асинхронное программирование отлично подходит для бэкэнд-разработчиков, заинтересованных в создании масштабируемых приложений. На практике это один из наиболее важных инструментов для создания высококонкурентных серверов.
Но реальность печальна. Многие популярные пакеты, которые имеют дело с проблемами ввода-вывода, не предназначены для использования с асинхронным кодом. Основными причинами этого являются:
- Все еще низкое внедрение Python 3 и некоторых его расширенных возможностей
- Низкое понимание различных концепций параллелизма среди начинающих изучать Python
Это означает, что очень часто миграция существующих синхронных многопоточных приложений и пакетов либо невозможна (из-за архитектурных ограничений), либо слишком дорога. Многие проекты могли бы извлечь большую пользу от внедрения асинхронного стиля многозадачности, но только некоторые из них в конечном итоге сделают это. Это означает, что прямо сейчас вы будете испытывать много трудностей при попытке создать асинхронные приложения с самого начала. В большинстве случаев это будет похоже на проблему, упомянутую в разделе «Практический пример асинхронного программирования» — несовместимые интерфейсы и несинхронная блокировка операций ввода-вывода. Конечно, иногда вы можете отказаться от ожидания, когда вы испытываете такую несовместимость, и просто синхронно получать необходимые ресурсы. Но это будет блокировать выполнение каждой другой сопрограммой своего кода, пока вы ждете результатов. Технически это работает, но также разрушает все преимущества асинхронного программирования. Таким образом, в конце концов, объединение асинхронного ввода-вывода с синхронным вводом-выводом не вариант. Это игра типа «все или ничего».
Другая проблема — длительные операции с привязкой к процессору. Когда вы выполняете операцию ввода-вывода, не проблема выпустить управление из сопрограммы. При записи / чтении из файловой системы или сокета вы, в конце концов, будете ждать, так что вызов с использованием await — лучшее, что вы можете сделать. Но что делать, если вам нужно что-то вычислить, и вы знаете, что это займет некоторое время? Конечно, вы можете разделить проблему на части и отменить контроль каждый раз, когда вы немного продвигаете работу. Но вскоре вы обнаружите, что это не очень хорошая модель. Такая вещь может сделать код беспорядочным, а также не гарантирует хорошие результаты.
Временная привязка должна быть ответственностью интерпретатора или операционной системы.
Объединение асинхронного кода с асинхронным использованием фьючерсов
Так что делать, если у вас есть код, который выполняет длинные синхронные операции ввода-вывода, которые вы не можете или не хотите переписывать. Или что делать, когда вам приходится выполнять некоторые тяжелые операции с процессором в приложении, разработанном в основном с учетом асинхронного ввода-вывода? Ну… тебе нужно найти обходной путь. И под этим я подразумеваю многопоточность или многопроцессорность.
Это может звучать не очень хорошо, но иногда лучшим решением может быть то, от чего мы пытались убежать. Параллельная обработка ресурсоемких задач в Python всегда выполняется лучше благодаря многопроцессорности. И многопоточность может справляться с операциями ввода-вывода одинаково хорошо (быстро и без больших затрат ресурсов), как асинхронное и ожидающее, если правильно настроена и обрабатывается с осторожностью.
Поэтому иногда, когда вы не знаете, что делать, когда что-то просто не подходит вашему асинхронному приложению, используйте фрагмент кода, который откладывает его на отдельный поток или процесс. Вы можете сделать вид, что это сопрограмма, освободить управление для цикла событий и в конечном итоге обработать результаты, когда они будут готовы.
К счастью для нас, стандартная библиотека Python предоставляет модуль concurrent.futures, который также интегрирован с модулем asyncio. Эти два модуля вместе позволяют вам планировать функции блокировки, выполняемые в потоках или дополнительных процессах, как если бы это были асинхронные неблокирующие сопрограммы.
Исполнители (executors) и фьючерсы (futures)
Прежде чем мы увидим, как внедрить потоки или процессы в асинхронный цикл обработки событий, мы подробнее рассмотрим модуль concurrent.futures, который позже станет основным компонентом нашего так называемого обходного пути.
Наиболее важными классами в модуле concurrent.futures являются Executor и Future.
Executor представляет собой пул ресурсов, которые могут обрабатывать рабочие элементы параллельно. Это может показаться очень похожим по назначению на классы из многопроцессорного модуля — Pool и dummy.Pool — но имеет совершенно другой интерфейс и семантику. Это базовый класс, не предназначенный для реализации и имеющий две конкретные реализации:
- ThreadPoolExecutor: который представляет пул потоков
- ProcessPoolExecutor: который представляет пул процессов
Каждый executor представляет три метода:
- submit (fn, * args, ** kwargs): планирует функцию fn для выполнения в пуле ресурсов и возвращает объект Future, представляющий выполнение вызываемого объекта
- map (func, * iterables, timeout = None, chunksize = 1): функция func выполняется над итерацией аналогично многопроцессорной обработке. Метод Pool.map ()
- shutdown (wait = True): это выключает Executor и освобождает все его ресурсы.
Наиболее интересен метод — submit () из-за объекта Future, который он возвращает. Он представляет асинхронное выполнение вызываемого и только косвенно представляет его результат. Чтобы получить фактическое возвращаемое значение отправляемого вызываемого объекта, необходимо вызвать метод Future.result (). И если вызываемый объект уже завершен, метод result () не заблокирует его и просто вернет выходные данные функции. Если это не так, он заблокирует его, пока результат не будет готов. Относитесь к нему как к обещанию результата (на самом деле это та же концепция, что и обещание в JavaScript). Вам не нужно распаковывать его сразу же после его получения (с помощью метода result ()), но если вы попытаетесь это сделать, это гарантированно в конечном итоге что-то вернет:
>>> def loudy_return():
... print("processing")
... return 42
...
>>> from concurrent.futures import ThreadPoolExecutor
>>> with ThreadPoolExecutor(1) as executor:
... future = executor.submit(loudy_return)
...
processing
>>> future
<Future at 0x33cbf98 state=finished returned int>
>>> future.result()
42
Если вы хотите использовать метод Executor.map (), он не отличается по использованию от метода Pool.map () класса Pool многопроцессорного модуля:
def main():
with ThreadPoolExecutor(POOL_SIZE) as pool:
results = pool.map(fetch_place, PLACES)
for result in results:
present_result(result)
Использование Executor в цикле событий
Экземпляры класса Future, возвращаемые методом Executor.submit (), концептуально очень близки к сопрограммам, используемым в асинхронном программировании. Вот почему мы можем использовать исполнителей, чтобы сделать гибрид между совместной многозадачностью и многопроцессорностью или многопоточностью.
Ядром этого обходного пути является метод BaseEventLoop.run_in_executor (executor, func, * args) класса цикла событий. Это позволяет планировать выполнение функции func в процессе или пуле потоков, представленном аргументом executor. Самым важным в этом методе является то, что он возвращает новый ожидаемый объект (объект, который можно ожидать с помощью оператора await). Таким образом, благодаря этому вы можете выполнить блокирующую функцию, которая не является сопрограммой в точности как сопрограмма, и она не будет блокировать, независимо от того, сколько времени потребуется, чтобы закончить. Он остановит только функцию, ожидающую результатов от такого вызова, но весь цикл событий будет продолжаться.
И полезным фактом является то, что вам даже не нужно создавать свой экземпляр executor. Если вы передадите None в качестве аргумента executor, класс ThreadPoolExecutor будет использоваться с числом потоков по умолчанию (для Python 3.5 это число процессоров, умноженное на 5).
Итак, давайте предположим, что мы не хотели переписывать проблемную часть пакета python-gmaps, которая была причиной нашей головной боли. Мы можем легко отложить блокирующий вызов до отдельного потока с помощью вызова loop.run_in_executor (), при этом оставляя функцию fetch_place () в качестве ожидаемой сопрограммы:
async def fetch_place(place):
coro = loop.run_in_executor(None, api.geocode, place)
result = await coro
return result[0]
Такое решение хуже, чем наличие полностью асинхронной библиотеки для выполнения этой работы, но вы знаете, что хоть что-то лучше, чем ничего.
После объяснения того, что такое параллелизм на самом деле, мы приступили к действиям и проанализировали одну из типичных параллельных проблем с помощью многопоточности. Выявив основные недостатки нашего кода и исправив их, мы обратились к многопроцессорной обработке, чтобы посмотреть, как она будет работать в нашем случае.
После этого мы обнаружили, что с многопроцессорным модулем использовать несколько процессов намного проще, чем базовые потоки с многопоточностью. Но только после этого мы поняли, что можем использовать один и тот же API с потоками, благодаря multiprocessing.dummy. Таким образом, выбор между многопроцессорностью и многопоточностью теперь зависит только от того, какое решение лучше соответствует проблеме, а не какое решение имеет лучший интерфейс.
Говоря о подгонке проблемы, мы наконец-то попробовали асинхронное программирование, которое должно быть лучшим решением для приложений, связанных с вводом / выводом, только чтобы понять, что мы не можем полностью забыть о потоках и процессах. Таким образом, мы сделали круг, обратно в то место, где мы начали!
И это приводит нас к окончательному выводу этой главы. Нет решения, устраивающего всех. Есть несколько подходов, которые вы можете предпочесть или любить больше. Есть некоторые подходы, которые лучше подходят для данного набора проблем, но вам нужно знать их все, чтобы быть успешным. В реалистичных сценариях вы можете использовать весь арсенал инструментов и стилей параллелизма в одном приложении, и это не редкость.
Предыдущий вывод — отличное введение в тему следующей главы, Глава 14 «Полезные шаблоны проектирования». Так как нет единого шаблона, который решит все ваши проблемы. Вы должны знать как можно больше, потому что в конечном итоге вы будете использовать их каждый день.