Pull to refresh

Асинхронное программирование: концепция Deferred

Reading time7 min
Views54K
Асинхронная концепция программирования заключается в том, что результат выполнения функции доступен не сразу же, а через некоторое время в виде некоторого асинхронного (нарушающего обычный порядок выполнения) вызова. Зачем такое может быть полезно? Рассмотрим несколько примеров.

Первый пример — сетевой сервер, веб-приложение. Чаще всего как таковых вычислений на процессоре такие приложения не выполняют. Большая часть времени (реального, не процессорного) тратится на ввод-вывод: чтение запроса от клиента, обращение к диску за данными, сетевые обращение к другим подсистемам (БД, кэширующие сервера, RPC и т.п.), запись ответа клиенту. Во время этих операций ввода-вывода процессор простаивает, его можно загрузить обработкой запросов других клиентов. Возможны различные способы решить эту задачу: отдельный процесс на каждое соединение (Apache mpm_prefork, PostgreSQL, PHP FastCGI), отдельный поток (нить) на каждое соединение или комбинированный вариант процесс/нить (Apache mpm_worker, MySQL). Подход с использованием процессов или нитей перекладывает мультиплексирование процессора между обрабатываемыми соединениями на ОС, при этом расходуется относительно много ресурсов (память, переключения контекста и т.п.), такой вариант не подходит для обработки большого количества одновременных соединений, но идеален для ситуации, когда объем вычислений достаточно высок (например, в СУБД). К плюсам модели нитей и процессов можно добавить потенциальное использование всех доступных процессоров в многопроцессорной архитектуре.

Альтернативой является использование однопоточной модели с использованием примитивов асинхронного ввода-вывода, предоставляемых ОС (select, poll, и т.п.). При этом объем ресурсов на каждое новое обслуживаемое соединение не такой большой (новый сокет, какие-то структуры в памяти приложения). Однако программирование существенно усложняется, т.к. данные из сетевых сокетов поступают некоторыми “отрывками”, причем за один цикл обработки данные поступают от разных соединений, находящихся в разных состояниях, часть соединений могут быть входящими от клиентов, часть — исходящими к внешним ресурсам (БД, другой сервер и т.п.). Для упрощения разработки используются различные концепции: callback, конечные автоматы и другие. Примеры сетевых серверов, использующих асинхронный ввод-вывод: nginx, lighttpd, HAProxy, pgBouncer, и т.д. Именно при такой однопоточной модели возникает необходимость в асинхронном программировании. Например, мы хотим выполнить запрос в БД. С точки зрения программы выполнение запроса — это сетевой ввод-вывод: соединение с сервером, отправка запроса, ожидание ответа, чтение ответа сервера БД. Поэтому если мы вызываем функцию “выполнить запрос БД”, то она сразу вернуть результат не сможет (иначе она должна была бы заблокироваться), а вернет лишь нечто, что позволит впоследствие получить результат запроса или, возможно, ошибку (нет соединения с сервером, некорректный запрос и т.п.) Этим возвращаемым значением удобно сделать именно Deferred.

Второй пример связан с разработкой обычных десктопных приложений. Предположим, мы решили сделать аналог Miranda (QIP, MDC, …), то есть свой мессенджер. В интерфейсе программы есть контакт-лист, где можно удалить контакт. Когда пользователь выбирает это действие, он ожидает что контакт исчезнет на экране и что он действительно удалится из контакт-листа. На самом деле операция удаления из серверного контакт-листа опирается на сетевое взаимодействие с сервером, при этом пользовательский интерфейс не должен быть заблокирован на время выполнения этой операции, поэтому в любом случае после выполнения операции потребуется некоторое асинхронное взаимодействие с результатом операции. Можно использовать механизм сигналов-слотов, callback’ов или что-то еще, но лучше всего подойдет Deferred: операция удаления из контакт-листа возвращает Deferred, в котором обратно придет либо положительный результат (всё хорошо), либо исключение (точная ошибка, которую надо сообщить пользователю): в случае ошибки контакт надо восстановить контакт в контакт-листе.

Примеры можно приводить долго и много, теперь о том, что же такое Deferred. Deferred — это сердце framework’а асинхронного сетевого программирования Twisted в Python. Это простая и стройная концепция, которая позволяет перевести синхронное программирование в асинхронный код, не изобретая велосипед для каждой ситуации и обеспечивая высокое качества кода. Deferred — это просто возвращаемый результат функции, когда этот результат неизвестен (не был получен, будет получен в другой нити и т.п.) Что мы можем сделать с Deferred? Мы можем “подвеситься” в цепочку обработчиков, которые будут вызваны, когда результат будет получен. При этом Deferred может нести не только положительный результат выполнения, но и исключения, сгенерированные функцией или обработчиками, есть возможность исключения обработать, перевыкинуть и т.д. Фактически, для синхронного кода есть более-менее однозначная параллель в терминах Deferred. Для эффективной разработки с Deferred оказываются полезными такие возможности языка программирования, как замыкания, лямбда-функци.

Приведем пример синхронного кода и его альтернативу в терминах Deferred:

try:
    # Скачать по HTTP некоторую страницу
    page = downloadPage(url)
    # Распечатать содержимое
    print page
except HTTPError, e:
    # Произошла ошибка
    print "An error occured: %s", e

В асинхронном варианте с Deferred он был бы записан следующим образом:

def printContents(contents):
    """
    Callback, при успешном получении текста страницы,
    распечатываем её содержимое.
    """
    print contents

def handleError(failure):
    """
    Errback (обработчик ошибок), просто распечатываем текст ошибки.
    """

    # Мы готовы обработать только HTTPError, остальные исключения
    # "проваливаются" ниже.
    failure.trap(HTTPError)
    # Распечатываем само исключение
    print "An error occured: %s", failure

# Теперь функция выполняется асинхронно и вместо непосредственного
# результата мы получаем Deferred
deferred = downloadPage(url)
# Навешиваем на Deferred-объект обработчики успешных результатов
# и ошибок (callback, errback).
deferred.addCallback(printContents)
deferred.addErrback(handleError)

На практике обычно мы возвращаем Deferred из функций, которые получают Deferred в процессе своей работы, навешиваем большое количество обработчиков, обрабатываем исключения, некоторые исключения возвращаем через Deferred (выбрасываем наверх). В качестве более сложного примера приведем код в асинхронном варианте для примера атомарного счетчика из статьи про структуры данных в memcached, здесь мы предполагаем, что доступ к memcached как сетевому сервису идет через Deferred, т.е. методы класса Memcache возвращают Deferred (который вернет либо результат операции, либо ошибку):

class MCCounter(MemcacheObject):
    def __init__(self, mc, name):
        """
        Конструктор.

        @param name: имя счетчика
        @type name: C{str}
        """
        super(MCCounter, self).__init__(mc)
        self.key = 'counter' + name

    def increment(self, value=1):
        """
        Увеличить значение счетчика на указанное значение.

        @param value: инкремент
        @type value: C{int}
        @return: Deferred, результат операции
        """
        def tryAdd(failure):
            # Обрабатываем только KeyError, всё остальное "вывалится"
            # ниже
            failure.trap(KeyError)

            # Пытаемся создать ключ, если раз его еще нет
            d = self.mc.add(self.key, value, 0)
            # Если вдруг кто-то еще создаст ключ раньше нас,
            # мы это обработаем
            d.addErrback(tryIncr)
            # Возвращаем Deferred, он "вклеивается" в цепочку
            # Deferred, в контексте которого мы выполняемся
            return d

        def tryIncr(failure):
            # Всё аналогично функции tryAdd
            failure.trap(KeyError)

            d = self.mc.incr(self.key, value)
            d.addErrback(tryAdd)
            return d

        # Пытаемся выполнить инкремент, получаем Deferred
        d = self.mc.incr(self.key, value)
        # Обрабатываем ошибку
        d.addErrback(tryAdd)
        # Возвращаем Deferred вызывающему коду, он может тем самым:
        #  а) узнать, когда операция действительно завершится
        #  б) обработать необработанные нами ошибки (например, разрыв соединения)
        return d

    def value(self):
        """
        Получить значение счетчика.

        @return: текущее значение счетчика
        @rtype: C{int}
        @return: Deferred, значение счетчика
        """
        def handleKeyError(failure):
            # Обрабатываем только KeyError
            failure.trap(KeyError)

            # Ключа нет — возвращаем 0, он станет результатом
            # вышележащего Deferred
            return 0

        # Пытаемся получить значение ключа
        d = self.mc.get(self.key)
        # Будем обрабатывать ошибку отсутствия ключа
        d.addErrback(handleKeyError)
        # Возвращаем Deferred, наверх там можно будет повеситься
        # на его callback и получить искомое значение счетчика
        return d

Приведенный выше код можно написать “короче”, объединяя часто используемые операции, например:

return self.mc.get(self.key).addErrback(handleKeyError)

Практически для каждой конструкции синхронного кода можно найти аналог в асинхронной концепции с Deferred:
  • последовательности синхронных операторов соответствует цепочка callback с асинхронными вызовами;
  • вызову одной подпрграммы с вводом-выводом из другой соответствует возврат Deferred из Deferred (ветвление Deferred);
  • глубокой цепочки вложенности, распространению исключений по стеку соответствует цепочка функций, возвращающие друг другу Deferred;
  • блокам try..except соответствуют обработчики ошибок (errback), которые могут “пробрасывать” исключения дальше, любое исключение в callback переводит выполнение в errback;
  • для “параллельного” выполнения асинхронных операций есть DeferredList.

Нити часто применяются в асинхронных программах для осуществления вычислительных процедур, осуществления блокирующегося ввода-вывода (когда не существует асинхронного аналога). Всё это легко моделируется в простой модели ‘worker’, тогда нет необходимости при грамотной архитектуре в явной синхронизации, при этом всё элегантно включается в общий поток вычислений с помощью Deferred:

def doCalculation(a, b):
    """
    В этой функции осуществляются вычисления, синхронные операции ввода-вывода,
    не затрагивающие основной поток.
    """

    return a/b

def printResult(result):
    print result

def handleDivisionByZero(failure):
    failure.trap(ZeroDivisionError)

    print "Ooops! Division by zero!"

deferToThread(doCalculation, 3, 2).addCallback(printResult).addCallback(
    lambda _: deferToThread(doCalculation, 3, 0).addErrback(handleDivisionByZero))

В приведенном выше примере функция deferToThread переносит выполнение указанной функции в отдельную нить и возвращает Deferred, через который будет асинхронно получен результат выполнения функции или исключение, если они будет выброшено. Первое деление (3/2) выполняется в отдельной нити, затем распечатывается его результат на экран, а затем запускается еще одно вычисление (3/0), которое генерирует исключение, обрабатываемое функцией handleDivisionByZero.

В одной статье не описать и части того, что хотелось бы сказать о Deferred, мне удалось не написать ни слова о том, как же они работают. Если успел заинтересовать — читайте материалы ниже, а я обещаю написать еще.

Дополнительные материалы


Tags:
Hubs:
Total votes 54: ↑44 and ↓10+34
Comments24

Articles