Как стать автором
Поиск
Написать публикацию
Обновить
63.55
Skillfactory
Учим работать в IT на курсах и в магистратурах

Параллельные вычисления, конкурентность и асинхронное программирование в Python: обзор для начинающих

Уровень сложностиПростой
Время на прочтение12 мин
Количество просмотров9.5K

Однопоточные приложения на Python ограничены в производительности: они выполняют задачи последовательно и не используют преимущества многоядерных процессоров. Кроме того, такие программы не справляются с обработкой множества операций одновременно, особенно если речь идет о задачах, связанных с вводом-выводом, например сетевыми запросами или чтением файлов.

Производительность можно значительно улучшить, внедрив в код параллельные вычисления, конкурентность или асинхронное программирование. Для этого Python предлагает такие инструменты, как multiprocessing, threading и asyncio.

Multiprocessing, threading и asyncio: в чем разница?

Для начала вспомним, что такое потоки и процессы.

Процессы — это отдельные программы, каждая со своей памятью, которые могут работать параллельно на разных ядрах процессора.

Потоки — это части одного процесса, которые одновременно выполняют задачи и используют для этого общую память.

  • Модуль threading запускает потоки для конкурентной работы внутри одного процесса. 

  • Библиотека asyncio управляет задачами асинхронно, без дополнительных потоков или процессов. 

  • Модуль multiprocessing создает процессы для параллельного выполнения на нескольких ядрах.

Разберем, как эти инструменты работают и где используются.

Многопоточность и конкурентное выполнение задач

Модуль threading позволяет запускать несколько потоков внутри одного процесса для конкурентного выполнения задач. 

Конкурентность — это способность программы управлять несколькими задачами так, чтобы они выполнялись одновременно с точки зрения пользователя, даже если физически они чередуются. Например, пока одна задача ждет ответа от сервера, другая может обработать данные.

Потоки в threading работают схожим образом — они переключаются между задачами и разделяют общую память и ресурсы процесса.

Работа с threading

Основной инструмент модуля — класс Thread, который позволяет запускать функции в отдельных потоках. 

Базовый синтаксис:

import threading
 
 def task(name):
     print(f"Задача {name} выполняется в потоке {threading.current_thread().name}")
 
 t = threading.Thread(target=task, args=("A",))
 t.start()
 t.join()

Здесь метод start() запускает поток, а join() заставляет основной код ждать его завершения.

GIL и его влияние на многопоточность

В стандартной реализации Python (CPython) есть глобальная блокировка интерпретатора (GIL), которая ограничивает многопоточность. GIL позволяет только одному потоку выполняться в интерпретаторе в любой момент времени, даже на многоядерных процессорах. 

Это означает, что потоки в threading не могут параллельно использовать несколько ядер для вычислений. Поэтому они неэффективны для задач CPU-bound, где скорость зависит от процессора (например, сложных математических расчетов или обработки больших массивов данных).

А вот в задачах I/O-bound, где программа много времени ждет внешних операций, таких как сетевые запросы или чтение файлов, GIL не мешает. Потоки переключаются между собой, пока одна операция ждет завершения.

Синхронизация потоков: Lock, Semaphore, Event

Поскольку потоки делят общую память, могут возникать конфликты, например если два потока одновременно изменяют одну переменную. Для их предотвращения используются инструменты синхронизации:

  • Lock блокирует доступ к ресурсу, разрешая только одному потоку работать с ним в данный момент. Это предотвращает «гонку данных», когда результат зависит от порядка выполнения потоков.

Пример: 

import threading
 import time
 
 shared_resource = 0
 lock = threading.Lock()
 
 def increment():
     global shared_resource
     thread_name = threading.current_thread().name
     print(f"Поток {thread_name} пытается увеличить значение")
     with lock:
         current_value = shared_resource
 # Имитация небольшой задержки
         time.sleep(0.1)
         shared_resource = current_value + 1
         print(f"Поток {thread_name} увеличил значение до {shared_resource}")
 
 threads = [threading.Thread(target=increment) for  in range(10)]
 for t in threads:
     t.start()
 for t in threads:
     t.join()
 print(f"Итоговое значение: {sharedresource}")

Здесь Lock гарантирует, что shared_resource увеличивается ровно 10 раз, а не теряет значения из-за одновременных изменений. 

Вывод покажет, как разные потоки по очереди получают доступ к ресурсу, ожидая, пока предыдущий освободит блокировку. Без Lock результат мог бы быть меньше 10 из-за наложения операций. 

  • Semaphore ограничивает количество потоков, которые могут одновременно обращаться к ресурсу. Это полезно, когда доступ нужно дать не одному, а нескольким потокам, но не всем сразу.

Пример: 

import threading
 import time
 
 # Ставим ограничение на максимум 2 одновременных потока
 semaphore = threading.Semaphore(2)
 
 def task(name):
     with semaphore:
         print(f"Поток {name} начал работу")
         time.sleep(1)
         print(f"Поток {name} завершил работу")
 
 threads = [threading.Thread(target=task, args=(i,)) for i in range(5)]
 for t in threads:
     t.start()
 for t in threads:
     t.join()

В этом случае только два потока работают одновременно, остальные ждут, пока освободится «место». Вывод покажет, что задачи выполняются партиями по две. 

  • Event позволяет одному потоку ждать сигнала от другого, чтобы продолжить работу. Это удобно для координации действий между потоками.

Пример: 

import threading
 import time
 
 event = threading.Event()
 
 def waiter():
     print("Ожидаю сигнала...")
     event.wait()
     print("Сигнал получен, продолжаю работу")
 
 def signaler():
     time.sleep(2)
     print("Отправляю сигнал")
     event.set()
 
 t1 = threading.Thread(target=waiter)
 t2 = threading.Thread(target=signaler)
 t1.start()
 t2.start()
 t1.join()
 t2.join()

Здесь поток waiter ждет, пока signaler не вызовет event.set(), чтобы продолжить выполнение.

Примеры многопоточного программирования

Модуль threading особенно полезен в ситуациях, где задачи тратят много времени на ожидание внешних ресурсов (задачи типа I/O-bound). То есть скорость выполнения задач ограничена не вычислениями процессора, а операциями ввода-вывода (input/output), которые часто связаны с длительными задержками. 

Потоки позволяют переключаться между ними: пока одна задача ждет завершения операции, другая может начать работу. 

Рассмотрим два примера: загрузку веб-страниц и фоновую обработку задач.

Конкурентная загрузка URL

Загрузка данных из интернета — классический пример задачи ввода-вывода, где последовательное выполнение может быть неэффективным.

import threading
 import urllib.request
 
 urls = ["https://python.org", "https://example.com"]
 def fetch_url(url):
     response = urllib.request.urlopen(url)
     print(f"Загружен {url}, длина: {len(response.read())}")
 
 threads = [threading.Thread(target=fetch_url, args=(url,)) for url in urls]
 for t in threads:
     t.start()
 for t in threads:
     t.join()

Этот код создает отдельный поток для каждого URL, запускает их с помощью start() и ждет завершения через join(). В результате страницы загружаются конкурентно, что быстрее, чем последовательное выполнение, особенно если сетевые задержки значительны.

Обработка задач в фоновом режиме 

Потоки можно использовать для выполнения задач, таких как логирование или мониторинг, пока основная программа продолжает работу. 

Пример с фоновым логированием

import threading
 import time
 
 def background_logger(logs):
     while True:
         if logs:
             print(f"Лог: {logs.pop(0)}")
         time.sleep(1)
 
 def main_task(logs):
     for i in range(5):
         logs.append(f"Событие {i} в основном потоке")
         print(f"Основной поток работает: шаг {i}")
         time.sleep(0.5)
 
 logs = []
 logger_thread = threading.Thread(target=background_logger, args=(logs,), daemon=True)
 logger_thread.start()
 main_task(logs)

В этом примере поток background_logger работает в фоновом режиме, периодически выводя сообщения из списка logs, пока основной поток выполняет свою задачу. Атрибут daemon=True означает, что фоновый поток завершится вместе с основной программой.

Асинхронное программирование с asyncio

Асинхронное программирование позволяет выполнять задачи без блокировки основного потока, что особенно полезно для I/O-bound-задач. В отличие от threading, где для конкурентности используются потоки, asyncio управляет задачами в одном потоке через цикл событий. 

Это снижает накладные расходы на создание потоков и подходит для приложений с большим количеством операций ввода-вывода, например веб-скрейперов или серверов.

В чем отличие синхронного кода от асинхронного?

Асинхронный код отличается от синхронного тем, что он не блокирует выполнение программы во время ожидания операций, а переключается между задачами в одном потоке через цикл событий.

  • Синхронный код: последовательное выполнение с блокировкой, сравнение с очередью. 

  • Асинхронный код: запуск задач без ожидания, переключение через цикл событий.

Основы asyncio: async и await

В основе библиотеки asyncio сопрограммы (корутины) с ключевыми словами async и await, введенные в Python 3.5. Они позволяют писать асинхронный код, похожий на синхронный по читаемости.

  • async def определяет корутину, которая может приостанавливаться, не блокируя выполнение программы.

  • await указывает, где корутина должна ждать завершения другой асинхронной операции, освобождая управление для других задач.

Пример:

import asyncio
 
 async def say_hello():
     print("Начинаем...")
     # Ждем 1 секунду, не блокируя
     await asyncio.sleep(1)
     print("Привет!")
 
 async def main():
     # Запускаем корутину
     await say_hello()
 
 # Запускаем программу
 asyncio.run(main())

Здесь asyncio.sleep(1) имитирует асинхронную операцию (например, запрос к серверу), а await позволяет программе продолжать работу, пока ожидание не завершится. Функция asyncio.run() — стандартный способ запускать асинхронный код.

Event Loop и управление задачами

Асинхронность в asyncio работает благодаря циклу событий (Event Loop) — механизму, который управляет выполнением корутин, переключаясь между ними, когда они ждут операций. 

Пока одна корутина ждет ответа от сети, другая может обработать свои данные.

Для планирования нескольких задач используется asyncio.create_task():

async def task(name):
     print(f"Задача {name} началась")
     await asyncio.sleep(1)
     print(f"Задача {name} завершилась")
 
 async def main():
     task1 = asyncio.create_task(task("A"))
     task2 = asyncio.create_task(task("B"))
     await task1  # Ждем первую задачу
     await task2  # Ждем вторую
 
 asyncio.run(main())

В этом примере две задачи запускаются конкурентно, и цикл событий переключается между ними. Метод create_task() превращает корутину в задачу, которую asyncio выполняет независимо.

Асинхронные HTTP-запросы 

Asyncio работает с другими библиотеками, например aiohttp для быстрой, конкурентной загрузки страниц. 

В отличие от синхронной загрузки страниц с помощью urllib.request, где каждая страница загружается последовательно, aiohttp отправляет запросы к нескольким URL одновременно, не блокируя основной поток.

Пример:

import asyncio
 import aiohttp
 
 async def fetch_url(url):
     async with aiohttp.ClientSession() as session:
         async with session.get(url) as response:
             content = await response.text()
             print(f"Загружен {url}, длина: {len(content)}")
 
 async def main():
     urls = ["https://python.org", "https://example.com"]
     tasks = [asyncio.create_task(fetch_url(url)) for url in urls]
     await asyncio.wait(tasks)
 
 asyncio.run(main())

В итоге если одна страница отвечает медленно, то другая ее не ждет. Это значительно сокращает общее время по сравнению с синхронным кодом, где задержки суммируются.

Работа с сокетами

Asyncio позволяет создавать асинхронные серверы и клиенты для обработки соединений. 

Рассмотрим пример, где сервер принимает сообщения от клиентов и отправляет их обратно, преобразовав текст в заглавные буквы: если клиент отправляет «привет», сервер отвечает «ПРИВЕТ».

import asyncio
 
 async def handle_client(reader, writer):
     data = await reader.read(100)
     writer.write(data.upper())
     await writer.drain()
     writer.close()
 
 async def main():
     server = await asyncio.start_server(handle_client, "127.0.0.1", 8888)
     async with server:
         await server.serve_forever()
 
 asyncio.run(main())

Этот сервер принимает сообщения и отвечает заглавными буквами, обрабатывая множество клиентов одновременно.

Многопроцессность и параллельные вычисления

Параллельные вычисления — это подход, при котором несколько вычислительных задач выполняются одновременно, распределяясь по доступным ресурсам, таким как ядра процессора. Этот метод позволяет ускорить выполнение программ. 

Одна из форм параллельных вычислений — многопроцессность, при которой задачи распределяются между несколькими независимыми процессами. В Python она реализована с помощью модуля multiprocessing.

Работа с задачами CPU-bound

Многопроцессность реализуется с собственным интерпретатором Python в обход GIL. Это позволяет параллельно задействовать все ядра процессора и обеспечивать настоящий параллелизм, а не только конкурентность, как в threading или asyncio. 

Поэтому модуль multiprocessing — лучший выбор для задач CPU-bound, в отличие от I/O-bound сценариев, где лучше работают потоки или асинхронность.

Как работает модуль multiprocessing

Модуль multiprocessing создает отдельные процессы, каждый из которых работает со своим собственным интерпретатором Python. 

Пример:

from multiprocessing import Process
 
 def square(number):
     print(f"Квадрат числа {number}: {number * number}")
 
 if name == "__main__":
     processes = []
     for i in range(5):
         p = Process(target=square, args=(i,))
         processes.append(p)
         p.start()
     for p in processes:
         p.join()

В этом примере создается пять процессов, каждый из которых вычисляет квадрат числа. Метод start() запускает процесс, а join() ожидает его завершения.

В multiprocessing есть инструменты:

  • Process класс для создания и управления отдельным процессом. 

Нужно указать целевую функцию (target) и аргументы (args), после чего процесс запускается с помощью метода start().

Пример:

from multiprocessing import Process
 
 def square(number):
     result = number ** 2
     print(f"Квадрат числа {number}: {result}")
 
 if name == "__main__":
     processes = []
     for i in range(5):
         p = Process(target=square, args=(i,))
         processes.append(p)
         p.start()
     for p in processes:
         p.join()

Здесь создаем пять процессов, каждый из которых вычисляет квадрат числа. Проверка if name == "__main__" обязательна, чтобы избежать рекурсивного импорта при запуске процессов. 

Это особенно критично на Windows, где процессы создаются через spawn и импортируют модуль заново, но также важно для переносимости кода на другие платформы, такие как Linux или macOS, где поведение зависит от метода запуска.

  • Pool класс для создания пула процессов, который автоматически распределяет задачи между несколькими процессами. Метод map() позволяет применить функцию к списку данных и разделяет работу между процессами в пуле.

Пример:

from multiprocessing import Pool
 
 def cube(number):
     return number ** 3
 
 if name == "__main__":
 # Определяем пул из 3 процессов
     with Pool(3) as pool:  
         results = pool.map(cube, [1, 2, 3, 4, 5])
     print(results)

В этом случае Pool распределяет вычисление кубов чисел между тремя процессами. Каждый процесс берет часть списка [1, 2, 3, 4, 5], выполняет задачу и возвращает результат, который собирается в итоговый список.

Сравнение подходов

Давайте разберем, чем отличаются друг от друга все три подхода: конкурентность, асинхронность и параллелизм.

multiprocessing

threading

asyncio

Использует процессы, обходит GIL, подходит для CPU-bound-задач (вычисления, обработка данных). 


Требует больше ресурсов, чем потоки, из-за изоляции процессов.

Работает с потоками в одном процессе, ограничен GIL, эффективен для I/O-bound-задач (сеть, файлы), где ожидание преобладает над вычислениями. 


Легче и быстрее в создании, чем процессы.

Асинхронность в одном потоке через цикл событий, идеален для I/O-bound-задач с большим количеством операций (например, веб-запросы). 


Не подходит для CPU-bound-задач, но минимизирует накладные расходы.

Оптимизация производительности кода

Чтобы оптимизировать код, нужно сперва найти узкие места производительности с помощью профилирования. 

Профилирование — это анализ кода для выявления узких мест, то есть участков, где программа тратит больше всего времени или ресурсов.

В Python есть несколько инструментов для этого:

  • cProfile встроенный модуль, который измеряет время выполнения каждой функции и количество их вызовов.

Пример:

import cProfile
 def slow_function():
     return sum(i * i for i in range(1000000))
 cProfile.run("slow_function()")

Вывод покажет таблицу с колонками: ncalls (число вызовов), tottime (общее время в функции), cumtime (время с учетом подфункций). Например, если цикл занимает 0,5 секунды, вы увидите, что именно он — узкое место.

  • time простой инструмент для замера общего времени выполнения, подойдет для сравнения подходов (например, последовательного и параллельного).

Пример: 

import time
 from multiprocessing import Pool
 
 def square(n):
     return n * n
 
 start = time.time()
 with Pool(4) as pool:
     pool.map(square, range(1000))
 print(f"Время: {time.time() - start:.2f} сек")

Это позволяет быстро проверить, ускоряет ли multiprocessing код (например, 0,1 сек вместо 0,4 сек в однопоточном варианте).

  • Другие инструменты . Модуль line_profiler показывает время выполнения каждой строки кода (требует установки через pip install line_profiler), а Py-Spy визуальный профайлер, который строит графики работы программы в реальном времени.

После профилирования используйте результаты для оптимизации:

  1. Определите тип задачи. Если узкое место — вычисления (CPU-bound), переходите на multiprocessing. Для ожидания ввода-вывода (I/O-bound) — asyncio или threading.

  2. Параллелизуйте тяжелые операции. Если cProfile показывает, что функция занимает много времени, разбейте данные и используйте Pool для распределения по ядрам.

  3. Минимизируйте накладные расходы. Не создавайте процессы или потоки для мелких задач — запуск Process дороже, чем выгода от параллелизма на коротких операциях.

  4. Тестируйте изменения. Замеряйте время с time до и после оптимизации, чтобы убедиться, что многопроцессность или асинхронность действительно ускоряют код.

Обучиться с нуля на программиста и растить навыки до техлида или архитектора ПО можно на магистратуре МИФИ и Skillfactory по направлению «Программная инженерия». Оттачивайте навыки на реальных задачах от бизнеса и набирайтесь опыта.

Теги:
Хабы:
Всего голосов 7: ↑6 и ↓1+8
Комментарии9

Публикации

Информация

Сайт
www.skillfactory.ru
Дата регистрации
Дата основания
Численность
501–1 000 человек
Местоположение
Россия
Представитель
Skillfactory School