Теоретически, проблема конкурентного доступа не характерна для асинхронных приложений. В отличие от приложений с параллельной архитектурой, в которых в каждый момент времени может выполняться несколько задач претендующих на какой то общий ресурс — в асинхронном приложении в один момент времени выполняется только одна активность.
Но на практике все выглядит немного по иному:
Предстваим следующую ситуацияю. Наш сервис работает с удаленным сервисом (S), который обрабатывает запросы параллельно. У нас есть ресурс A, который зависит от данных на S. Запрос R1 обновляет A, для этого он запрашивает данные с S. В это время приходит R2, он модифицирует соответствующие данные на S. Запрос R2 пришел позже, но изза параллельной архитектуры S — обработался раньше (см. Рис.1).
Рис.1. Порядок выполнения запросов.
Теперь R1 получает данные уже с учетом модификаций R2, в то время как ресурс A еще не знает, что это результат операций R2. Вот вам и race condition.
У Коли есть жена и счет в банке. Коля с женой решили, что у них в семье равноправие и общий бюджет, а потому у каждого из них есть по карте, привязанной к этому счету. Коля работает по фрилансу и приходит время, когда ему должны заплатить за последний проект.
Жена решает порадовать Колю, и подарить ему на ДР новый iPad2. Встав с утра по раньше, она находит на счету N$! Радостная она заказывает iPad2 мужу и новые часики себе. Когда Коля проверяет счет в банке, там «по прежнему» ноль. Он не догадывается проверить последние операции и долго долго ругается с заказчиком.
Коля — запрос 1, его жена — запрос 2, счет в банке — ресурс на удаленном сервисе, банковские карты — прокси для удаленного ресурса.
На месте Коли, жены и счета в банке могут быть: финансовый сервис, АСУТП и многое другое. Суть конкретно моей проблемы разгласить не могу, так как NDA.
Бороться будем с применением twisted. Хотя общая концепция верна для любого асинхронного фреймворка.
Очевидно, что решением является блокировка ресурса A. Теоретически — для блокировки подойдут те же конструкции, что и для параллельной архитектуры: semaphor, mutex, conditional variable. Вопрос в реализации. Итак, рассмотрим mutex (почему mutex? потому что conditional variable тривиальна, а для semaphor мне пока не удалось найти применения).
Вариант нерабочий. В параллельной архитектуре — поток может просто спать, и нам ничего за это не будет. В асинхронной архитектуре — если мы просто уснем (например time.sleep(100) ) — то все встанет колом, и мы никогда ничего не дождемся, так как нам необходимо чтобы eventloop переключился на обработку запроса, заблокировавшего ресурс, а пока мы спим — этого не произойдет.
Вариант неправильный. Можно реализовать через reactor.callLater(1, self.some_method, *args), где self.some_method и есть наш метод, который ожидает окончания блокировки.
Недостатки следующие:
Вариант. И, наконец, верный вариант. У нас асинхронное приложение, построенное на Deferred'ах. Для того, чтобы что то в нем произошло — нужно чтобы сработал Deferred. Вывод — блокировку необходимо делать на Deferred'ах.
Конструкция проста: если мы хотим получить доступ к ресурсу — мы получаем Deferred. Работать с ресурсом мы начинаем только после того, как Deferred срабатывает. Сам класс отслеживает отработку Deferred'ов, и, как только один из них отрабатывает, он достает из очереди следующий и запускает его.
Приведенный пример реализации не полон, реализацию мьютекса в twisted (DeferredLock) можно посмотреть здесь: http://twistedmatrix.com/trac/browser/tags/releases/twisted-11.0.0/twisted/internet/defer.py. Там же есть и DeferredSemaphore.
Использовать можно по разному: можно сделать так, что объект сам следит за доступом к нему:
Или, если объекты выбираются из БД и храняться в сессиях (то есть для каждого запроса — объект ImportantObject с id 1 будет разным), можно сделать пул блокировок:
Pool здесь слегка упрощен, мы «забыли» методы для обновления списка отслеживаемых объектов, чтобы не загромождать статью. Так же мы упустили реализацию Singleton, но найти /сделать ее — не составит труда даже начинающему питонисту.
Ну и наконец:
П.С.: К сожалению, документация twisted далеко не исчерпывающая. Более того, покрывает она хорошо если процентов 30 этого фреймворка. Поэтому, когда мной решалась проблема конкурентности — я в течении 3-х дней изобретал различные велосипеды. Пока не догадался посмотреть исходники twisted. Так что общий совет — работая с twisted — больше читайте исходники, там спрятаны велосипеды на все случаи жизни.
Источники:
Официальная документация twisted.
Исходники twisted
Но на практике все выглядит немного по иному:
Предстваим следующую ситуацияю. Наш сервис работает с удаленным сервисом (S), который обрабатывает запросы параллельно. У нас есть ресурс A, который зависит от данных на S. Запрос R1 обновляет A, для этого он запрашивает данные с S. В это время приходит R2, он модифицирует соответствующие данные на S. Запрос R2 пришел позже, но изза параллельной архитектуры S — обработался раньше (см. Рис.1).
Рис.1. Порядок выполнения запросов.
Теперь R1 получает данные уже с учетом модификаций R2, в то время как ресурс A еще не знает, что это результат операций R2. Вот вам и race condition.
Пример:
У Коли есть жена и счет в банке. Коля с женой решили, что у них в семье равноправие и общий бюджет, а потому у каждого из них есть по карте, привязанной к этому счету. Коля работает по фрилансу и приходит время, когда ему должны заплатить за последний проект.
Жена решает порадовать Колю, и подарить ему на ДР новый iPad2. Встав с утра по раньше, она находит на счету N$! Радостная она заказывает iPad2 мужу и новые часики себе. Когда Коля проверяет счет в банке, там «по прежнему» ноль. Он не догадывается проверить последние операции и долго долго ругается с заказчиком.
Коля — запрос 1, его жена — запрос 2, счет в банке — ресурс на удаленном сервисе, банковские карты — прокси для удаленного ресурса.
На месте Коли, жены и счета в банке могут быть: финансовый сервис, АСУТП и многое другое. Суть конкретно моей проблемы разгласить не могу, так как NDA.
Как бороться.
Бороться будем с применением twisted. Хотя общая концепция верна для любого асинхронного фреймворка.
Очевидно, что решением является блокировка ресурса A. Теоретически — для блокировки подойдут те же конструкции, что и для параллельной архитектуры: semaphor, mutex, conditional variable. Вопрос в реализации. Итак, рассмотрим mutex (почему mutex? потому что conditional variable тривиальна, а для semaphor мне пока не удалось найти применения).
Как сделать mutex?
Вариант нерабочий. В параллельной архитектуре — поток может просто спать, и нам ничего за это не будет. В асинхронной архитектуре — если мы просто уснем (например time.sleep(100) ) — то все встанет колом, и мы никогда ничего не дождемся, так как нам необходимо чтобы eventloop переключился на обработку запроса, заблокировавшего ресурс, а пока мы спим — этого не произойдет.
Вариант неправильный. Можно реализовать через reactor.callLater(1, self.some_method, *args), где self.some_method и есть наш метод, который ожидает окончания блокировки.
Недостатки следующие:
- Довольно страшный код.
- Ждем мы целую секунду, а за нее может многое случиться, например некий запрос Rn снова заблокирует A.
Вариант. И, наконец, верный вариант. У нас асинхронное приложение, построенное на Deferred'ах. Для того, чтобы что то в нем произошло — нужно чтобы сработал Deferred. Вывод — блокировку необходимо делать на Deferred'ах.
class Mutex(object):
def __init__(self):
self.locked = False
self.waiters = list()
def acquire(self):
d = Deferred()
if self.locked:
self.waiters.append(d)
else:
self.locked = True
d.callback(True)
return d
def release(self):
self.locked = False
if self.waiters:
self.locked = True
d = self.waiters.pop()
d.callback(True)
Конструкция проста: если мы хотим получить доступ к ресурсу — мы получаем Deferred. Работать с ресурсом мы начинаем только после того, как Deferred срабатывает. Сам класс отслеживает отработку Deferred'ов, и, как только один из них отрабатывает, он достает из очереди следующий и запускает его.
Приведенный пример реализации не полон, реализацию мьютекса в twisted (DeferredLock) можно посмотреть здесь: http://twistedmatrix.com/trac/browser/tags/releases/twisted-11.0.0/twisted/internet/defer.py. Там же есть и DeferredSemaphore.
Пример использования.
Использовать можно по разному: можно сделать так, что объект сам следит за доступом к нему:
class ImportantObject(object):
def __init__(self):
self.lock = defer.DeferredLock()
def get_lock(self):
return self.lock.acquire()
def release_lock(self):
return self.lock.release()
Или, если объекты выбираются из БД и храняться в сессиях (то есть для каждого запроса — объект ImportantObject с id 1 будет разным), можно сделать пул блокировок:
class Pool(object):
__metaclass__ = Singleton
def __init__(self, objects_list):
self.__objects = dict()
for o in objects_list:
self.__objects[o.id] = defer.DeferredLock()
def acquire(self, o):
if o.id not in self.__objects:
self.__objects[o.id] = defer.DeferredLock()
return self.__objects[o.id].acquire()
def release(self, o):
self.__objects[o.id].release()
Pool здесь слегка упрощен, мы «забыли» методы для обновления списка отслеживаемых объектов, чтобы не загромождать статью. Так же мы упустили реализацию Singleton, но найти /сделать ее — не составит труда даже начинающему питонисту.
Ну и наконец:
def multiplex(self, a):
def get_value_from_remote_service(skipped, a):
d = some_service.do_long_boring_call(a)
return d
def multiply(result, a):
return result*a
d = Pool().acquire(a)
d.addCallback(get_value_from_remote_service, a)
d.addCallback(power,a)
return d
П.С.: К сожалению, документация twisted далеко не исчерпывающая. Более того, покрывает она хорошо если процентов 30 этого фреймворка. Поэтому, когда мной решалась проблема конкурентности — я в течении 3-х дней изобретал различные велосипеды. Пока не догадался посмотреть исходники twisted. Так что общий совет — работая с twisted — больше читайте исходники, там спрятаны велосипеды на все случаи жизни.
Источники:
Официальная документация twisted.
Исходники twisted