Асинхронное программирование: концепция Deferred

    Асинхронная концепция программирования заключается в том, что результат выполнения функции доступен не сразу же, а через некоторое время в виде некоторого асинхронного (нарушающего обычный порядок выполнения) вызова. Зачем такое может быть полезно? Рассмотрим несколько примеров.

    Первый пример — сетевой сервер, веб-приложение. Чаще всего как таковых вычислений на процессоре такие приложения не выполняют. Большая часть времени (реального, не процессорного) тратится на ввод-вывод: чтение запроса от клиента, обращение к диску за данными, сетевые обращение к другим подсистемам (БД, кэширующие сервера, RPC и т.п.), запись ответа клиенту. Во время этих операций ввода-вывода процессор простаивает, его можно загрузить обработкой запросов других клиентов. Возможны различные способы решить эту задачу: отдельный процесс на каждое соединение (Apache mpm_prefork, PostgreSQL, PHP FastCGI), отдельный поток (нить) на каждое соединение или комбинированный вариант процесс/нить (Apache mpm_worker, MySQL). Подход с использованием процессов или нитей перекладывает мультиплексирование процессора между обрабатываемыми соединениями на ОС, при этом расходуется относительно много ресурсов (память, переключения контекста и т.п.), такой вариант не подходит для обработки большого количества одновременных соединений, но идеален для ситуации, когда объем вычислений достаточно высок (например, в СУБД). К плюсам модели нитей и процессов можно добавить потенциальное использование всех доступных процессоров в многопроцессорной архитектуре.

    Альтернативой является использование однопоточной модели с использованием примитивов асинхронного ввода-вывода, предоставляемых ОС (select, poll, и т.п.). При этом объем ресурсов на каждое новое обслуживаемое соединение не такой большой (новый сокет, какие-то структуры в памяти приложения). Однако программирование существенно усложняется, т.к. данные из сетевых сокетов поступают некоторыми “отрывками”, причем за один цикл обработки данные поступают от разных соединений, находящихся в разных состояниях, часть соединений могут быть входящими от клиентов, часть — исходящими к внешним ресурсам (БД, другой сервер и т.п.). Для упрощения разработки используются различные концепции: callback, конечные автоматы и другие. Примеры сетевых серверов, использующих асинхронный ввод-вывод: nginx, lighttpd, HAProxy, pgBouncer, и т.д. Именно при такой однопоточной модели возникает необходимость в асинхронном программировании. Например, мы хотим выполнить запрос в БД. С точки зрения программы выполнение запроса — это сетевой ввод-вывод: соединение с сервером, отправка запроса, ожидание ответа, чтение ответа сервера БД. Поэтому если мы вызываем функцию “выполнить запрос БД”, то она сразу вернуть результат не сможет (иначе она должна была бы заблокироваться), а вернет лишь нечто, что позволит впоследствие получить результат запроса или, возможно, ошибку (нет соединения с сервером, некорректный запрос и т.п.) Этим возвращаемым значением удобно сделать именно Deferred.

    Второй пример связан с разработкой обычных десктопных приложений. Предположим, мы решили сделать аналог Miranda (QIP, MDC, …), то есть свой мессенджер. В интерфейсе программы есть контакт-лист, где можно удалить контакт. Когда пользователь выбирает это действие, он ожидает что контакт исчезнет на экране и что он действительно удалится из контакт-листа. На самом деле операция удаления из серверного контакт-листа опирается на сетевое взаимодействие с сервером, при этом пользовательский интерфейс не должен быть заблокирован на время выполнения этой операции, поэтому в любом случае после выполнения операции потребуется некоторое асинхронное взаимодействие с результатом операции. Можно использовать механизм сигналов-слотов, callback’ов или что-то еще, но лучше всего подойдет Deferred: операция удаления из контакт-листа возвращает Deferred, в котором обратно придет либо положительный результат (всё хорошо), либо исключение (точная ошибка, которую надо сообщить пользователю): в случае ошибки контакт надо восстановить контакт в контакт-листе.

    Примеры можно приводить долго и много, теперь о том, что же такое Deferred. Deferred — это сердце framework’а асинхронного сетевого программирования Twisted в Python. Это простая и стройная концепция, которая позволяет перевести синхронное программирование в асинхронный код, не изобретая велосипед для каждой ситуации и обеспечивая высокое качества кода. Deferred — это просто возвращаемый результат функции, когда этот результат неизвестен (не был получен, будет получен в другой нити и т.п.) Что мы можем сделать с Deferred? Мы можем “подвеситься” в цепочку обработчиков, которые будут вызваны, когда результат будет получен. При этом Deferred может нести не только положительный результат выполнения, но и исключения, сгенерированные функцией или обработчиками, есть возможность исключения обработать, перевыкинуть и т.д. Фактически, для синхронного кода есть более-менее однозначная параллель в терминах Deferred. Для эффективной разработки с Deferred оказываются полезными такие возможности языка программирования, как замыкания, лямбда-функци.

    Приведем пример синхронного кода и его альтернативу в терминах Deferred:

    try:
        # Скачать по HTTP некоторую страницу
        page = downloadPage(url)
        # Распечатать содержимое
        print page
    except HTTPError, e:
        # Произошла ошибка
        print "An error occured: %s", e
    

    В асинхронном варианте с Deferred он был бы записан следующим образом:

    def printContents(contents):
        """
        Callback, при успешном получении текста страницы,
        распечатываем её содержимое.
        """
        print contents
    
    def handleError(failure):
        """
        Errback (обработчик ошибок), просто распечатываем текст ошибки.
        """
    
        # Мы готовы обработать только HTTPError, остальные исключения
        # "проваливаются" ниже.
        failure.trap(HTTPError)
        # Распечатываем само исключение
        print "An error occured: %s", failure
    
    # Теперь функция выполняется асинхронно и вместо непосредственного
    # результата мы получаем Deferred
    deferred = downloadPage(url)
    # Навешиваем на Deferred-объект обработчики успешных результатов
    # и ошибок (callback, errback).
    deferred.addCallback(printContents)
    deferred.addErrback(handleError)
    

    На практике обычно мы возвращаем Deferred из функций, которые получают Deferred в процессе своей работы, навешиваем большое количество обработчиков, обрабатываем исключения, некоторые исключения возвращаем через Deferred (выбрасываем наверх). В качестве более сложного примера приведем код в асинхронном варианте для примера атомарного счетчика из статьи про структуры данных в memcached, здесь мы предполагаем, что доступ к memcached как сетевому сервису идет через Deferred, т.е. методы класса Memcache возвращают Deferred (который вернет либо результат операции, либо ошибку):

    class MCCounter(MemcacheObject):
        def __init__(self, mc, name):
            """
            Конструктор.
    
            @param name: имя счетчика
            @type name: C{str}
            """
            super(MCCounter, self).__init__(mc)
            self.key = 'counter' + name
    
        def increment(self, value=1):
            """
            Увеличить значение счетчика на указанное значение.
    
            @param value: инкремент
            @type value: C{int}
            @return: Deferred, результат операции
            """
            def tryAdd(failure):
                # Обрабатываем только KeyError, всё остальное "вывалится"
                # ниже
                failure.trap(KeyError)
    
                # Пытаемся создать ключ, если раз его еще нет
                d = self.mc.add(self.key, value, 0)
                # Если вдруг кто-то еще создаст ключ раньше нас,
                # мы это обработаем
                d.addErrback(tryIncr)
                # Возвращаем Deferred, он "вклеивается" в цепочку
                # Deferred, в контексте которого мы выполняемся
                return d
    
            def tryIncr(failure):
                # Всё аналогично функции tryAdd
                failure.trap(KeyError)
    
                d = self.mc.incr(self.key, value)
                d.addErrback(tryAdd)
                return d
    
            # Пытаемся выполнить инкремент, получаем Deferred
            d = self.mc.incr(self.key, value)
            # Обрабатываем ошибку
            d.addErrback(tryAdd)
            # Возвращаем Deferred вызывающему коду, он может тем самым:
            #  а) узнать, когда операция действительно завершится
            #  б) обработать необработанные нами ошибки (например, разрыв соединения)
            return d
    
        def value(self):
            """
            Получить значение счетчика.
    
            @return: текущее значение счетчика
            @rtype: C{int}
            @return: Deferred, значение счетчика
            """
            def handleKeyError(failure):
                # Обрабатываем только KeyError
                failure.trap(KeyError)
    
                # Ключа нет — возвращаем 0, он станет результатом
                # вышележащего Deferred
                return 0
    
            # Пытаемся получить значение ключа
            d = self.mc.get(self.key)
            # Будем обрабатывать ошибку отсутствия ключа
            d.addErrback(handleKeyError)
            # Возвращаем Deferred, наверх там можно будет повеситься
            # на его callback и получить искомое значение счетчика
            return d
    

    Приведенный выше код можно написать “короче”, объединяя часто используемые операции, например:

    return self.mc.get(self.key).addErrback(handleKeyError)
    

    Практически для каждой конструкции синхронного кода можно найти аналог в асинхронной концепции с Deferred:
    • последовательности синхронных операторов соответствует цепочка callback с асинхронными вызовами;
    • вызову одной подпрграммы с вводом-выводом из другой соответствует возврат Deferred из Deferred (ветвление Deferred);
    • глубокой цепочки вложенности, распространению исключений по стеку соответствует цепочка функций, возвращающие друг другу Deferred;
    • блокам try..except соответствуют обработчики ошибок (errback), которые могут “пробрасывать” исключения дальше, любое исключение в callback переводит выполнение в errback;
    • для “параллельного” выполнения асинхронных операций есть DeferredList.

    Нити часто применяются в асинхронных программах для осуществления вычислительных процедур, осуществления блокирующегося ввода-вывода (когда не существует асинхронного аналога). Всё это легко моделируется в простой модели ‘worker’, тогда нет необходимости при грамотной архитектуре в явной синхронизации, при этом всё элегантно включается в общий поток вычислений с помощью Deferred:

    def doCalculation(a, b):
        """
        В этой функции осуществляются вычисления, синхронные операции ввода-вывода,
        не затрагивающие основной поток.
        """
    
        return a/b
    
    def printResult(result):
        print result
    
    def handleDivisionByZero(failure):
        failure.trap(ZeroDivisionError)
    
        print "Ooops! Division by zero!"
    
    deferToThread(doCalculation, 3, 2).addCallback(printResult).addCallback(
        lambda _: deferToThread(doCalculation, 3, 0).addErrback(handleDivisionByZero))
    

    В приведенном выше примере функция deferToThread переносит выполнение указанной функции в отдельную нить и возвращает Deferred, через который будет асинхронно получен результат выполнения функции или исключение, если они будет выброшено. Первое деление (3/2) выполняется в отдельной нити, затем распечатывается его результат на экран, а затем запускается еще одно вычисление (3/0), которое генерирует исключение, обрабатываемое функцией handleDivisionByZero.

    В одной статье не описать и части того, что хотелось бы сказать о Deferred, мне удалось не написать ни слова о том, как же они работают. Если успел заинтересовать — читайте материалы ниже, а я обещаю написать еще.

    Дополнительные материалы


    AdBlock похитил этот баннер, но баннеры не зубы — отрастут

    Подробнее
    Реклама

    Комментарии 24

      +1
      Вспомнил, как столкнулся с deferred в twisted и довольно долго вникал, как это работает. Потом привык и оказалась очень удобная штука.
        +1
        Поднять бы хабракат ко второму абзацу ,.)
          +1
          Я долго думал, где его поставить, остановился на текущем варианте, вроде бы и много получилось, но с другой стороны до хабраката осталась вся вводная часть.
          –2
          Пример асинхронного программирования:
          Есть соцсеть типа хабра, где разделено все на 3 части — информеры слева, статьи и комменты. Каждая часть на своем сервере.
          Открытие идет на аяксе, подгружая все три части и формируя страницу. При этом пока идет загрузка камментов, на информере может пройти обновление и там выйдет запись — опаньки, не открывайте камменты для статьи такой-то, бо тормоза и трафик!
            +1
            черт возьми, красиво реализовано
            • НЛО прилетело и опубликовало эту надпись здесь
                0
                Честно — не до конца понял концепт, но само асинхронное программирование люблю и уважаю.

                В свое время активно использовали, только код перевели из «обычного» в вариант «конечные автоматы»+«слоты и сигналы»

                В финале ускорились почти в 200(!) раз что позволило перевести систему с пней4 на ARM\400Мгц и по индукции кластеризовать работу…
                потому как(если подумать) если вы что-то ждете асинхронно — по сути по барабану где это будет работать и как команда на исполнение блока до этого где добереться
                  +2
                  Асинхронное программирование — очень хорошая штука в своей области. А на тему «как» и «где» доберется — Deferred как раз позволяет это сделать гораздо прозрачнее, однотипнее и понятнее. Постараюсь в следующих постах раскрыть тему.
                    0
                    Честно скажу — вы мой самый любимый писатель
                    Что не пишете — пишите правильно и почему-то… именно что о чем полезно было бы почитать именно сейчас для текущих заданий

                    Но… питон не для нас :(
                  0
                  Без поллитры не разобратся (:
                  В закладки!
                    0
                    Интересно, насколько это эффективно. Я как плюсер, предпочитаю ивенты или другие механизмы, поллингу и подобным вещам, поскольку последние отжирают процессорное время (еще бы — крутится в цикле и выполнять poll()). С одной стороны программа не будет блокироватся, с другой — это тоже принесет дополнительную нагрузку, потому как ассинхронное IO необходится без дополнительных потоков (запускается поток, который занимается поллингом и потом вызывает коллбек), а в Питоне есть GIL…
                      0
                      Асинхронный ввод-вывод очень эффективен как подход, причем дополнительных потоков нет. Пример таких программ был в статье, приведу пару высокопроизводительных еще раз: nginx, HAProxy.

                      Что касается непосредственно Deferred, их реализация достаточно компактная, ничего сложного, больших накладных расходов они не приносят.

                      Как факт: наш сервер вещаний (RTMP) на одном ядре процессора обрабатывает 70-150 Мбит/с (порядка 1000 одновременных соединений), в пике до 250 Мбит/с. Он написан на Python/Twisted.
                        0
                        Я знаю, что такое ассинхронное IO и эффективность его применения. Штука в том, что при синхронном IO прием данных происходит в контексте вашего трида. При ассинхронном IO в очередь APC потока добавляется новый APC. Прием данных происходит в контексте _другого_ потока и IO реквест помечается как завершенный. Затем, когда трид входит в alertable state (ждет где-то), запускается ваш APC коллбек.

                        Хотя по сути вы правы, запуск APC происходит в вашем же триде, так что проблем быть не должно.
                        Касательно производительности… 250 Мбит — не так уж много, одно ядро Core 2 Duo по идее может отдавать 1 гигабит при загрузке в 80-85%. Хотя конечно, все зависит от того, что вы еще обрабатываете в данный момент :)

                        Вопрос не в тему. nginx однопоточный. Подобное приложение на twisted фактически тоже (не считая операция в сетевом стеке). На многоядерных машинах, как вы перераспределяете коннекшены, дабы равномерно загрузить все ядра?
                          0
                          Я не знаю, что такое 'APC' в данном контексте, но потоков на уровне приложения в Twisted для ввода-вывода нет.

                          На многих ядрах запускается много процессов, потому что всё равно одним сервером это не ограничивается (приложение масштабируется по мере роста нагрузки). 250 Мбит/с — это уже конечный RTMP трафик, не самый удобный для этого дела протокол на самом деле )
                      0
                      Внесите ясность пожалуйста:
                      Правильно я понимаю, что любой код в любом случае «завершается» `reactor.run`ом и что по сути реактор и есть сердце Twisted без которого `Deffered` не работают. Или я не прав?
                        0
                        Реактор — сердце событийной системы, он вызывает select/poll/epoll/kqueue, которые обеспечивают возможность сетевого ввода-вывода. Как таковые Deferred на него не завязаны, но без реальных асинхронных событий (ввод-вывод) они большого смысла не имеют, поэтому на практике во всех Twisted-приложениях реактор работает от начала и до конца. Реакторы умеют интегрироваться со всякими GUIшными event loop'ами, поэтому большой проблемы в этом нет.
                        0
                        Для Си++ есть OpenSource фреймворк ACE, в котором реализован паттерн Active Object, который, по моему (я из него только объект Future использовал), делает то же самое.
                          0
                          ИМХО, это что-то похожее, но… простите меня… Deferred — это красиво и элегантно, а там какой-то тихий ужас ;)
                            0
                            Тут описана релизация, которая действительно «тихий ужас», но, к счастью, она скрыта от пользователя фреймворка. А по использованию так же проста.
                              0
                              Почитайте исходники Deferred, там всего полторы страницы кода ;) Может, будет интересно. На C++ его реализация далеко не так тривиальна, но это специфика языка, а на Python/JavaScript это точно занимает полторы страницы. Как и интерфейс Deferred.
                          0
                          в общем-то однозначно скачать что block+threads медленее нельзя. Если например сделать 10 потоков в си, которые будут что-нибудь качать, то по скорости будет аналогично асинхронному варианту. А вот если нужно 10тыс потоков создавать каждую секунду — то да, здесь асинхронный вариант шустрее.

                          В асинхронном подходе намного проще ловить ошибки. Так, например в питоне нельзя насильно разорвать заблокированный сокет, пока в последнем не сработает таймаут. В случае асинхронного варианта — фактически достаточно убрать у него обработчик.
                            0
                            block+threads сложнее программировать в общем случае из-за необходимости явной синхронизации
                              0
                              Это не совсем так. В случае сверх-лёгких нитей а-ля в Limbo (OS Inferno) или Go — базирующихся на CSP, с IPC через каналы — код получается намного яснее и проще event/callback-ориентированного да и никаких проблем с созданием тысяч нитей каждую секунду тоже нет.
                                0
                                Мой комментарий относился ни к скорости создания нитей, ни к красоте кода.

                                В общем случае, чистый код в нитях (без другой модели программирования, например Actor в Erlang) крайне труден в поддержке/разработке из-за наличия тех или иных синхронизаций, число которых для получения приемлемой скорости растет.

                          Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

                          Самое читаемое