Асинхронное программирование — цепочки вызовов

    Когда в коде фигурирует пара вызовов BeginXxx()/EndXxx(), это приемлимо. Но что если алгоритм требует несколько таких вызовов подряд, то количество методов (или анонимных делегатов) преумножится и код станет менее читабельным. К счастью, эта проблема решена как в F# так и в C#.




    Задача


    Итак, представьте что вы хотите в полностью асинхронном режиме скачать веб-страницу и сохранить ее у себя на жестком диске. Для этого нужно



    • Начать скачивать страничку сайта
    • Когда она скачается, начать запись файла на диск
    • Когда запись завершилась, закрыть файловый поток и уведомить пользователя

    Простое решение


    Наивное решение задачи выглядит примерно вот так:



    static void Main(string[] args)<br/>
    {<br/>
      Program p = new Program();<br/>
      // начинаем загрузку
      p.DownloadPage("http://habrahabr.ru");<br/>
      // ждем 10сек.
      p.waitHandle.WaitOne(10000);<br/>
    }<br/>
    <br/>
    // пришлось вывесить несколько переменных
    private WebRequest wr;<br/>
    private FileStream fs;<br/>
    private AutoResetEvent waitHandle = new AutoResetEvent(false);<br/>
    <br/>
    // тут мы начинаем скачивать страницу
    public void DownloadPage(string url)<br/>
    {<br/>
      wr = WebRequest.Create(url);<br/>
      wr.BeginGetResponse(AfterGotResponse, null);<br/>
    }<br/>
    <br/>
    // тут мы получаем текст со страницы
    private void AfterGotResponse(IAsyncResult ar)<br/>
    {<br/>
      var resp = wr.EndGetResponse(ar);<br/>
      var stream = resp.GetResponseStream();<br/>
      var reader = new StreamReader(stream);<br/>
      string html = reader.ReadToEnd();<br/>
      // последний параметр true позволяет писать файлы асинхронно
      fs = new FileStream(@"c:\temp\file.htm", FileMode.CreateNew,<br/>
                          FileAccess.Write, FileShare.None, 1024, true);<br/>
      var bytes = Encoding.UTF8.GetBytes(html);<br/>
      // начинаем запись файла
      fs.BeginWrite(bytes, 0, bytes.Length, AfterDoneWriting, null);<br/>
    }<br/>
    <br/>
    // когда файл записан, устанавливаем wait handle
    private void AfterDoneWriting(IAsyncResult ar)<br/>
    {<br/>
      fs.EndWrite(ar);<br/>
      fs.Flush();<br/>
      fs.Close();<br/>
      waitHandle.Set();<br/>
    }<br/>

    Это решение еще цветочки. Представьте, например, что вам дополнительно нужно из файла асинхронно скачать и сохранить все картинки, и только когда они все сохранены открыть папку в которую они были записаны. Используя парадигму выше, это настоящий кошмар.



    Решение через анонимные делегаты


    Первое, что можно сделать – это сгруппировать кусочки функционала в анонимные делегаты[1]. Тогда получится примерно следующее:



    private AutoResetEvent waitHandle = new AutoResetEvent(false);<br/>
    public void DownloadPage(string url)<br/>
    {<br/>
      var wr = WebRequest.Create(url);<br/>
      wr.BeginGetResponse(ar =><br/>
      {<br/>
        var resp = wr.EndGetResponse(ar);<br/>
        var stream = resp.GetResponseStream();<br/>
        var reader = new StreamReader(stream);<br/>
        string html = reader.ReadToEnd();<br/>
        var fs = new FileStream(@"c:\temp\file.htm", FileMode.CreateNew,<br/>
                            FileAccess.Write, FileShare.None, 1024, true);<br/>
        var bytes = Encoding.UTF8.GetBytes(html);<br/>
        fs.BeginWrite(bytes, 0, bytes.Length, ar1 =><br/>
          {<br/>
            fs.EndWrite(ar1);<br/>
            fs.Flush();<br/>
            fs.Close();<br/>
            waitHandle.Set();<br/>
          }, null);<br/>
      }, null);<br/>
    }<br/>

    Это решение выглядит хорошо, но если цепочка вызовов действительно большая, то код не будет читабельным, и им будет сложно управлять. Это особенно касается ситуаций когда, например, выполнение цепочки нужно приостановить и отменить.



    Решение с использованием asynchronous workflows


    Workflow – это конструкт F#. Идея примерно такая – вы определяете некий блок, в котором некоторые операторы (такие как let, например) переопределены. Asynchronous workflow – это такой workflow, внутри которого переопределены операторы (let!, do! и другие) так, что эти операторы позволяют «дождаться» завершения операции. То есть, когда мы пишем



    async {<br/>
      ⋮<br/>
      let! x = Y()<br/>
      ⋮<br/>
    }<br/>

    это значит что мы делаем вызов BeginYyy() для Y, а когда результат доступен, записываем результат в x.



    Тем самым, аналог скачивания и записи файла на F# будет выглядеть примерно вот так:



    // вот как надо строить пару начало/конец для асинхронных вызовов
    // этот метод был по непонятным причинам убран из последних сборок F#
    type WebRequest with<br/>
      member x.GetResponseAsync() =<br/>
        Async.BuildPrimitive(x.BeginGetResponse, x.EndGetResponse)<br/>
    let private DownloadPage(url:string) =<br/>
      async {<br/>
        try<br/>
          let r = WebRequest.Create(url)<br/>
          let! resp = r.GetResponseAsync() // let! позволяет дождаться результата
          use stream = resp.GetResponseStream()<br/>
          use reader = new StreamReader(stream)<br/>
          let html = reader.ReadToEnd()<br/>
          use fs = new FileStream(@"c:\temp\file.htm", FileMode.Create,<br/>
                                  FileAccess.Write, FileShare.None, 1024, true);<br/>
          let bytes = Encoding.UTF8.GetBytes(html);<br/>
          do! fs.AsyncWrite(bytes, 0, bytes.Length) // ждем пока все запишется
        with<br/>
          | :? WebException -> ()<br/>
      }<br/>
    // вызов ниже делает синхронный вызов метода, но поведение внутри метода - асинхронное
    Async.RunSynchronously(DownloadPage("http://habrahabr.ru"))<br/>

    Используя хитрые синтактические конструкции, F# позволяет нам с помощью специально созданных «примитивов» (таких как GetResponseAsync() и AsyncWrite()) производить вызовы с Begin/End семантикой, но без разделения их на отдельные методы или делегаты. Как ни странно, примерно то же самое можно делать и в C#.



    Решение с использованием asynchronous enumerator


    Джефри Рихтер, всем известный автор книги CLR via C#, является также автором библиотеки PowerThreading. Эта библиотека[2] предоставляет ряд интересных фич, одна из которых – реализация аналога asynchronous workflow на C#.



    Делается это очень просто – у нас появляется некий «менеджер токенов» под названием AsyncEnumerator. Этот класс фактически позволяет прерывать исполнение метода и продолжать его снова. Как можно прервать исполнение метода? Это делается с помощью нехитрого выражения yield return.



    Использовать AsyncEnumerator просто. Берем и добавляем его как параметр в наш метод, а также меняем возвращаемое значение на IEnumerator<int>:



    public IEnumerator<int> DownloadPage(string url, AsyncEnumerator ae)<br/>
    {<br/>
      ⋮<br/>
    }<br/>

    Далее, пишем код с использованием BeginXxx()/EndXxx(), используя три простых правила:



    • Каждый BeginXxx() в качестве callback-параметра получает ae.End()
    • Каждый EndXxx() в качестве токена IAsyncResult получает ae.DequeueAsyncResult()
    • Каждый раз когда нужно чего-то ждать, мы делаем yield return X, где X – количество начатых операций

    Вот как выглядит наш метод скачивания при использовании AsyncEnumerator:



    public IEnumerator<int> DownloadPage(string url, AsyncEnumerator ae)<br/>
    {<br/>
      var wr = WebRequest.Create(url);<br/>
      wr.BeginGetResponse(ae.End(), null);<br/>
      yield return 1;<br/>
      var resp = wr.EndGetResponse(ae.DequeueAsyncResult());<br/>
      var stream = resp.GetResponseStream();<br/>
      var reader = new StreamReader(stream);<br/>
      string html = reader.ReadToEnd();<br/>
      using (var fs = new FileStream(@"c:\temp\file.htm", FileMode.Create,<br/>
                          FileAccess.Write, FileShare.None, 1024, true))<br/>
      {<br/>
        var bytes = Encoding.UTF8.GetBytes(html);<br/>
        fs.BeginWrite(bytes, 0, bytes.Length, ae.End(), null);<br/>
        yield return 1;<br/>
        fs.EndWrite(ae.DequeueAsyncResult());<br/>
      }<br/>
    }<br/>

    Как видите, асинхронный метод теперь записан в синхронном виде – мы даже умудрились использовать using для файлового потока. Код стал более читабелен, если конечно не считать дополнительные вызовы yield return.



    Теперь осталось только вызвать этот метод:



    static void Main()<br/>
    {<br/>
      Program p = new Program();<br/>
      var ae = new AsyncEnumerator();<br/>
      ae.Execute(p.DownloadPage("http://habrahabr.ru", ae));<br/>
    }   <br/>

    Заключение


    Проблема цепочек асинхронных вызовов оказалась не такой страшной. Для простых ситуаций подойдут анонимные делегаты; для сложных есть асинхронные воркфлоу и AsyncEnumerator, в зависимости от того, какой язык вам ближе.



    Цепочки – это просто, а что делать с целыми графами зависимостей? Об этом – в следующем посте. ■



    Заметки


    1. Примечательно то, что можно изначально сделать отдельные методы, а потом «заинлайнить» их с помощью ReSharper’а.
    2. Библиотека действительно интересная – советую открывать ее в Reflector’е, там много вкусного. Также, заметьте что лицензия библиотеки позволяет использовать ее только на Windows, что наверняка разозлит фанатов Mono

    Комментарии 15

      0
      Хм, вроде только была на главной, а теперь нету. Видимо не всем интересно :)
      Вообще очень неплохо, но особенно конечно доставляет удовлетворение своей очевидностью способ F#. Кстати сейчас готовлю один перевод про F#, там это упоминается как одно из достоинств. И, в принципе, резонно.
        0
        Пока еще на главной.
        –1
        Повторюсь: Я, конечно, зануда но…
        1. Нет такого понятия как анонимные делегаты, есть анонимные методы.
        2. Для асинхронной работы помимо библиотеки Рихтера есть еще и TPL, точнее будет в .Net 4. (С Рихтером, кстати, забавно на эту тему весной пообщались)
        3. Вообще же такие вещи, там где это поддерживается, делаются посредством continuation passing style (http://en.wikipedia.org/wiki/Continuation-passing_style)
        И небезизвестный Эрик Мейер сейчас пишет довольно забавный фреймворк на шарпе для работы в таком стиле через LINQ — вот это действительно круто :)
        Посмотреть об этом можно, например, здесь: channel9.msdn.com/shows/Going+Deep/Expert-to-Expert-Brian-Beckman-and-Erik-Meijer-Inside-the-NET-Reactive-Framework-Rx/
          –1
          Не пишите комментарии на зеркала блогов – никто не ответит :)

          1. Семантика. Можно погуглить ‘anonymous delegate’ и найти достаточно упоминаний.
          2. Никто не спорит что в task-level parallelizm в дотнете все в порядке. Но вот всякие Parallel.For оттуда же – не круто.
          3. Continuation passing style работает в простых ситуациях, когда задачу можно разбить на несколько Func<U,V,...>. AsyncEnumerator и F# позволяют формировать цепочки так же, как если бы вы писали синхронный код. То есть там например могут переплетаться изменения состояний, scope’ы, и так далее.
          Про Reactive Framework – тут уже на хабре было, в принципе bindable linq неплохо работает :)
            0
            Тогда закройте комментарии в зеркале.

            1. Например, погуглив по словам «папа римский педофил», тоже много чего найти можно, но это не значит, что так и есть на самом деле, иными словами это лишь доказывает, что ошиблись вы не один… Формально есть термин «анонимный метод», а термина «анонимный делегат» — нет. Хотя для блога простительно, как выразился один мой знакомый — это концерт в халате на лестничной клетке… )
            2. Parallel.For — как раз очень круто. Но речь не о нем, а о том, что там тоже есть поддержка континуейшенов в достаточно приемлемом виде.
            3. Continuation, как раз и были придуманы для того, чтобы писать асинхронный код, как синхронный, а AsyncEnumerator-ы уже попытка эмулировать это дело, но Reactive кажется удачнее.
            Вообще CPS был придуман функциональщиками, где каждая функция независима и общего состояния нет (что здорово) и ими же показана успешность применения такого подхода, отсюда утверждение, что могут переплетаться изменение состояний, и т. д. — звучит как антиреклама… ))
            И в принципе, я бы хотел посмотреть на задачу, которую невозможно разбить на несколько независимых функций с явной передачей состояния так, чтобы она не стала от этого понятнее и читаемее… :)
          –1
          Спасибо. Узнал, что в дотнете живётся не так плохо как я думал )
            +1
            У нас в дотнете все ажурно. Точнее Azure’но. :)
            0
            Интересно но не привычно (после С и ему подобных языков). Еще подробно расписано про данную возможность здесь:
            msdn.microsoft.com/ru-ru/magazine/cc967279.aspx
              0
              Попробовал прочитать, но, увы… на русском языке читать нереально. English version
            • НЛО прилетело и опубликовало эту надпись здесь
                +1
                Я об этом хотел через пост написать. В следующем про обычный граф, в потом — про conditional graph, что у нас и имеется.

                Задачка прикольная. И соглашусь — с такими вещами «голый» шарп справляется достаточно плохо.
                0
                На Java, например, каждая часть задачи заводилась бы в собственный java.lang.Thread (обеспечение инкапсуляции данных и кода), а те, в свою очередь, «цеплялись» друг к другу через вызов join() и, таким образом, обеспечивали последовательное взаимоувязанное выполнение этапов задачи.
                Из нитей, увязанных в граф зависимостей через join(), можно получать весьма нетривиальную функциональность, не теряя управляемости кодом.
                  0
                  А топологическая сортировка зависимостей сама собой произойдет?
                    0
                    В данном случае, что я привёл, топологическая сортировка зависимостей в самом графе — join() ждёт смерти объекта-нити, окончания которой ожидается в текущей нити, прежде чем запустить текущую нить.
                  0
                  Ничего против Вас не имею, но не слишком ли много var?

                  Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

                  Самое читаемое