Асинхронный Python: различные формы конкурентности

Автор оригинала: Abu Ashraf Masnun
С появлением Python 3 довольно много шума об “асинхронности” и “параллелизме”, можно полагать, что Python недавно представил эти возможности/концепции. Но это не так. Мы много раз использовали эти операции. Кроме того, новички могут подумать, что asyncio является единственным или лучшим способом воссоздать и использовать асинхронные/параллельные операции. В этой статье мы рассмотрим различные способы достижения параллелизма, их преимущества и недостатки.

Определение терминов:


Прежде чем мы углубимся в технические аспекты, важно иметь некоторое базовое понимание терминов, часто используемых в этом контексте.

Синхронный и асинхронный:

В ​синхронных операциях задачи выполняются друг за другом. В асинхронных задачи могут запускаться и завершаться независимо друг от друга. Одна асинхронная задача может запускаться и продолжать выполняться, пока выполнение переходит к новой задаче. Асинхронные задачи ​не блокируют (не заставляют ждать завершения выполнения задачи) операции и обычно выполняются в фоновом режиме.

Например, вы должны обратиться в туристическое агентство, чтобы спланировать свой следующий отпуск. Вам нужно отправить письмо своему руководителю, прежде чем улететь. В синхронном режиме, вы сначала позвоните в туристическое агентство, и если вас попросят подождать, то вы будете ждать, пока вам не ответят. Затем вы начнёте писать письмо руководителю. Таким образом, вы выполняете задачи последовательно, одна за одной. [синхронное выполнение, прим. переводчика] Но, если вы умны, то пока вас попросили подождать [​повисеть на телефоне, прим. переводчика] вы начнёте писать e-mail и когда с вами снова заговорят вы приостановите написание, поговорите, а затем допишете письмо. Вы также можете попросить друга позвонить в агентство, а сами написать письмо. Это асинхронность, задачи не блокируют друг друга.

Конкурентность и параллелизм:

Конкурентность подразумевает, что две задачи выполняются совместно. В нашем предыдущем примере, когда мы рассматривали асинхронный пример, мы постепенно продвигались то в написании письма, то в разговоре с тур. агентством. Это ​конкурентность.

Когда мы попросили позвонить друга, а сами писали письмо, то задачи выполнялись ​параллельно.​

Параллелизм по сути является формой конкурентности. Но параллелизм зависит от оборудования. Например, если в CPU только одно ядро, то две задачи не могут выполняться параллельно. Они просто делят процессорное время между собой. Тогда это конкурентность, но не параллелизм. Но когда у нас есть несколько ядер [как друг в предыдущем примере, который является вторым ядром, прим. переводчика] мы можем выполнять несколько операций (в зависимости от количества ядер) одновременно.

Подытожим:

  • Синхронность: блокирует операции (блокирующие)
  • Асинхронность: не блокирует операции (неблокирующие)
  • Конкурентность: совместный прогресс (совместные)
  • Параллелизм: параллельный прогресс (параллельные)

Параллелизм подразумевает конкурентность. Но конкурентность не всегда подразумевает параллелизм.

Потоки и процессы


Python поддерживает потоки уже очень давно. Потоки позволяют выполнять операции конкурентно. Но есть проблема, связанная с Global Interpreter Lock (GIL) из-за которой потоки не могли обеспечить настоящий параллелизм. И тем не менее, с появлением multiprocessing можно использовать несколько ядер с помощью Python.

Потоки (Threads)

Рассмотрим небольшой пример. В нижеследующем коде функция worker будет выполняться в нескольких потоках асинхронно и одновременно.

import threading
import time
import random


def worker(number):
    sleep = random.randrange(1, 10)
    time.sleep(sleep)
    print("I am Worker {}, I slept for {} seconds".format(number, sleep))


for i in range(5):
    t = threading.Thread(target=worker, args=(i,))
    t.start()

print("All Threads are queued, let's see when they finish!")

А вот пример выходных данных:

$ python thread_test.py
All Threads are queued, let's see when they finish!
I am Worker 1, I slept for 1 seconds
I am Worker 3, I slept for 4 seconds
I am Worker 4, I slept for 5 seconds
I am Worker 2, I slept for 7 seconds
I am Worker 0, I slept for 9 seconds

Таким образом мы запустили 5 потоков для совместной работы и после их старта (т.е. после запуска функции worker) операция не ждёт завершения работы потоков прежде чем перейти к следующему оператору print. Это асинхронная операция.

В нашем примере мы передали функцию в конструктор Thread. Если бы мы хотели, то могли бы реализовать подкласс с методом (ООП стиль).

Дальнейшее чтение:

Чтобы узнать больше о потоках, воспользуйтесь ссылкой ниже:


Global Interpreter Lock (GIL)

GIL был представлен, чтобы сделать обработку памяти CPython проще и обеспечить наилучшую интеграцию с C(например, с расширениями). GIL — это механизм блокировки, когда интерпретатор Python запускает в работу только один поток за раз. Т.е. только один поток может исполняться в байт-коде Python единовременно. GIL следит за тем, чтобы несколько потоков не выполнялись параллельно.

Краткие сведения о GIL:

  • Одновременно может выполняться один поток.
  • Интерпретатор Python переключается между потоками для достижения конкурентности.
  • GIL применим к CPython (стандартной реализации). Но такие как, например, Jython и IronPython не имеют GIL.
  • GIL делает однопоточные программы быстрыми.
  • Операциям ввода/вывода GIL обычно не мешает.
  • GIL позволяет легко интегрировать непотокобезопасные библиотеки на C, благодаря GIL у нас есть много высокопроизводительных расширений/модулей, написанных на C.
  • Для CPU зависимых задач интерпретатор делает проверку каждые N тиков и переключает потоки. Таким образом один поток не блокирует другие.

Многие видят в GIL слабость. Я же рассматриваю это как благо, ведь были созданы такие библиотеки как NumPy, SciPy, которые занимают особое, уникальное положение в научном обществе.

Дальнейшее чтение:

Эти ресурсы позволят углубиться в GIL:


Процессы (Processes)

Чтобы достичь параллелизма в Python был добавлен модуль multiprocessing, который предоставляет API, и выглядит очень похожим, если вы использовали threading раньше.

Давайте просто пойдем и изменим предыдущий пример. Теперь модифицированная версия использует Процесс вместо Потока.

import multiprocessing
import time
import random


def worker(number):
    sleep = random.randrange(1, 10)
    time.sleep(sleep)
    print("I am Worker {}, I slept for {} seconds".format(number, sleep))


for i in range(5):
    t = multiprocessing.Process(target=worker, args=(i,))
    t.start()

print("All Processes are queued, let's see when they finish!")

Что же изменилось? Я просто импортировал модуль multiprocessing вместо threading. А затем, вместо потока я использовал процесс. Вот и всё! Теперь вместо множества потоков мы используем процессы которые запускаются на разных ядрах CPU (если, конечно, у вашего процессора несколько ядер).

С помощью класса Pool мы также можем распределить выполнение одной функции между несколькими процессами для разных входных значений. Пример из официальных документов:

from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    p = Pool(5)
    print(p.map(f, [1, 2, 3]))

Здесь вместо того, чтобы перебирать список значений и вызывать функцию f по одному, мы фактически запускаем функцию в разных процессах. Один процесс выполняет f(1), другой-f(2), а другой-f (3). Наконец, результаты снова объединяются в список. Это позволяет нам разбить тяжелые вычисления на более мелкие части и запускать их параллельно для более быстрого расчета.

Дальнейшее чтение:


Модуль concurrent.futures

Модуль concurrent.futures большой и позволяет писать асинхронный код очень легко. Мои любимчики ThreadPoolExecutor и ProcessPoolExecutor. Эти исполнители поддерживают пул потоков или процессов. Мы отправляем наши задачи в пул, и он запускает задачи в доступном потоке / процессе. Возвращается объект Future, который можно использовать для запроса и получения результата по завершении задачи.

А вот пример ThreadPoolExecutor:

from concurrent.futures import ThreadPoolExecutor
from time import sleep
 
def return_after_5_secs(message):
    sleep(5)
    return message
 
pool = ThreadPoolExecutor(3)
 
future = pool.submit(return_after_5_secs, ("hello"))
print(future.done())
sleep(5)
print(future.done())
print(future.result())

У меня есть статья о concurrent.futures masnun.com/2016/03/29/python-a-quick-introduction-to-the-concurrent-futures-module.html. Она может быть полезна при более глубоком изучении этого модуля.

Дальнейшее чтение:


Asyncio — что, как и почему?


У вас, вероятно, есть вопрос, который есть у многих людей в сообществе Python — что asyncio приносит нового? Зачем нужен был еще один способ асинхронного ввода-вывода? Разве у нас уже не было потоков и процессов? Давай посмотрим!

Зачем нам нужен asyncio?

Процессы очень дорогостоящие [с точки зрения потребления ресурсов, прим. переводчика] для создания. Поэтому для операций ввода/вывода в основном выбираются потоки. Мы знаем, что ввод-вывод зависит от внешних вещей — медленные диски или неприятные сетевые лаги делают ввод-вывод часто непредсказуемым. Теперь предположим, что мы используем потоки для операций ввода-вывода. 3 потока выполняют различные задачи ввода-вывода. Интерпретатор должен был бы переключаться между конкурентными потоками и давать каждому из них некоторое время по очереди. Назовем потоки — T1, T2 и T3. Три потока начали свою операцию ввода-вывода. T3 завершает его первым. T2 и T1 все еще ожидают ввода-вывода. Интерпретатор Python переключается на T1, но он все еще ждет. Хорошо, интерпретатор перемещается в T2, а тот все еще ждет, а затем перемещается в T3, который готов и выполняет код. Вы видите в этом проблему?

T3 был готов, но интерпретатор сначала переключился между T2 и T1 — это понесло расходы на переключение, которых мы могли бы избежать, если бы интерпретатор сначала переключился на T3, верно?

Что есть asynio?

Asyncio предоставляет нам цикл событий наряду с другими крутыми вещами. Цикл событий (event loop) отслеживает события ввода/вывода и переключает задачи, которые готовы и ждут операции ввода/вывода [цикл событий — программная конструкция, которая ожидает прибытия и производит рассылку событий или сообщений в программе, прим. переводчика].

Идея очень проста. Есть цикл обработки событий. И у нас есть функции, которые выполняют асинхронные операции ввода-вывода. Мы передаем свои функции циклу событий и просим его запустить их для нас. Цикл событий возвращает нам объект Future, словно обещание, что в будущем мы что-то получим. Мы держимся за обещание, время от времени проверяем, имеет ли оно значение (нам очень не терпится), и, наконец, когда значение получено, мы используем его в некоторых других операциях [т.е. мы послали запрос, нам сразу дали билет и сказали ждать, пока придёт результат. Мы периодически проверяем результат и как только он получен мы берем билет и по нему получаем значение, прим. переводчика].

Asyncio использует генераторы и корутины для остановки и возобновления задач. Прочитать детали вы можете здесь:


Как использовать asyncio?

Прежде чем мы начнём, давайте взглянем на пример:

import asyncio
import datetime
import random


async def my_sleep_func():
    await asyncio.sleep(random.randint(0, 5))


async def display_date(num, loop):
    end_time = loop.time() + 50.0
    while True:
        print("Loop: {} Time: {}".format(num, datetime.datetime.now()))
        if (loop.time() + 1.0) >= end_time:
            break
        await my_sleep_func()


loop = asyncio.get_event_loop()

asyncio.ensure_future(display_date(1, loop))
asyncio.ensure_future(display_date(2, loop))

loop.run_forever()

Обратите внимание, что синтаксис async/await предназначен только для Python 3.5 и выше. Пройдёмся по коду:

  • У нас есть асинхронная функция display_date, которая принимает число (в качестве идентификатора) и цикл обработки событий в качестве параметров.
  • Функция имеет бесконечный цикл, который прерывается через 50 секунд. Но за этот период, она неоднократно печатает время и делает паузу. Функция await может ожидать завершения выполнения других асинхронных функций (корутин).
  • Передаем функцию в цикл обработки событий (используя метод ensure_future).
  • Запускаем цикл событий.

Всякий раз, когда происходит вызов await, asyncio понимает, что функции, вероятно, потребуется некоторое время. Таким образом, он приостанавливает выполнение, начинает мониторинг любого связанного с ним события ввода-вывода и позволяет запускать задачи. Когда asyncio замечает, что приостановленный ввод-вывод функции готов, он возобновляет функцию.

Делаем правильный выбор


Только что мы прошлись по самым популярным формам конкурентности. Но остаётся вопрос — что следует выбрать? Это зависит от вариантов использования. Из моего опыта я склонен следовать этому псевдо-коду:

if io_bound:
    if io_very_slow:
        print("Use Asyncio")
    else:
       print("Use Threads")
else:
    print("Multi Processing")

  • CPU Bound => Multi Processing
  • I/O Bound, Fast I/O, Limited Number of Connections => Multi Threading
  • I/O Bound, Slow I/O, Many connections => Asyncio

[Прим. переводчика]

Поделиться публикацией

Похожие публикации

AdBlock похитил этот баннер, но баннеры не зубы — отрастут

Подробнее
Реклама

Комментарии 36

    0
    После прочтения сложилось впечатление, что потоки вообще не нужны. Есть ли причины по которым мне не стоит использовать asyncio для любых операций ввода/вывода?
      0
      asyncio выполняет все задачи на одном ядре. Вы не сможете запустить 10 GET запросов на 10 ядрах с помощью только цикла событий, вы обязаны создать потоки/процессы тем или иным способом для параллельного выполнения.
        0
        На сколько я понимаю, весь I/O выполняется ядром операционной системы параллельно. Event loop выступает посредником. Или нет? Если бы все было так как вы сказали, то было бы невозможно добиться одинаковой производительности async кода и потоков в питоне.
          0
          event loop выполняет все задачи в одном потоке, а значит на одном ядре процессора. Задача сама по себе может создать поток и задействовать больше ядер, но механизм event loop — однопоточный. Сравнивать async-код и потоки, вообще говоря, некорректно, это разные техники. В задачах где нужно много CPU и мало I\O — выбираем потоки, где много I\O и мало CPU — acync. Но в реальности обычно совмещают (создают потоки из async обработчиков, когда предстоит долгий CPU-расчет)
            +1
            В задачах где нужно много CPU и мало I\O — выбираем потоки
            А вы точно про Python говорите?
              0
              Питон — это язык, event loop — системный механизм, который можно использовать из любого языка (и из питона с помощью asyncio). Поэтому это и про питон тоже, да.
                0
                А как же GIL?
                  0
                  Есть много путей обхода GIL — не использовать CPython, создавать процессы вместо потоков, с питона 3.2 также пофиксили многое для многопоточки где микс CPU и IO тредов, но проблема есть да. Что касается event loop — то с ним у питона никаких проблем и ограничений нет.
                    0
                    А что не так с GIL? У меня встречный вопрос: как Вы думаете как выполняется блокирующее чтение файлов/сокетов в разных потоках, задействуя системные вызовы и libc? Участвует ли в этом GIL как примитив синхронизации? К примеру, 10 разных файлов в 10 разных потоках.
                      +1
                      У меня есть ощущение, что вы не совсем поняли мой посыл. С GIL все нормально. Лок отпускается во время операции ввода-вывода, не препятствуя выполнению других потоков. Своим вопросом я выразил недоумение по поводу использования потоков для CPU bound задач в питоне.
                        –1
                        Но ведь если сделать тот же ход (а именно нормально написать C extension, который будет отпускать GIL), то и вполне можно решать CPU bound задачи на питоне =) Понятное дело, что числодробилку на чистом CPython писать — довольно гиблое дело.
            +1
            Так потоки не помогут же в том чтоб загрузить больше одного ядра. Это можно сделать только с помощью процессов в питоне.
              –1
              Это справедливо только для CPython и то только для версий ниже 3.2.
                0
                Ну я думал, если не указано иного, то речь и идет о CPython.
                А как потоками можно загрузить несколько ядер одновременно, если GIL не дает выполняться больше чем одному потоку в один момент времени?
                  +1
                  GIL в определённый момент «отпускается» (например, ожидание ответа от сервера) и в этот момент может работать другой поток, т.е. задачи, что не требуют CPU. Но, насколько я знаю, потоки, всё равно, выделяются на одном ядре. Потому очень интересна информация, что потоки работают на нескольких ядрах
                    +1
                    А как ожидание ответа от сервера загружает CPU? Загружает же обработка ответа, а эту обработку можно производить только на одном ядре одновременно.
                      0
                      Я криво написал) Никак не загружает, загружает составление запроса, отправка и получение ответа. В промежутках GIL отпущен и может выполняться другой поток. Но, насколько я знаю, потоки выделяются в рамках одного ядра (но это не точно). Если есть где-то информация о том, что это не так — очень хочеться почитать/посмотреть
                      Ещё раз простите за кривизну изложения мысли
            –1
            Например asyncio не поддерживает файловый ввод-вывод.
            Он посложнее в разработке и отладке.
            А еще есть уйма полезных блокирующих библиотек.
              0
              Про файлы не знал, спасибо. Только это скорее операционные системы не поддерживают асинхронные операции с файлами.
                0
                Поддерживают все современные (epoll, kqueue, select0
                0
                С чего это asyncio не поддерживает файловый ввод-вывод?
                  0
                  Оно не работает с обычными файлами (те, что на диске).
                  Например, для POSIX select:
                  File descriptors associated with regular files shall always select true for ready to read, ready to write, and error conditions.
                  Так что реквестирую у вас работающий пример с чтением/записью обычного файла на диске через asyncio (без использования тред-пулов, конечно).
                    0

                    Можно использовать aio_read().

                      0
                      Т.е. вы хотите сказать, что asyicnio-вский aio_read() для дескриптора файла на диске — блокирующий?
                        0
                        что asyicnio-вский aio_read()
                        Т.е. вы хотите сказать, что asyncio использует aio? Не расскажете об этом поподробнее?
                          0
                          Сорян, имелся ввиду «asyncio-вский add_reader», попутал из-за коммента выше.
                            0
                            Нет, сам 'add_reader' не блокирует. Для select он просто сразу же (почти) вызовет коллбек, даже если реально данных для чтения с диска еще нету (шпиндель раскручивается). И вот уже при попытке чтения файла прочитается 0 байт.
                            import asyncio
                            import selectors
                            import os
                            
                            devnull = os.open("/dev/null", os.O_RDONLY)
                            
                            def reader():
                                data = os.read(devnull, 50)
                                print("Successfully read {} bytes from /dev/null".format(len(data)))
                            
                            loop = asyncio.SelectorEventLoop(selectors.SelectSelector())
                            loop.add_reader(devnull, reader, )
                            loop.run_forever()
                            


                            Скрипт бесконечно печатает «Successfully read 0 bytes from /dev/null».
                            С epoll будет ошибка при вызове 'add_reader'.
                              0
                              /dev/null вообще неудачный пример просто. Вот на kqueue (FreeBSD) ваш скрипт выкидывает эксепшн с /dev/null на любом селекторе, с обычным файлом все будет норм:

                              import asyncio
                              import selectors
                              import os
                              
                              path = "/etc/fstab"
                              f = os.open(path, os.O_RDONLY)
                              
                              def reader():
                                  data = os.read(f, 50)
                                  print("Successfully read {} bytes from {}".format(len(data), path))
                                  print("data: {}".format(data))
                              
                              loop = asyncio.SelectorEventLoop(selectors.KqueueSelector())
                              loop.add_reader(f, reader, )
                              loop.run_forever()
                              

                              Вывод:
                              Successfully read 50 bytes from /etc/fstab
                              data: b'# Device\t\tMountpoint\tFStype\tOptions\t\tDump\tPass#\n/d'
                              Successfully read 50 bytes from /etc/fstab
                              data: b'ev/ada0p2\t\tnone\tswap\tsw\t\t0\t0\n/dev/ada1p2 '
                              Successfully read 50 bytes from /etc/fstab
                              data: b' none swap sw 0 0\nlinpr'
                              Successfully read 50 bytes from /etc/fstab
                              data: b'oc /compat/linux/proc linprocfs rw 0 0\nfdesc /dev/'
                              Successfully read 50 bytes from /etc/fstab
                              data: b'fd fdescfs rw 0 0\ntmpfs /compat/linux/dev/shm'
                              Successfully read 24 bytes from /etc/fstab
                              data: b'\ttmpfs\trw,mode=1777\t0\t0\n'


                              Искренне полагал, что и epoll поддерживает, оказывается таки нет. Но все равно вывод — однозначно сказать да или нет нельзя, зависит от ОС.
                0
                Надо смотреть бенчмарки для каждого подхода и конкретной задачи. Могут быть неожиданности
                  0
                  Код на asyncio более легко писать и дебажить по сравнению с кодам на потоках. Так как разработчик сам определяет места, где происходит переключение контекста. Плюс потоки потребляют больше памяти чем корутины(можно конечно взять зеленые потоки, но это еще более сложно дебажить, чем просто потоки). Если же так важна производительность(при том что не понятно почему на потоках должно получаться значительно быстрее чем на asyncio), то может лучше выбрать не Пайтон? Пайтон все же больше про скорость разработки и понятный код.
                  +1

                  Не со всеми определениями автора я согласен.


                  В ​синхронных операциях задачи выполняются друг за другом.

                  В данном случае не совсем удачное слово "задачи". Задачи могут быть вполне себе асинхронными, например пул обработчиков http запосов, но каждый из них может работать синхронно, тоесть последовательно: обработал параметры запроса, подготовил данные, передал ответ.


                  В асинхронных задачи могут запускаться и завершаться независимо друг от друга.

                  Правильнее говорить: в программе прописаны критерии, когда стартует асинхронная задача, но не время. Асинхронная задача может вообще не вызываться, или вызываться очень часто и исполняться, как в синхронном, так и асинхронном режиме. Простой пример: программа выполняет какую — то долгоиграющую операцию, и способна реагировать на Ctrl+C, нажмет оператор на эти кнопки или нет, нажмет, но потом передумает, и позволит программе продолжаться, или прервет ее, это зависит от оператора, но не от программы.


                  Конкурентность подразумевает, что две задачи выполняются совместно

                  Конкурентность подразумевает, что 2 задачи совместно используют одни и те же данные. Способ исполнения самих задач здесь не столь важен.


                  Параллелизм по сути является формой конкурентности.

                  Конкурентный доступ к данным является следствием появления возможности для паралельного исполнения кода.


                  Но параллелизм зависит от оборудования. Например, если в CPU только одно ядро, то две задачи не могут выполняться параллельно.

                  Да, 95 форточки, со своей кооперативной многозадачностью устарели, но они были, и для своего времени весьма не плохо справлялись.


                  Python поддерживает потоки уже очень давно. Потоки позволяют выполнять операции конкурентно.

                  Ядерные потоки в питоне — это скорее повод для головной боли разрабов, поэтому их не особо и используют. Байткод питона не атомарен, сам питон использует потоки для своих нужд, на сколько я помню, и поэтому у питона есть GIL, чтобы разраб ненароком чего нибудь не уронил, в процессе работы. А так, да, потоки в питоне есть.


                  По поводу async/await и asyncio, это не что иное, как синтаксический сахар над реализацией кооперативной многозадачности на одном потоке исполнения. Если вы много читаете по сети, или с диска, да, вы можете это использовать, и получить профит по количеству обработанных задач, но, например, на вычислительных задачах выигрыша не будет.

                    0
                    Гринлеты как-то пропустить умудрились в обзорной статьею.
                      0
                      Я так понял, автор предпочитает asyncio гринлетам, и ещё использует только встроенные решения.
                      0
                      Для CPU зависимых задач интерпретатор делает проверку каждые N тиков и переключает потоки.


                      Насколько мне известно, с версии Python 3.2 используются не тики, а миллисекунды.
                        0
                        жаль, что автор статьи не упомянул Twisted
                          0
                          В статье же не рассматривались фреймворки и сторонние библиотеки.

                        Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

                        Самое читаемое