Как стать автором
Обновить

Пример асинхронной монады

Время на прочтение 12 мин
Количество просмотров 2.1K
Предположим, две программы общаются друг с другом по сети, но не изволят дожидаться ответа, поэтому ответы приходят в произвольном порядке. Чтобы разобраться, что к чему, с сообщением посылается номер, а в ответе шлется номер исходного (на которое отвечаем) сообщения и номер ответа для последующей коммуникации.

Нашей целью является описать последовательность приёма и отправок сообщений при общении с некоторым собеседником, а также иметь возможность использовать ввод/вывод (например, обращение к базе) между приёмами и отправками сообщений.

Как бы в коде на вашем предпочитаемом языке выглядел, например, такой диалог, с учётом того, что в любой момент (между любыми из этих пунктов) могут прийти какие-то другие запросы, которые тоже надо обработать, но не впутать случайно в этот диалог:
1. Посылаем число
2. Приходит число в ответ
3. Посылаем число из п.2 в квадрате
4. В ответ опять число
5. Выводим на консоль сумму чисел п.2 и п.4

Вот как это будет выглядеть на Haskell (функция example, разумеется, неблокирующая):
example :: Int -> AIO ()<br>
example v = do<br>
    x <- request v<br>
    y <- request (* x)<br>
    io $ print (+ y)<br>

Сравните это с блокирующей похожей функцией, которая, к примеру, запрашивает ответ у пользователя:
example :: Int -> IO ()<br>
example v = do<br>
    x <- request v<br>
    y <- request (* x)<br>
    print (+ y)<br>


Дабы не отвлекаться на лишние детали и не иметь необходимости запускать несколько программ, я упрощу задачу для статьи. Читать-писать будем в канал (Chan a), а сообщение будет иметь тип (Int, String), т.е. номер сообщения и сериализованное значение.

Подключим все необходимые модули:

> module Test (<br>
>     ) where<br>
><br>
> import Control.Arrow<br>
> import Control.Monad<br>
> import Control.Concurrent.MVar<br>
> import Control.Concurrent.Chan<br>
> import Control.Concurrent<br>
> import Data.List<br>
> import Data.Maybe<br>

Перед написанием монады, попробуем сначала сделать всё на callback'ах.
При посылке сообщения, нам нужно сгенерировать какой-то номер, а так же добавить callback в список. Т.е. нам понадобятся изменяемые число и список пар число -> callback. Также нам нужны, собственно, сами каналы. В отличие от сокетов, их нужно два, так как в один мы будем писать, из другого читать. Всё это оформим в виде отдельного типа:

> data AState = AState {<br>
>     aCurrent :: MVar Int,<br>
>     aWait :: MVar [(Int, String -> IO ())],<br>
>     aChanOut :: Chan (Int, String),<br>
>     aChanIn :: Chan (Int, String) }<br>
><br>
> newA = liftM4 AState (newMVar 0) (newMVar []) newChan newChan<br>

Обработчик сообщений с клиентской стороны должен читать из канала и вызывать callback'и в соответствии с номером сообщения.
Будем читать из канала, затем искать подходящий callback (одновременно удаляя его из списка) и вызывать. Всё просто:

> listener (AState _ w _ chIn) = forever $ do<br>
>     (i, s) <- readChan chIn<br>
>     -- modifyMVar принимает функцию a -> IO (a, b)<br>
>     -- т.е. в нашу лямбду передаётся старое значение, мы же должны вернуть новое и общий результат вычисления.<br>
>     -- в нашем случае мы вернем новый список и найденный callback.<br>
>     callback <- modifyMVar w $ \callbacks -> do<br>
>         -- Разделим callback'и в зависимости от их номера сообщения на неподходящие и подходящие.<br>
>         let (past, ok) = partition ((/= i) . fst) callbacks<br>
>         -- Подходящий должен быть один (или ноль).<br>
>         case ok of<br>
>             ((_, f):_) -> return (past, f) -- Вернем подходящий callback (и новый список).<br>
>             _ -> return (past, \-> return ()) -- Вернем функцию, которая ничего не делает<br>
>     callback s -- Вызываем callback.<br>

Чтобы сообщения, пришедшие на «сервер» можно было наблюдать воочию, напишем обработчик, который будет выводить все приходящий сообщения нашему собеседнику (в канал aChanOut).
Читаем из канала aChanOut и выводим на экран:

> tracer (AState _ _ chOut _) = forever $ readChan chOut >>= print<br>

Промонадический способ

Для начала обойдёмся без монад. Попробуем просто написать функцию отправки сообщения.
Она должна сгенерировать номер сообщения, сериализовать сообщение в строку и зарегистрировать callback.
sendAndReceive1 :: AState -> String -> (String -> IO ()) -> IO ()<br>
sendAndReceive1 (AState cur w chOut _) msg onMsg = do<br>
    i <- modifyMVar cur (return . (succ &&& id)) -- Увеличиваем на 1 и возвращаем старое значение.<br>
    modifyMVar_ w (return . ((i, onMsg):))       -- Регистрируем callback.<br>
    writeChan chOut (i, msg)                     -- Отправляем сообщение.<br>

Использование в принципе приемлемо, но есть недочёты:
sendAndReceive1 a (show 123) $ \ans -> do<br>
    let x = read ans            -- Десериализуем ответ.<br>
    print x<br>
    sendAndReceive1 a (show x) $ \ans2 -> do<br>
        -- ...<br>

Во-первых, можно передавать функцию сериализации и десериализации, чтобы не писать их в callback'е и написать sendAndReceive2, использующие, к примеру, стандартные read и show по умолчанию.
sendAndReceive1 :: AState -> a -> (-> String) -> (String -> b) -> (-> IO ()) -> IO ()<br>
sendAndReceive1 (AState cur w chOut _) msg show_ read_ onMsg = do<br>
    i <- modifyMVar cur (return . (succ &&& id))<br>
    modifyMVar_ w (return . ((i, onMsg . read_):))<br>
    writeChan chOut (i, show_ msg)<br>
<br>
sendAndReceive2 :: (Show a, Read b) => AState -> a -> (-> IO ()) -> IO ()<br>
sendAndReceive2 a msg onMsg = sendAndReceive1 a msg show read onMsg<br>
<br>
-- Так намного лучше.<br>
sendAndReceive2 a 23 $ \-> do<br>
    print x<br>
    sendAndReceive2 a (+ 10) $ \-> ...<br>

Можно было бы на этом остановиться, но тогда можно было вообще взять ассемблер, что уж там почему бы не воспользоваться более мощной абстракцией?

Эумонадический способ

Если вспомнить, что основная функция монады (не в ТК, а в Haskell) имеет тип m a -> (a -> m b) -> m b, то в качестве второго аргумента так и напрашивается наш callback. Но также туда надо иметь возможность передать и обычное вычисление типа print.
Чтобы как-то их отличать, создадим новый тип с двумя вариантами:
1. Сообщение + callback
2. Чистое значение

> data AS a = Send String (String -> AIO a) | Pure a<br>

И обернём это в монаду IO.

> data AIO a = AIO { aio :: IO (AS a) }<br>

Таким образом вычисления у нас будут делиться на два лагеря: отправляющие сообщение и все остальные.

Напишем функцию, которая «поднимает» обычное IO в нашу монаду. Она должна просто вернуть то же, что и IO, но обернув в конструктор Pure

> io :: IO a -> AIO a<br>
<br>
io act = AIO $ do<br>
    v <- act<br>
    return (Pure v)<br>

Или проще:

> io = AIO . liftM Pure<br>

Функция для отправки сообщения воспользуется вторым конструктором — Send, по сути просто упаковав аргументы в конструктор:

> sendAndReceive :: a -> (-> String) -> (String -> b) -> AIO b<br>
> sendAndReceive msg to from = AIO $ return $ Send (to msg) (return . from)<br>

И аналогичная ей request, использующая для сериализации show, read:

> request :: (Show a, Read b) => a -> AIO b<br>
> request msg = sendAndReceive msg show read<br>

Некоторая хитрость заключается в том, что мы ничего не вычисляем в нашей монаде, а лишь конструируем что-то вроде дерева вычислений. Сами по себе эти функции лишь создают тип AIO.
Самое время приняться за функцию, которая всю эту помойку сможет вычислить. Т.е. привести в исполнение описанный нами диалог (например, example). Созданный диалог сводится к двум вариантам:
1. Pure — нужно просто изъять значение и вернуть его.
2. Send — здесь происходит основная работа — генерируем число, регистрируем callback и отправляем сообщение.

> run :: AState -> AIO () -> IO ()<br>
> run a@(AState cur w chOut chIn) act = run' act where<br>
>     run' (AIO actIO) = do<br>
>         as <- actIO<br>
>         case as of<br>
>             Pure value -> return value<br>
>             Send msg onMsg -> do<br>
>                 i <- modifyMVar cur (return . (succ &&& id))   -- Увеличим счетчик и вернем старое значение<br>
>                 modifyMVar_ w (return . ((i, run' . onMsg):))  -- Добавим callback.<br>
>                 writeChan chOut (i, msg)                       -- Отправим сообщение.<br>

Теперь всё готово, чтобы написать instance монады:

> instance Monad AIO where<br>
>     return = AIO . return . Pure   -- Возвращаем чистое значение<br>
>     AIO v >>= f = AIO $ do<br>
>         x <- v                     -- Вычисляем AS, Send там или Pure?<br>
>         case x of<br>
>             -- Если Pure, то мы можем применить к нему callback сразу.<br>
>             Pure value -> aio $ f value<br>
>             -- Иначе мы "присоединяем" новый callback к старому.<br>
>             Send msg onMsg -> return $ Send msg (\-> onMsg s >>= f)<br>

Последнее, что нам понадобится, — это функция для проверки работоспособности, которая запустит потоки listener для обработки приходящих клиенту сообщений и tracer для вывода пришедших сообщений на сервер, и вернёт нам функцию для отправки сообщений с сервера обратно нашему клиенту. Т.е. в данном случае в качестве собеседника выступим мы сами, печатая то, что хотим отправить нашему клиенту:

> start :: IO (AState, (Int, String) -> IO ())<br>
> start = newA >>= forks where<br>
>     forks a = mapM_ forkIO [listener a, tracer a] >> return (a, writeChan (aChanIn a))<br>

Тыдыщь!

Теперь можно проверить это в интерпретаторе на исходном примере.
-- Запустим обработчики клиента и сервера, получим в кач-ве результата состояние клиента<br>
-- и функцию, которая позволит нам выступить в роли сервера<br>
ghci> (a, f) <- start<br>
-- Запускаем первый диалог<br>
ghci> run a (example 10)<br>
-- От него сразу приходит сообщение<br>
(0,"10")<br>
-- Запускаем второй диалог<br>
ghci> run a (example 20)<br>
-- Он присылает такое же сообщение, но с другим номером<br>
(1,"20")<br>
-- Ответим первому "диалогу"<br>
ghci> f (0, "11")<br>
-- Он получает ответ и шлёт нам второй запрос<br>
(2,"121")<br>
-- Теперь ответим второму "диалогу"<br>
ghci> f (1, "21")<br>
-- Получаем новый запрос<br>
(3,"441")<br>
-- А теперь пошлем ответ опять второму "диалогу", и убедимся, что первый "диалог" на это не среагирует<br>
ghci> f (3, "444")<br>
-- Получаем должный вывод<br>
465<br>
-- То же с первым "диалогом"<br>
ghci> f (2, "122")<br>
133<br>
ghci><br>

Как можно заметить, несмотря на запуск двух example сразу, диалоги с ними не пересекаются.

Кстати, всё это сообщение является программой на Literate Haskell, его можно скопировать в test.lhs и протестировать самому.

P.S. Большое спасибо pechlambda за помощь в улучшении статьи.
Теги:
Хабы:
+34
Комментарии 8
Комментарии Комментарии 8

Публикации

Истории

Ближайшие события

PG Bootcamp 2024
Дата 16 апреля
Время 09:30 – 21:00
Место
Минск Онлайн
EvaConf 2024
Дата 16 апреля
Время 11:00 – 16:00
Место
Москва Онлайн
Weekend Offer в AliExpress
Дата 20 – 21 апреля
Время 10:00 – 20:00
Место
Онлайн