Пример асинхронной монады

    Предположим, две программы общаются друг с другом по сети, но не изволят дожидаться ответа, поэтому ответы приходят в произвольном порядке. Чтобы разобраться, что к чему, с сообщением посылается номер, а в ответе шлется номер исходного (на которое отвечаем) сообщения и номер ответа для последующей коммуникации.

    Нашей целью является описать последовательность приёма и отправок сообщений при общении с некоторым собеседником, а также иметь возможность использовать ввод/вывод (например, обращение к базе) между приёмами и отправками сообщений.

    Как бы в коде на вашем предпочитаемом языке выглядел, например, такой диалог, с учётом того, что в любой момент (между любыми из этих пунктов) могут прийти какие-то другие запросы, которые тоже надо обработать, но не впутать случайно в этот диалог:
    1. Посылаем число
    2. Приходит число в ответ
    3. Посылаем число из п.2 в квадрате
    4. В ответ опять число
    5. Выводим на консоль сумму чисел п.2 и п.4

    Вот как это будет выглядеть на Haskell (функция example, разумеется, неблокирующая):
    example :: Int -> AIO ()<br>
    example v = do<br>
        x <- request v<br>
        y <- request (* x)<br>
        io $ print (+ y)<br>

    Сравните это с блокирующей похожей функцией, которая, к примеру, запрашивает ответ у пользователя:
    example :: Int -> IO ()<br>
    example v = do<br>
        x <- request v<br>
        y <- request (* x)<br>
        print (+ y)<br>


    Дабы не отвлекаться на лишние детали и не иметь необходимости запускать несколько программ, я упрощу задачу для статьи. Читать-писать будем в канал (Chan a), а сообщение будет иметь тип (Int, String), т.е. номер сообщения и сериализованное значение.

    Подключим все необходимые модули:

    > module Test (<br>
    >     ) where<br>
    ><br>
    > import Control.Arrow<br>
    > import Control.Monad<br>
    > import Control.Concurrent.MVar<br>
    > import Control.Concurrent.Chan<br>
    > import Control.Concurrent<br>
    > import Data.List<br>
    > import Data.Maybe<br>

    Перед написанием монады, попробуем сначала сделать всё на callback'ах.
    При посылке сообщения, нам нужно сгенерировать какой-то номер, а так же добавить callback в список. Т.е. нам понадобятся изменяемые число и список пар число -> callback. Также нам нужны, собственно, сами каналы. В отличие от сокетов, их нужно два, так как в один мы будем писать, из другого читать. Всё это оформим в виде отдельного типа:

    > data AState = AState {<br>
    >     aCurrent :: MVar Int,<br>
    >     aWait :: MVar [(Int, String -> IO ())],<br>
    >     aChanOut :: Chan (Int, String),<br>
    >     aChanIn :: Chan (Int, String) }<br>
    ><br>
    > newA = liftM4 AState (newMVar 0) (newMVar []) newChan newChan<br>

    Обработчик сообщений с клиентской стороны должен читать из канала и вызывать callback'и в соответствии с номером сообщения.
    Будем читать из канала, затем искать подходящий callback (одновременно удаляя его из списка) и вызывать. Всё просто:

    > listener (AState _ w _ chIn) = forever $ do<br>
    >     (i, s) <- readChan chIn<br>
    >     -- modifyMVar принимает функцию a -> IO (a, b)<br>
    >     -- т.е. в нашу лямбду передаётся старое значение, мы же должны вернуть новое и общий результат вычисления.<br>
    >     -- в нашем случае мы вернем новый список и найденный callback.<br>
    >     callback <- modifyMVar w $ \callbacks -> do<br>
    >         -- Разделим callback'и в зависимости от их номера сообщения на неподходящие и подходящие.<br>
    >         let (past, ok) = partition ((/= i) . fst) callbacks<br>
    >         -- Подходящий должен быть один (или ноль).<br>
    >         case ok of<br>
    >             ((_, f):_) -> return (past, f) -- Вернем подходящий callback (и новый список).<br>
    >             _ -> return (past, \-> return ()) -- Вернем функцию, которая ничего не делает<br>
    >     callback s -- Вызываем callback.<br>

    Чтобы сообщения, пришедшие на «сервер» можно было наблюдать воочию, напишем обработчик, который будет выводить все приходящий сообщения нашему собеседнику (в канал aChanOut).
    Читаем из канала aChanOut и выводим на экран:

    > tracer (AState _ _ chOut _) = forever $ readChan chOut >>= print<br>

    Промонадический способ

    Для начала обойдёмся без монад. Попробуем просто написать функцию отправки сообщения.
    Она должна сгенерировать номер сообщения, сериализовать сообщение в строку и зарегистрировать callback.
    sendAndReceive1 :: AState -> String -> (String -> IO ()) -> IO ()<br>
    sendAndReceive1 (AState cur w chOut _) msg onMsg = do<br>
        i <- modifyMVar cur (return . (succ &&& id)) -- Увеличиваем на 1 и возвращаем старое значение.<br>
        modifyMVar_ w (return . ((i, onMsg):))       -- Регистрируем callback.<br>
        writeChan chOut (i, msg)                     -- Отправляем сообщение.<br>

    Использование в принципе приемлемо, но есть недочёты:
    sendAndReceive1 a (show 123) $ \ans -> do<br>
        let x = read ans            -- Десериализуем ответ.<br>
        print x<br>
        sendAndReceive1 a (show x) $ \ans2 -> do<br>
            -- ...<br>

    Во-первых, можно передавать функцию сериализации и десериализации, чтобы не писать их в callback'е и написать sendAndReceive2, использующие, к примеру, стандартные read и show по умолчанию.
    sendAndReceive1 :: AState -> a -> (-> String) -> (String -> b) -> (-> IO ()) -> IO ()<br>
    sendAndReceive1 (AState cur w chOut _) msg show_ read_ onMsg = do<br>
        i <- modifyMVar cur (return . (succ &&& id))<br>
        modifyMVar_ w (return . ((i, onMsg . read_):))<br>
        writeChan chOut (i, show_ msg)<br>
    <br>
    sendAndReceive2 :: (Show a, Read b) => AState -> a -> (-> IO ()) -> IO ()<br>
    sendAndReceive2 a msg onMsg = sendAndReceive1 a msg show read onMsg<br>
    <br>
    -- Так намного лучше.<br>
    sendAndReceive2 a 23 $ \-> do<br>
        print x<br>
        sendAndReceive2 a (+ 10) $ \-> ...<br>

    Можно было бы на этом остановиться, но тогда можно было вообще взять ассемблер, что уж там почему бы не воспользоваться более мощной абстракцией?

    Эумонадический способ

    Если вспомнить, что основная функция монады (не в ТК, а в Haskell) имеет тип m a -> (a -> m b) -> m b, то в качестве второго аргумента так и напрашивается наш callback. Но также туда надо иметь возможность передать и обычное вычисление типа print.
    Чтобы как-то их отличать, создадим новый тип с двумя вариантами:
    1. Сообщение + callback
    2. Чистое значение

    > data AS a = Send String (String -> AIO a) | Pure a<br>

    И обернём это в монаду IO.

    > data AIO a = AIO { aio :: IO (AS a) }<br>

    Таким образом вычисления у нас будут делиться на два лагеря: отправляющие сообщение и все остальные.

    Напишем функцию, которая «поднимает» обычное IO в нашу монаду. Она должна просто вернуть то же, что и IO, но обернув в конструктор Pure

    > io :: IO a -> AIO a<br>
    <br>
    io act = AIO $ do<br>
        v <- act<br>
        return (Pure v)<br>

    Или проще:

    > io = AIO . liftM Pure<br>

    Функция для отправки сообщения воспользуется вторым конструктором — Send, по сути просто упаковав аргументы в конструктор:

    > sendAndReceive :: a -> (-> String) -> (String -> b) -> AIO b<br>
    > sendAndReceive msg to from = AIO $ return $ Send (to msg) (return . from)<br>

    И аналогичная ей request, использующая для сериализации show, read:

    > request :: (Show a, Read b) => a -> AIO b<br>
    > request msg = sendAndReceive msg show read<br>

    Некоторая хитрость заключается в том, что мы ничего не вычисляем в нашей монаде, а лишь конструируем что-то вроде дерева вычислений. Сами по себе эти функции лишь создают тип AIO.
    Самое время приняться за функцию, которая всю эту помойку сможет вычислить. Т.е. привести в исполнение описанный нами диалог (например, example). Созданный диалог сводится к двум вариантам:
    1. Pure — нужно просто изъять значение и вернуть его.
    2. Send — здесь происходит основная работа — генерируем число, регистрируем callback и отправляем сообщение.

    > run :: AState -> AIO () -> IO ()<br>
    > run a@(AState cur w chOut chIn) act = run' act where<br>
    >     run' (AIO actIO) = do<br>
    >         as <- actIO<br>
    >         case as of<br>
    >             Pure value -> return value<br>
    >             Send msg onMsg -> do<br>
    >                 i <- modifyMVar cur (return . (succ &&& id))   -- Увеличим счетчик и вернем старое значение<br>
    >                 modifyMVar_ w (return . ((i, run' . onMsg):))  -- Добавим callback.<br>
    >                 writeChan chOut (i, msg)                       -- Отправим сообщение.<br>

    Теперь всё готово, чтобы написать instance монады:

    > instance Monad AIO where<br>
    >     return = AIO . return . Pure   -- Возвращаем чистое значение<br>
    >     AIO v >>= f = AIO $ do<br>
    >         x <- v                     -- Вычисляем AS, Send там или Pure?<br>
    >         case x of<br>
    >             -- Если Pure, то мы можем применить к нему callback сразу.<br>
    >             Pure value -> aio $ f value<br>
    >             -- Иначе мы "присоединяем" новый callback к старому.<br>
    >             Send msg onMsg -> return $ Send msg (\-> onMsg s >>= f)<br>

    Последнее, что нам понадобится, — это функция для проверки работоспособности, которая запустит потоки listener для обработки приходящих клиенту сообщений и tracer для вывода пришедших сообщений на сервер, и вернёт нам функцию для отправки сообщений с сервера обратно нашему клиенту. Т.е. в данном случае в качестве собеседника выступим мы сами, печатая то, что хотим отправить нашему клиенту:

    > start :: IO (AState, (Int, String) -> IO ())<br>
    > start = newA >>= forks where<br>
    >     forks a = mapM_ forkIO [listener a, tracer a] >> return (a, writeChan (aChanIn a))<br>

    Тыдыщь!

    Теперь можно проверить это в интерпретаторе на исходном примере.
    -- Запустим обработчики клиента и сервера, получим в кач-ве результата состояние клиента<br>
    -- и функцию, которая позволит нам выступить в роли сервера<br>
    ghci> (a, f) <- start<br>
    -- Запускаем первый диалог<br>
    ghci> run a (example 10)<br>
    -- От него сразу приходит сообщение<br>
    (0,"10")<br>
    -- Запускаем второй диалог<br>
    ghci> run a (example 20)<br>
    -- Он присылает такое же сообщение, но с другим номером<br>
    (1,"20")<br>
    -- Ответим первому "диалогу"<br>
    ghci> f (0, "11")<br>
    -- Он получает ответ и шлёт нам второй запрос<br>
    (2,"121")<br>
    -- Теперь ответим второму "диалогу"<br>
    ghci> f (1, "21")<br>
    -- Получаем новый запрос<br>
    (3,"441")<br>
    -- А теперь пошлем ответ опять второму "диалогу", и убедимся, что первый "диалог" на это не среагирует<br>
    ghci> f (3, "444")<br>
    -- Получаем должный вывод<br>
    465<br>
    -- То же с первым "диалогом"<br>
    ghci> f (2, "122")<br>
    133<br>
    ghci><br>

    Как можно заметить, несмотря на запуск двух example сразу, диалоги с ними не пересекаются.

    Кстати, всё это сообщение является программой на Literate Haskell, его можно скопировать в test.lhs и протестировать самому.

    P.S. Большое спасибо pechlambda за помощь в улучшении статьи.
    AdBlock похитил этот баннер, но баннеры не зубы — отрастут

    Подробнее
    Реклама

    Комментарии 8

      +5
      Заранее прошу прощения за длинный комментарий.

      Признаться честно, мне было сложно проследить нить рассуждений автора. Сложно было отделить… скажем так API от тонкостей реализации в тексте. То есть сложно было вычленить ту часть кода, которая мне нужна, чтобы вот взять и начать использовать эту монаду на практике.

      Позволю себе несколько ремарок:
      1) Статус определённой вначале функции test (я бы назвал ее лучше example) для меня был темным до тех пор, пока я не увидел листинг диалога с интерпретатором в конце статьи. Итак:
      Функция тест является описанием протокола взаимодействия с неким собеседником. Замечу, что нам не интересно кто это будет и какого будет его состояние, мы просто описываем последовательность отправок и приёма сообщений (а также, возможно, некоторого промежуточного ввода\вывода, допустим обращения к базе, что реализуется через вызов функции io). Именно такое описание взаимодействий есть цель, к которой стремится автор в течение всей статьи.

      2) «Дабы не отвлекаться на лишние детали и не иметь необходимости запускать несколько программ, я упрощу задачу для статьи». На мой взгляд всё лишь усложнилось, потому что перестало быть видно «отдельных участников» обмена сообщениями. Лично для меня это было понять тяжелее всего. Но пока по отложим этот вопрос.

      3) Итак, функции send и response. Когда я впервые увидел эти функции при первом беглом чтении статьи, я решил, что это и есть ОНО — отделение посылки сообщения и ответа на него, ан нет. На самом деле функцию send стоило бы назвать sendAndRecieve, ибо она описывает не только ЧТО отправить, но и КАК обработать полученный в ответ результат. А функцию response (ответ) в свою очередь лично я бы назвал request (запрос), ибо это более точно отражает, на мой взгляд, суть процесса.

      4) Наконец, run — это инициация диалога act (да, того самого, который представлен, например, функцией test) с собеседником, заданным состоянием a.

      5) И последнее — в данном примере мы ведём диалог сами с собой (trace и listener помогают нам в этом)! Может быть это тривиальная мысль, но я никак не мог ее уловить, смотрим: сперва мы инициализируем два диалога с самим собой, причем наш ввод выступает как раз тем самым «собеседником», с которым мы общаемся посредством протокола test. В первый раз нам прислали 10, во второй — 20. По каким-то одним нам ведомым причинам в ответ мы отправляем, соответсвенно, 11 и 21 (это и будут первые слагаемые нашей результирующей суммы). Они возводятся в квадрат нашим протоколом и вновь обратно к нам. Теперь мы добавляем более безумные 444 и 122, в результате дающие нам суммы 465 и 133.

      Вывод: я заключил для себя следующее. Функции, описывающие протокол (типа test) взаимодействия, я бы назвал «серверной частью» (для простоты восприятия). Конкретный же «клиент» отправляет «запросы» к этому протоколу откуда-то извне (легко переписать это дело на сокеты) — в нашем случае это была строка интерпретатора и функция f (которую опять же следовало бы назвать как-то более осмысленно).
        +1
        Уф, прошу прощения еще раз, поправлю сам себя:

        4) Конечно же, а — это не состояние собеседника, а скорее состояние «сервера» перед диалогом по «протоколу» act.
          +1
          Спасибо большое за обширный ответ, попробую внести правки, чтобы всё это было проще понять. Без свежего взгляда сложно понять, что именно может вызвать трудности в понимании.
          +1
          на C# исходная задача реализуется примерно так:

          private static IObservable Test(int v)
          {
          var sum =
          from x in SendRequest(v)
          from y in SendRequest(x*x)
          select x + y;
          return sum.Select(x =>
          {
          Console.WriteLine(x);
          return new Unit(); // это нужно потому что void - это не тип.
          });
          }
            0
            Разве это готовый код, который можно скомпилировать и запустить, и в котором можно «пообщаться с собой»? Где реализация SendRequest? Что такое Unit? При этом Вы пишете в консоль прямо в коде, описывающем протокол взаимодействия.
            Увы, но на мой взгляд это демонстрация минусов императивных языков, Вы не моргнув и глазом намешали описание взаимодействия и тонкости конкретной реализации.
              0
              Прошу прощения, я был не прав, Вы привели верный аналог функции test от автора.
                0
                Всё потому, что LINQ — монада :)
                Кстати, если немного усложнить пример:
                test2 = do
                    x <- response "hello"
                    io $ putStrLn x
                    y <- response x
                    z <- response (++ show (:: Int))
                    io $ putStrLn $ "select from DB: " ++ show y ++ show (:: Int)
                    t <- io $ selectSmthFromDB y z
                    io $ writeFile "tmp" t
                  +2
                  примерно так же…
                  var test2 =
                                  from x in SendRequest("hello")
                                  let _ = WriteLine(x)
                                  from y in SendRequest(x)
                                  from z in SendRequest(x + y)
                                  let __ = WriteLine("select from DB: " + y + z)
                                  let t = selectSmthFromDB(y, z)
                                  select WriteFile("tmp", t);
                  

                  предполагается, что WriteLine/WriteFile возвращают какое нить значение, например Unit

              Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

              Самое читаемое