Примитивы синхронизации в Python Asyncio: Исчерпывающее руководство
При написании приложений с несколькими потоками или процессами нужно помнить о возможности состояния гонки при использовании неатомарных операций. Даже простая задача для увеличения целого числа на единицу в конкурентной программе может вызвать ошибки, с трудом поддающиеся воспроизведению. Но при использовании asyncio мы всегда работаем в одном потоке (если только явно не задействовали средства многопоточной или многопроцессной обработки), значит, можно не беспокоиться о гонках, правда? На самом деле все не так просто!
Некоторые ошибки, встречающиеся в многопоточных и многопроцессных приложениях, исключены в силу однопоточной природы asyncio, но это не совсем так. Вам редко потребуется прибегать к синхронизации при работе с asyncio, однако остаются ситуации, когда с этим нужно что‑то делать. Примитивы синхронизации asyncio могут помочь предотвратить ошибки, свойственные только модели однопоточной конкурентности.
В этой статье вы узнаете, зачем нужны примитивы синхронизации asyncio, а также о лучших практиках использования нескольких примитивов синхронизации. В конце статьи рассмотрим практический пример использования примитивов синхронизации в действии.
readme
Если у вас есть замечания или вы нашли ошибку, пожалуйста, используйте Ctrl+Enter и я исправлю. Спасибо!
Зачем нужны примитивы синхронизации в asyncio?
Любой, кто использовал многопоточность в Python, знает, что несколько потоков используют один и тот же блок памяти. Поэтому, когда несколько потоков одновременно выполняют неатомарные операции над одной и той же областью, возникает проблема потокобезопасности. Поскольку asyncio работает в один поток, не возникает ли у него аналогичных проблем с потокобезопасностью? Ответ: нет.
Параллельные задачи в asyncio выполняются асинхронно, то есть возможно попеременное выполнение нескольких задач во времени. Ошибка параллелизма возникает, когда одна задача обращается к определенной области памяти и ожидает возврата операции ввода-вывода, а другая задача одновременно обращается к этой памяти.
Чтобы избежать подобных ошибок, в Python asyncio введен примитив синхронизации, аналогичный многопоточности. Это asyncio.Lock, asyncio.Semaphore, asyncio.Event и asyncio.Condition. Кроме того, чтобы избежать одновременного обращения к ресурсу слишком большого числа задач, примитивы синхронизации asyncio позволяют защитить ресурс, ограничив число одновременно обращающихся к нему задач.
Примитивы синхронизации в asyncio
asyncio.Lock
Рассмотрим ситуацию. Предположим, у нас есть параллельная задача, которой нужна копия данных сайта. Сначала она проверит, есть ли она в кэше, если есть, то получит ее из кэша, а если нет, то прочитает ее с сайта. Поскольку чтение данных сайта занимает некоторое время для возврата и обновления кэша, при одновременном выполнении нескольких параллельных задач все они предполагают, что этих данных в кэше нет, и запускают удаленные запросы одновременно, как показано в следующем коде:
import asyncio
import aiohttp
cache = dict()
async def request_remote():
print("Will request the website to get status.")
async with aiohttp.ClientSession() as session:
response = await session.get("https://www.example.com")
return response.status
async def get_value(key: str):
if key not in cache:
print(f"The value of key {key} is not in cache.")
value = await request_remote()
cache[key] = value
else:
print(f"The value of key {key} is already in cache.")
value = cache[key]
print(f"The value of {key} is {value}")
return value
async def main():
task_one = asyncio.create_task(get_value("status"))
task_two = asyncio.create_task(get_value("status"))
await asyncio.gather(task_one, task_two)
if __name__ == "__main__":
asyncio.run(main())
Это не соответствует нашему первоначальному замыслу, поэтому на помощь приходит asyncio.Lock. Мы можем проверить наличие данных в кэше, когда параллельные задачи должны получить блокировку первыми, а другие задачи, не получившие блокировку, будут ждать.
Пока задача, получившая блокировку, не завершит обновление кэша и не снимет блокировку, остальные задачи могут продолжать выполнение.
Вся блок-схема показана ниже:
Давайте поправим наш код:
import asyncio
from asyncio import Lock
import aiohttp
cache = dict()
lock = Lock()
async def request_remote():
print("Will request the website to get status.")
async with aiohttp.ClientSession() as session:
response = await session.get("https://www.example.com")
return response.status
async def get_value(key: str):
async with lock:
if key not in cache:
print(f"The value of key {key} is not in cache.")
value = await request_remote()
cache[key] = value
else:
print(f"The value of key {key} is already in cache.")
value = cache[key]
print(f"The value of {key} is {value}")
return value
async def main():
task_one = asyncio.create_task(get_value("status"))
task_two = asyncio.create_task(get_value("status"))
await asyncio.gather(task_one, task_two)
if __name__ == "__main__":
asyncio.run(main())
asyncio.Semaphore
Иногда нам необходимо получить доступ к ресурсу с ограниченным количеством одновременных запросов. Например, конкретная база данных допускает одновременное открытие только пяти соединений. Или, в зависимости от типа подписки, веб-интерфейс поддерживает только определенное количество одновременных запросов. В этом случае необходимо использовать asyncio.Semaphore. asyncio.Semaphore использует внутренний счетчик, который уменьшается на единицу при каждом получении блокировки Semaphore, пока не достигнет нуля.
Когда счетчик asyncio.Semaphore равен нулю, другие задачи, которым нужна блокировка, будут ждать. При вызове метода release после выполнения других задач счетчик будет увеличен на единицу. Ожидающие задачи могут продолжить выполнение.
Пример кода выглядит следующим образом:
import asyncio
from asyncio import Semaphore
from aiohttp import ClientSession
async def get_url(url: str, session: ClientSession, semaphore: Semaphore):
print('Waiting to acquire semaphore...')
async with semaphore:
print('Semaphore acquired, requesting...')
response = await session.get(url)
print('Finishing requesting')
return response.status
async def main():
# Хотя мы запускаем 1000 задач, одновременно будут выполняться только 10 задач.
semaphore: Semaphore = Semaphore(10)
async with ClientSession() as session:
tasks = [asyncio.create_task(get_url("https://www.example.com", session, semaphore))
for _ in range(1000)]
await asyncio.gather(*tasks)
if __name__ == "__main__":
asyncio.run(main())
Таким образом, мы можем ограничить количество соединений, к которым можно получить одновременный доступ.
asyncio.BoundedSemaphore
Одна из особенностей семафоров заключается в том, что число вызовов метода release может превышать число вызовов acquire. Если мы всегда используем семафоры в сочетании с блоком async with, то такое невозможно, потому что с каждым acquire автоматически
связывается release. Но если нам требуется более точный контроль над механизмом захвата и освобождения, то возможны проблемы.
Что произойдет, если мы случайно вызовем метод release()
несколько раз?
import asyncio
from asyncio import Semaphore
async def acquire(semaphore: Semaphore):
print("acquire: Ожидание возможности захвата")
async with semaphore:
print("acquire: Захвачен...")
await asyncio.sleep(5)
print("acquire: Освобождается...")
async def release(semaphore: Semaphore):
print("release: Одиночное освобождение...")
semaphore.release()
print("release: Одиночное освобождение - готово!")
async def main():
semaphore = Semaphore(2)
print("Два захвата, три освобождения...")
await asyncio.gather(asyncio.create_task(acquire(semaphore)),
asyncio.create_task(acquire(semaphore)),
asyncio.create_task(release(semaphore)))
print("Три захвата...")
await asyncio.gather(asyncio.create_task(acquire(semaphore)),
asyncio.create_task(acquire(semaphore)),
asyncio.create_task(acquire(semaphore)))
if __name__ == "__main__":
asyncio.run(main())
Здесь мы создаем семафор с пределом 2. Затем дважды вызываем сопрограмму acquire и один раз release, т.е. всего семафор будет освобождён трижды. Первое обращение к gather завершается, по видимости, нормально:
Однако при втором обращении, когда мы захватываем семафор три раза, возникают проблемы – все три захвата происходят сразу! Мы непреднамеренно превысили предел семафора:
Как видно из кода, мы ограничены одновременным выполнением двух задач, но поскольку мы вызвали release более одного раза, то в следующий раз мы сможем выполнить три задачи одновременно. Для решения этой проблемы мы можем использовать asyncio.BoundedSemaphore. Bounded Semaphore - это версия Semaphore, которая вызывает ошибку ValueError в функции release(), если внутренний счетчик увеличивается выше начального значения.
Как мы знаем из исходного кода, при вызове release выдается ошибка ValueError, если значение счетчика больше значения, установленного при инициализации:
import asyncio
from asyncio import BoundedSemaphore
async def main():
semaphore = BoundedSemaphore(2)
await semaphore.acquire()
semaphore.release()
semaphore.release()
if __name__ == "__main__":
asyncio.run(main())
Здесь второй вызов release возбудит исключение ValueError, означающее, что мы освободили семафор слишком много раз. Аналогичный результат будет иметь место, если в предыдущем примере использовать BoundedSemaphore вместо Semaphore. Если вы вызываете acquire и release вручную, так что возникает опасность динамически превысить
предел семафора, то лучше работать с BoundedSemaphore, потому что возникшее исключение предупредит об ошибке.
asyncio.Event
Event поддерживает внутреннюю булеву переменную в качестве флага. asyncio.Event имеет три общих метода: wait, set и clear.
Когда задача добегает до event.wait(), она находится в состоянии ожидания. В этот момент можно вызвать event.set(), чтобы установить внутренний маркер в True, и все ожидающие задачи могут продолжить выполнение.
Когда задача завершится, необходимо вызвать метод event.clear(), чтобы сбросить значение маркера в False, вернуть событие в исходное состояние, и можно продолжать использовать событие в следующий раз.
В конце статьи вместо примера кода я покажу, как использовать Event для реализации шины событий.
asyncio.Condition
asyncio.Event хороши, когда нужно просто уведомить о том, что произошло нечто, но бывают ситуации посложнее. Допустим, что по событию требуется получить доступ к разделяемому ресурсу, т.е. захватить блокировку. Или что перед продолжением работы нужно дождаться более сложного сочетания условий, чем простое событие. Или что нужно разбудить не все задачи, а только определенное число. Во всех этих случаях могут выручить asyncio.Condition.
asyncio.Condition похож на asyncio.Lock и asyncio.Event вместе взятые. Сначала мы используем async with
, чтобы обеспечить получение блокировки условия, а затем вызываем condition.wait()
, чтобы освободить блокировку условия и заставить задачу временно подождать.
Когда condition.wait() проходит, мы возвращаем блокировку условия, чтобы гарантировать одновременное выполнение только одной задачи. Пока задача временно освобождает блокировку и переходит в состояние ожидания по методу condition.wait(), другая задача может либо асинхронизироваться с блокировкой условия и уведомить все ожидающие задачи о необходимости продолжить выполнение по методу condition.notify_all().
Блок-схема показана ниже:
Мы можем продемонстрировать действие asyncio.Condition на примере кода:
import asyncio
from asyncio import Condition
async def do_work(condition: Condition):
print("do_work: захватываю блокировку условия...")
async with condition:
print("do_work: Блокировка захвачена, освобождаю и жду выполнения условия...")
await condition.wait()
print("do_work: Условие выполнено, вновь захватываю блокировку и начинаю работать...")
await asyncio.sleep(1)
print("do_work: Работа закончена, блокировка освобождена.")
async def fire_event(condition: Condition):
await asyncio.sleep(5)
print("fire_event: захватываю блокировку условия...")
async with condition:
print("fire_event: Блокировка захвачена, уведомляю всех исполнителей")
condition.notify_all()
print("fire_event: Исполнители уведомлены, освобождаю блокировку.")
async def main():
condition = Condition()
asyncio.create_task(fire_event(condition))
await asyncio.gather(do_work(condition), do_work(condition))
if __name__ == "__main__":
asyncio.run(main())
Иногда нам требуется, чтобы asyncio.Condition ждал наступления определенного события, прежде чем перейти к следующему шагу. Мы можем вызвать метод condition.wait_for() и передать в качестве аргумента метод. При каждом вызове condition.notify_all метод condition.wait_for проверяет результат выполнения метода-параметра и завершает ожидание, если он равен True, или продолжает ожидание, если он равен False.
condition.wait_for мы можем продемонстрировать на примере. В следующем коде мы смоделируем подключение к базе данных. Перед выполнением SQL-оператора код проверит, инициализировано ли соединение с базой данных, и выполнит запрос, если инициализация соединения завершена, или подождет, пока соединение не завершит инициализацию:
import asyncio
from asyncio import Condition
from enum import Enum
class ConnectionState(Enum):
WAIT_INIT = 0
INITIALING = 1
INITIALIZED = 2
class Connection:
def __init__(self):
self._state = ConnectionState.WAIT_INIT
self._condition = Condition()
async def initialize(self):
print("initialize: Preparing initialize the connection.")
await self._change_state(ConnectionState.INITIALING)
await asyncio.sleep(5)
print("initialize: Connection initialized")
await self._change_state(ConnectionState.INITIALIZED)
async def execute(self, query: str):
async with self._condition:
print("execute: Waiting for connection initialized")
await self._condition.wait_for(self._is_initialized)
print(f"execute: Connection initialized, executing query: {query}")
await asyncio.sleep(5)
print("execute: Execute finished.")
async def _change_state(self, state: ConnectionState):
print(f"_change_state: Will change state from {self._state} to {state}")
self._state = state
print("_change_state: Change the state and notify all..")
async with self._condition:
self._condition.notify_all()
def _is_initialized(self):
if self._state is not ConnectionState.INITIALIZED:
print("_is_initialized: The connection is not initialized.")
return False
print("_is_initialized: The connection is ready.")
return True
async def main():
connection = Connection()
task_one = asyncio.create_task(connection.execute("SELECT * FROM table"))
task_two = asyncio.create_task(connection.execute("SELECT * FROM other_table"))
asyncio.create_task(connection.initialize())
await asyncio.gather(task_one, task_two)
if __name__ == "__main__":
asyncio.run(main())
asyncio.Barrier(добавлен в Python 3.11)
Объект Barrier - представляет собой объект. Непотокобезопасен. Это простой примитив синхронизации, позволяющий блокировать до тех пор, пока на нем не будет ожидать определенное количество задач. Задачи могут ожидать метод wait() и будут блокироваться до тех пор, пока заданное количество задач не закончит ожидание wait(). В этот момент все ожидающие задачи одновременно разблокируются.
Можно использовать async with как альтернативу ожиданию при вызове Barrier.wait(). Класс asyncio.Barrier можно использовать повторно любое количество раз.
async def example_barrier():
# barrier with 3 parties
b = asyncio.Barrier(3)
# create 2 new waiting tasks
asyncio.create_task(b.wait())
asyncio.create_task(b.wait())
await asyncio.sleep(0)
print(b)
# The third .wait() call passes the barrier
await b.wait()
print(b)
print("barrier passed")
await asyncio.sleep(0)
print(b)
asyncio.run(example_barrier())
Да, это пример из документации, в статье этого не было. По этому классу пока не нашёл более подробных примеров. Буду добавлять информацию по мере возможности.
Некоторые советы по использованию примитивов синхронизации asyncio
Не забывайте использовать таймаут или отмену при необходимости
При использовании примитивов синхронизации мы, как правило, ожидаем завершения определенной операции ввода-вывода. Однако из-за флуктуаций в сети или по другим неизвестным причинам операция ввода-вывода в задаче может выполняться дольше, чем в других.В этом случае необходимо установить таймаут для операции, чтобы при слишком большом времени выполнения освободить блокировку и дать возможность другим задачам выполнить ее вовремя.
В другом случае мы можем перебирать задачи в цикле. Это может заставить некоторые задачи ждать в фоновом режиме и помешать правильному завершению программы. В этом случае не забудьте воспользоваться командой cancel, чтобы прервать циклическое выполнение задачи.
Избегайте использования примитивов синхронизации или блокировки только наименьшего количества ресурсов
Все мы знаем, что преимущество asyncio заключается в том, что задача может переключаться на выполнение другой задачи в ожидании возврата IO. Но задача asyncio часто содержит как операции, связанные с IO, так и операции, связанные с процессором. Если заблокировать на задаче слишком много кода, то она не сможет вовремя переключиться на другую задачу, что скажется на производительности. Поэтому, если в этом нет необходимости, старайтесь не использовать примитивы синхронизации или блокировать только наименьшее количество ресурсов.
Чтобы избежать ситуаций с конкурентной блокировкой
В asyncio нет RLock, поэтому не используйте блокировки в рекурсивном коде. Как и в случае с многопоточностью, в asyncio также существует вероятность возникновения тупиковых ситуаций, поэтому старайтесь избегать одновременного использования нескольких блокировок.
Пример: Шина событий на базе Asyncio
Надеюсь к этому моменту вы смогли разобраться и понять как работают примитивы синхронизации в Asyncio. Давайте рассмотрим более практичный пример, рассмотрев реализацию шины событий. Как обычно, первым шагом является проектирование EventBus API.
import asyncio
from asyncio import Event
import inspect
from typing import Callable
class EventBus:
def __init__(self):
self._event_dict = dict()
async def on(self, event_name: str, fn: Callable):
PASS
def trigger(self, event_name: str, *args, **kwargs):
PASS
def _get_event(self, event_name: str):
PASS
Давайте реализуем эти методы. Поскольку EventBus взаимодействует с помощью строк, а внутри я намерен использовать asyncio.Event для реализации событий, соответствующих каждой строке, начнем с реализации метода _get_event:
def _get_event(self, event_name: str):
if event_name in self._event_dict:
print("event already inited...")
event = self._event_dict.get(event_name)
else:
print(f"need to init a new event for {event_name}")
event = Event()
self._event_dict[event_name] = event
return event
Метод on привязывает функцию обратного вызова к определенному событию:
async def on(self, event_name: str, fn: Callable):
event = self._get_event(event_name)
while True:
await event.wait()
print("event fired")
result = fn(*event.args, **event.kwargs)
if inspect.isawaitable(result):
await result
# Since the callback function is likely a synchronous method,
# we must perform an await here to allow other tasks to execute.
await asyncio.sleep(0.1)
event.clear()
Метод trigger может вручную вызвать событие и передать соответствующие данные:
def trigger(self, event_name: str, *args, **kwargs):
event = self._get_event(event_name)
event.args = args
event.kwargs = kwargs
event.set()
Наконец, напишем метод main, чтобы проверить действие EventBus:
def a_sync_callback(data):
print(f"A sync callback with data {data} is triggered")
async def a_async_callback(data):
await asyncio.sleep(1)
print(f"A async callback with data {data} is triggered")
async def main():
event_bus = EventBus()
task_one = asyncio.create_task(event_bus.on("some_event", a_async_callback))
task_two = asyncio.create_task(event_bus.on("some_event", a_sync_callback))
event_bus.trigger("some_event", {id: 1})
await asyncio.wait([task_one, task_two], timeout=20.0)
asyncio.run(main())
В конце метода main не забудьте использовать таймаут, чтобы программа не выполнялась постоянно, как я уже предупреждал.
Полный листинг кода программы
import asyncio
from asyncio import Event
import inspect
from typing import Callable
class EventBus:
def __init__(self):
self._event_dict = dict()
def _get_event(self, event_name: str):
if event_name in self._event_dict:
print("event already inited...")
event = self._event_dict.get(event_name)
else:
print(f"need to init a new event for {event_name}")
event = Event()
self._event_dict[event_name] = event
return event
async def on(self, event_name: str, fn: Callable):
event = self._get_event(event_name)
while True:
await event.wait()
print("event fired")
result = fn(*event.args, **event.kwargs)
if inspect.isawaitable(result):
await result
# Поскольку функция обратного вызова, скорее всего, является синхронным методом,
# мы должны выполнить здесь await, чтобы дать возможность другим задачам выполниться.
await asyncio.sleep(0.1)
event.clear()
def trigger(self, event_name: str, *args, **kwargs):
event = self._get_event(event_name)
event.args = args
event.kwargs = kwargs
event.set()
def a_sync_callback(data):
print(f"A sync callback with data {data} is triggered")
async def a_async_callback(data):
await asyncio.sleep(1)
print(f"A async callback with data {data} is triggered")
async def main():
event_bus = EventBus()
task_one = asyncio.create_task(event_bus.on("some_event", a_async_callback))
task_two = asyncio.create_task(event_bus.on("some_event", a_sync_callback))
event_bus.trigger("some_event", {id: 1})
await asyncio.wait([task_one, task_two], timeout=20)
asyncio.run(main())
Выводы
В этой статье сначала было рассказано о том, зачем Python asyncio нужны примитивы синхронизации. Затем я представил лучшие практики для Lock, Semaphore, Event и Condition и дал несколько советов по их правильному использованию.
Наконец, я привёл небольшой пример с практическим применением примитивов синхронизации asyncio, который, я надеюсь, поможет вам лучше использовать примитивы синхронизации в реальных проектах.
Не стесняйтесь комментировать, делиться опытом или обсуждать со мной темы, связанные с asyncio.
Контакты автора статьи
Пожалуйста, подпишитесь, если вы нашли статьи полезными, и получайте новые истории на свой почтовый ящик. Если у вас есть вопросы, вы можете найти меня на LinkedIn или в Twitter(X).
Помогайте другим там, где вы это можете делать. (с) Хабраэтикет