Как стать автором
Обновить

Конкурентность в асинхронном приложении на примере twisted

Python
Из песочницы
Теоретически, проблема конкурентного доступа не характерна для асинхронных приложений. В отличие от приложений с параллельной архитектурой, в которых в каждый момент времени может выполняться несколько задач претендующих на какой то общий ресурс — в асинхронном приложении в один момент времени выполняется только одна активность.

Но на практике все выглядит немного по иному:

Предстваим следующую ситуацияю. Наш сервис работает с удаленным сервисом (S), который обрабатывает запросы параллельно. У нас есть ресурс A, который зависит от данных на S. Запрос R1 обновляет A, для этого он запрашивает данные с S. В это время приходит R2, он модифицирует соответствующие данные на S. Запрос R2 пришел позже, но изза параллельной архитектуры S — обработался раньше (см. Рис.1).

image
Рис.1. Порядок выполнения запросов.

Теперь R1 получает данные уже с учетом модификаций R2, в то время как ресурс A еще не знает, что это результат операций R2. Вот вам и race condition.

Пример:

У Коли есть жена и счет в банке. Коля с женой решили, что у них в семье равноправие и общий бюджет, а потому у каждого из них есть по карте, привязанной к этому счету. Коля работает по фрилансу и приходит время, когда ему должны заплатить за последний проект.
Жена решает порадовать Колю, и подарить ему на ДР новый iPad2. Встав с утра по раньше, она находит на счету N$! Радостная она заказывает iPad2 мужу и новые часики себе. Когда Коля проверяет счет в банке, там «по прежнему» ноль. Он не догадывается проверить последние операции и долго долго ругается с заказчиком.

Коля — запрос 1, его жена — запрос 2, счет в банке — ресурс на удаленном сервисе, банковские карты — прокси для удаленного ресурса.

На месте Коли, жены и счета в банке могут быть: финансовый сервис, АСУТП и многое другое. Суть конкретно моей проблемы разгласить не могу, так как NDA.

Как бороться.

Бороться будем с применением twisted. Хотя общая концепция верна для любого асинхронного фреймворка.

Очевидно, что решением является блокировка ресурса A. Теоретически — для блокировки подойдут те же конструкции, что и для параллельной архитектуры: semaphor, mutex, conditional variable. Вопрос в реализации. Итак, рассмотрим mutex (почему mutex? потому что conditional variable тривиальна, а для semaphor мне пока не удалось найти применения).

Как сделать mutex?

Вариант нерабочий. В параллельной архитектуре — поток может просто спать, и нам ничего за это не будет. В асинхронной архитектуре — если мы просто уснем (например time.sleep(100) ) — то все встанет колом, и мы никогда ничего не дождемся, так как нам необходимо чтобы eventloop переключился на обработку запроса, заблокировавшего ресурс, а пока мы спим — этого не произойдет.

Вариант неправильный. Можно реализовать через reactor.callLater(1, self.some_method, *args), где self.some_method и есть наш метод, который ожидает окончания блокировки.

Недостатки следующие:
  • Довольно страшный код.
  • Ждем мы целую секунду, а за нее может многое случиться, например некий запрос Rn снова заблокирует A.


Вариант. И, наконец, верный вариант. У нас асинхронное приложение, построенное на Deferred'ах. Для того, чтобы что то в нем произошло — нужно чтобы сработал Deferred. Вывод — блокировку необходимо делать на Deferred'ах.

class Mutex(object):
  
    def __init__(self):
        self.locked = False
        self.waiters = list()

    def acquire(self):
        d = Deferred()
        if self.locked:
            self.waiters.append(d)
        else:
            self.locked = True
            d.callback(True)
        return d

    def release(self):
        self.locked = False
        if self.waiters:
            self.locked = True
            d = self.waiters.pop()
            d.callback(True)

Конструкция проста: если мы хотим получить доступ к ресурсу — мы получаем Deferred. Работать с ресурсом мы начинаем только после того, как Deferred срабатывает. Сам класс отслеживает отработку Deferred'ов, и, как только один из них отрабатывает, он достает из очереди следующий и запускает его.

Приведенный пример реализации не полон, реализацию мьютекса в twisted (DeferredLock) можно посмотреть здесь: http://twistedmatrix.com/trac/browser/tags/releases/twisted-11.0.0/twisted/internet/defer.py. Там же есть и DeferredSemaphore.

Пример использования.

Использовать можно по разному: можно сделать так, что объект сам следит за доступом к нему:

class ImportantObject(object):

    def __init__(self):
         self.lock = defer.DeferredLock()

    def get_lock(self):
         return self.lock.acquire()

    def release_lock(self):
         return self.lock.release()


Или, если объекты выбираются из БД и храняться в сессиях (то есть для каждого запроса — объект ImportantObject с id 1 будет разным), можно сделать пул блокировок:

class Pool(object):
    __metaclass__ = Singleton

    def __init__(self, objects_list):
        self.__objects = dict()
        for o in objects_list:
            self.__objects[o.id] = defer.DeferredLock()

    def acquire(self, o):
        if o.id not in self.__objects:
            self.__objects[o.id] = defer.DeferredLock()
        return self.__objects[o.id].acquire()

    def release(self, o):
        self.__objects[o.id].release()


Pool здесь слегка упрощен, мы «забыли» методы для обновления списка отслеживаемых объектов, чтобы не загромождать статью. Так же мы упустили реализацию Singleton, но найти /сделать ее — не составит труда даже начинающему питонисту.

Ну и наконец:

def multiplex(self, a):

    def get_value_from_remote_service(skipped, a):
        d = some_service.do_long_boring_call(a)
        return d

    def multiply(result, a):
        return result*a

    d = Pool().acquire(a)
    d.addCallback(get_value_from_remote_service, a)
    d.addCallback(power,a)
    return d


П.С.: К сожалению, документация twisted далеко не исчерпывающая. Более того, покрывает она хорошо если процентов 30 этого фреймворка. Поэтому, когда мной решалась проблема конкурентности — я в течении 3-х дней изобретал различные велосипеды. Пока не догадался посмотреть исходники twisted. Так что общий совет — работая с twisted — больше читайте исходники, там спрятаны велосипеды на все случаи жизни.

Источники:
Официальная документация twisted.
Исходники twisted
Теги:pythonasyncconcurrencytwisted
Хабы: Python
Всего голосов 31: ↑27 и ↓4 +23
Просмотры3.7K

Похожие публикации

Разработчик Python
от 160 000 до 200 000 ₽C-Executives LLCМоскваМожно удаленно
Разработчик Python
от 80 000 до 150 000 ₽СтратоСфераЗеленоград
Программист Python
от 40 000 до 60 000 ₽ПК «Энергосбережение»Можно удаленно
Python Developer/ Разработчик Python
от 200 000 ₽ТрансКонтейнерМосква
Разработчик Python
от 150 000 ₽СИТИЛАБМожно удаленно

Лучшие публикации за сутки