Как стать автором
Обновить

Haskell. Тестируем многопоточное приложение

Время на прочтение 10 мин
Количество просмотров 14K
Данная статья составлена преподавателем Академического университета Валерием Исаевым по материалам практики по курсу функционального программирования.

Полагаю, ни для кого не секрет, что написание многопоточных приложений связано с целым рядом проблем, отсутствующих при разработке однопоточных программ.
Одна из проблем заключается в тестировании приложения.
Мы не можем контролировать порядок, в котором выполняются операции, следовательно, не поддается контролю и результат выполнения программы. Даже если мы получим ошибку, наступить на те же грабли второй раз будет не так-то просто.
Хочу предложить небольшой рецепт того, как можно протестировать многопоточное приложение.
Из ингредиентов нам понадобятся: haskell, QuickCheck, немного монад, соль/перец по вкусу.

Рабочий пример


В качестве рабочего примера возьмем задачу об обедающих философах.

MVar a – это ссылка, которая либо содержит значение типа a, либо пуста.
putMVar ref x кладет по ссылке ref значение x.
takeMVar ref считывает содержимое ссылки, оставляя ее после этого пустой.
Если она уже была пуста, то поток засыпает, пока в нее не запишет что-нибудь другой поток.
() – это тип, имеющий единственное значение, которое обозначается так же, как и сам тип – ().
Вилки мы моделируем ссылками типа MVar ().
Таким образом, у вилки может быть два состояния: если вилка занята каким-либо философом – она пуста; если вилка свободна – она содержит значение ().

import System.Random
import Control.Monad
import Control.Concurrent
import Control.Monad.Cont
import Control.Monad.Trans
import Data.IORef
import Test.QuickCheck
import Test.QuickCheck.Gen
import Test.QuickCheck.Monadic

-- sleep останавливает поток на рандомное количество секунд (от 0 до 0.3)
sleep :: IO ()
sleep = randomRIO (0, 300000) >>= threadDelay

phil
    :: Int      -- Номер философа.
    -> MVar ()  -- Ссылка на левую вилку.
    -> MVar ()  -- Ссылка на правую вилку.
    -> IO ()
phil n leftFork rightFork = forever $ do
    putStrLn $ show n ++ " is awaiting"
    sleep
    takeMVar leftFork
    putStrLn $ show n ++ " took left fork"
    -- sleep
    takeMVar rightFork
    putStrLn $ show n ++ " took right fork"
    sleep
    putMVar leftFork ()
    putMVar rightFork ()
    putStrLn $ show n ++ " put forks"
    sleep

runPhil :: Int -> IO ()
runPhil n = do

    -- Создаем ссылки, которые представляют вилки.
    forks <- replicateM n $ newMVar ()

    -- Запускаем 5 потоков, в каждом выполняем функцию phil.
    forM_ [1..n] $ \i -> forkIO $ phil i (forks !! (i - 1)) (forks !! (i `mod` n))

main = do
    runPhil 5

    -- Если главный поток завершится, программа остановится, поэтому мы его усыпляем навечно.
    forever (threadDelay 1000000000)

В этой программе может случиться дедлок.
Чтобы полюбоваться на него, можно раскомментировать строку — sleep и немного подождать.
Наша цель – написать тесты, которые бы обнаружили эту ошибку.
Но прежде чем мы сможем это сделать, стоит понять, как мы будем управлять порядком выполнения операций. Для этого, вместо IO, используем другую монаду.

Обобщим определение функций sleep, phil и runPhil, чтобы они работали и для других монад.

sleep :: MonadIO m => m ()
sleep = do
    r <- liftIO $ randomRIO (0, 100)
    r `times` liftIO (threadDelay 300)
  where
    times :: Monad m => Int -> m () -> m ()
    times r a = mapM_ (\_ -> a) [1..r]

Теперь функция sleep может работать с любой монадой, которая поддерживает IO операции. В классе MonadIO определена всего одна функция liftIO, которая позволяет это делать.
Заметим, что вместо того, чтобы один раз засыпать на рандомное число секунд, мы засыпаем рандомное число раз на 0.3 миллисекунды. Причина в том, что в нашей монаде действия внутри liftIO выполняются атомарно. Соответственно, время, на которое мы засыпаем, ни на что не влияет, важно только, сколько раз мы это делаем.

Поскольку наша монада будет работать в одном потоке, MVar для нас бесполезны, и мы позже определим свой тип ссылок, исходя из того, чтобы функция phil могла работать и с MVar, и с другими типами ссылок.
Для этого определим класс монад MonadConcurrent, в котором будут операции для создания, чтения и записи по ссылке, а также для создания потоков.

class Monad m => MonadConcurrent m where
    type CVar m :: * -> *
    newCVar :: a -> m (CVar m a)
    takeCVar :: CVar m a -> m a
    putCVar :: CVar m a -> a -> m ()
    fork :: m () -> m ()


Здесь мы использовали семейства типов, которые являются расширением языка.
В данном случае нам нужно это расширение, чтобы мы могли определять для разных монад разные типы ссылок.
Для использования расширения нужно добавить следующую строчку в начало файла (и заодно подключить расширения, которые понадобятся позже):

{-# LANGUAGE TypeFamilies, ExistentialQuantification, GeneralizedNewtypeDeriving #-}

Определим instance этого класса для монады IO.
Тут всё легко: мы просто используем соответствующие операции для MVar.

instance MonadConcurrent IO where
    type CVar IO = MVar
    newCVar = newMVar
    takeCVar = takeMVar
    putCVar = putMVar
    fork m = forkIO m >> return ()

Обобщим функции phil и runPhil.

phil :: (MonadIO m, MonadConcurrent m) => Int -> CVar m () -> CVar m () -> m ()
phil n leftFork rightFork = forever $ do
    liftIO $ putStrLn $ show n ++ " is awaiting"
    sleep
    takeCVar leftFork
    liftIO $ putStrLn $ show n ++ " took left fork"
    takeCVar rightFork
    liftIO $ putStrLn $ show n ++ " took right fork"
    sleep
    putCVar leftFork ()
    putCVar rightFork ()
    liftIO $ putStrLn $ show n ++ " put forks"
    sleep

runPhil :: (MonadIO m, MonadConcurrent m) => Int -> m ()
runPhil n = do
    forks <- replicateM n $ newCVar ()
    forM_ [1..n] $ \i -> fork $ phil i (forks !! (i - 1)) (forks !! (i `mod` n))

Запустим программу и убедимся, что она работает как прежде.

Монада Concurrent


А теперь начинается самое интересное.

Определим монаду, в которой будем работать (забегая вперед, скажу, что называется она Cont). Также рискну предположить, что Cont – одна из самых сложных и самых мощных монад одновременно.
Используя эту монаду, с потоком управления можно делать всё что угодно: например, вместо того, чтобы выполнять действия, можно их сохранить в структуре (с этой целью объявим тип Action) и выполнить их позже, возможно, в другом порядке.

data Action = Atom (IO Action)
            | forall a. ReadRef (MaybeRef a) (a -> Action)
            | forall a. WriteRef (MaybeRef a) a Action
            | Fork Action Action
            | Stop

Давайте разберемся отдельно с каждым конструктором.
Действие Stop означает, что вычисления завершились.
Действие Fork означает, что вычисления ветвятся, то есть теперь у нас есть два потока, которые могут выполняться одновременно.
Действие Atom выполняет атомарно IO операцию, возвращающую нам новый Action, в котором находится действие, что следует выполнить на следующем шаге.

Например:
Функция getSum задает действие, которое считывает два числа с клавиатуры, печатает их сумму и завершается.

getSum :: Action
getSum = Atom $ do
    x <- readLn             -- считываем первое число
    return $ Atom $ do      -- возвращаем продолжение
        y <- readLn         -- считываем второе число
        return $ Atom $ do  -- возвращаем продолжение
            print (x + y)   -- печатаем сумму
            return Stop     -- возвращаем продолжение

Далее:
Действие WriteRef ref val act записывает значение val по ссылке ref, в act находится продолжение.
Действие ReadRef ref act считывает значение по ссылке ref, act принимает это значение и возвращает продолжение.
Чтобы в Action можно было сохранять ссылки произвольных типов, мы используем еще одно расширение языка – экзистенциальную квантификацию.

Тип MaybeRef представляет тип ссылок, которые мы будем использовать вместо MVar, и определяется он как ссылка на Maybe.

newtype MaybeRef a = MaybeRef (IORef (Maybe a))

Теперь мы можем определить нашу монаду.
Как я и обещал, мы просто оборачиваем монаду Cont.

newtype Concurrent a = Concurrent (Cont Action a) deriving Monad

Монада Cont Action устроена следующим образом.
Вместо того чтобы возвращать значение типа a, она принимает продолжение типа (a -> Action), передает в эту функцию значение и возвращает результат.
То есть можно считать, что Cont Action a = (a -> Action) -> Action.
Если точнее, у нас есть следующая пара функций, которые переводят (a -> Action) -> Action в Cont Action a и обратно.

cont :: ((a -> Action) -> Action) -> Cont Action a.
runCont :: Cont Action a -> (a -> Action) -> Action

Теперь мы можем определить instance классов MonadIO и MonadConcurrent.

instance MonadIO Concurrent where
    liftIO m = Concurrent $ cont $ \c -> Atom $ do
        a <- m
        return (c a)

Давайте посмотрим, что здесь происходит.
liftIO принимает IO операцию и оборачивает ее в атомарное действие. А именно: мы в cont передаем функцию, которая принимает продолжение (то есть c имеет тип a -> Action) и возвращает атомарное действие, выполняющее IO операцию m.
Мы определили Atom так, что атомарная операция должна возвращать Action, являющийся продолжением.
Собственно это мы и делаем: после того как мы выполнили m, мы вызываем c, которое и возвращает необходимое продолжение.

Теперь определим instance MonadConcurrent.
Создаем в newCVar ссылку, используя только что определенную функцию liftIO.
В takeCVar и putCVar возвращаем соответствующее действие, а продолжение сохраняем внутри этой структуры.
В fork возвращаем действие, в котором сохранены оба потока: один передается в аргументы функции fork, другой приходит из продолжения.

instance MonadConcurrent Concurrent where
    type CVar Concurrent = MaybeRef 
    newCVar a = liftIO $ liftM MaybeRef $ newIORef (Just a)
    takeCVar v = Concurrent $ cont (ReadRef v)
    putCVar v a = Concurrent $ cont $ \c -> WriteRef v a $ c ()
    fork (Concurrent m) = Concurrent $ cont $ \c -> Fork (runCont m $ \_ -> Stop) $ c ()

Наша монада практически готова, осталось только научиться ее запускать.
Для начала напишем функцию, запускающую Action. Она принимает список действий, каждый элемент в котором – отдельный поток.
Стратегии по запуску действий могут быть различными. Определимся с двумя моментами: в каком порядке выполнять потоки, и что делать, если мы пытаемся считать значение из переменной, которая пуста. Напомню, что в переменной может ничего не лежать, и тогда нам нужно дождаться, когда другой поток в нее что-нибудь положит.
Давайте вначале напишем простую версию, где будем выполнять потоки по очереди; а поток, пытающийся считать из пустой переменной, будем перемещать в конец очереди.

runAction :: [Action] -> IO ()
    -- Если потоков не осталось, завершаемся.
runAction [] = return ()

    -- Выполняем атомарное действие, а продолжение, которое оно возвращает, кладем в конец очереди.
runAction (Atom m : as) = do
    a' <- m
    runAction $ as ++ [a']

    -- Кладем два новых потока в конец очереди.
runAction (Fork a1 a2 : as) = runAction $ as ++ [a1,a2]

    -- Продолжаем запускать остальные потоки.
runAction (Stop : as) = runAction as

runAction (ReadRef (MaybeRef ref) c : as) = do

    -- Считываем содержимое ссылки.
    ma <- readIORef ref
    case ma of

        -- Если там было что-то, то 
        Just a -> do

            -- Опустошаем содержимое ссылки.
            writeIORef ref Nothing

            -- Кладем в конец очереди продолжение.
            runAction (as ++ [c a])

        -- Если там ничего не было, то нужно попробовать считать эту ссылку позже, поэтому добавляем в конец очереди то же самое действие.
        Nothing -> runAction (as ++ [ReadRef (MaybeRef ref) c])

-- Записываем по ссылке значение, продолжением кладем в конец очереди.
runAction (WriteRef (MaybeRef ref) val a : as) = do
    writeIORef ref (Just val)
    runAction (as ++ [a])

Заметьте, что putMVar работает несколько иначе, чем наша реализация WriteRef.
Если по ссылке уже было какое-то значение, то putMVar заморозит поток, пока переменная не освободится. В этом случае перезапишем значение.
Версию, работающую как putMVar, создавать в данной ситуации не стоит, чтобы не переусложнять код.

Далее пишем функцию, запускающую Concurrent, и переопределяем main.

runConcurrent :: Concurrent () -> IO ()
runConcurrent (Concurrent c) = runAction [runCont c $ \_ -> Stop]

main = runConcurrent (runPhil 5)

Так как теперь мы работаем в одном потоке, и threadDelay останавливает всю работу, скорость немного снизилась.

Пишем тесты


Настало время «добавить в блюдо приправу» – написать тесты для нашего примера.
Для этого используем библиотеку QuickCheck, генерирующую случайные входные данные для тестов. Поскольку мы хотим запускать наши потоки в различных порядках, то входные данные для наших тестов – это порядок, в котором мы выбираем очередной поток из списка.
Можно закодировать входные данные списком чисел, но проблема в том, что мы не знаем заранее, из какого диапазона следует выбирать эти числа, так как число потоков может меняться.
Поэтому кодировать входные данные мы будем списком функций типа Int -> Int, которые принимают число n и возвращают число из интервала [0,n-1].

newtype Route = Route [Int -> Int]

Класс Arbitrary, предоставляемый библиотекой QuickCheck, предназначен для описания типов, позволяющих генерировать элементы случайным образом.
В этом классе объявлено две функции — shrink и arbitrary.
У shrink есть реализация по умолчанию, так что переопределять ее не будем.
В функции arbitrary сгенерируем список случайных функций, где каждая функция возвращает число из интервала [0,n-1].

instance Arbitrary Route where
    arbitrary = fmap Route (listOf arbitraryFun)
      where
        arbitraryFun = MkGen $ \q s n -> unGen (choose (0, n - 1)) q s

Определяем также instance Show для Route, поскольку этого требует QuickCheck.
К сожалению, слишком полезный show мы написать не можем. Более того, эта функция использоваться не будет, поэтому мы оставляем ее неопределенной.

instance Show Route where
    show = undefined

Теперь можно приступить к написанию более умной версии runAction.
Первое отличие заключается в том, что мы разделим выполнение атомарных действий и работу с ссылками.
Для начала напишем вспомогательную функцию skipAtoms, выполняющую атомарные действия: функция принимает список действий, выполняет Atom, Fork и Stop, остальные возвращает в качестве результата.

skipAtoms :: [Action] -> IO [Action]
skipAtoms [] = return []
skipAtoms (Atom m : as) = do
    a <- m
    skipAtoms (as ++ [a])
skipAtoms (Fork a1 a2 : as) = skipAtoms (as ++ [a1,a2])
skipAtoms (Stop : as) = skipAtoms as
skipAtoms (a : as) = fmap (a:) (skipAtoms as)

Второе отличие новой версии runAction от прежней заключается в том, что мы отслеживаем получение дедлока.
Для этого заводим два списка действий. В первом хранятся активные (выполняемые нами) потоки. Во втором – потоки, ждущие обновления какой-либо ссылки.
Если список активных потоков пуст, а списка ждущих нет, значит, мы получили дедлок, и в этом случае бросаем исключение.

Третье нововведение – аргумент типа Route, используемый для выбора номера потока, который следует выполнить на текущем шаге.

runAction :: Route -> [Action] -> [Action] -> IO ()
runAction _ [] [] = return ()
runAction _ [] _ = fail "Deadlock"
runAction (Route []) _ _ = return ()
runAction (Route (r:rs)) as bs = do
    as <- skipAtoms as
    let n = length as
    case splitAt (r n) as of
        (as1, ReadRef (MaybeRef ref) c : as2) -> do
            ma <- readIORef ref
            case ma of
                Just a -> do
                    writeIORef ref Nothing
                    runAction (Route rs) (as1 ++ [c a] ++ as2) bs
                Nothing -> runAction (Route rs) (as1 ++ as2) (bs ++ [ReadRef (MaybeRef ref) c])
        (as1, WriteRef (MaybeRef ref) x c : as2) -> do
            writeIORef ref (Just x)
            runAction (Route rs) (as1 ++ [c] ++ as2 ++ bs) []

Функция runConcurrent практически не поменялась.

runConcurrent :: Route -> Concurrent () -> IO ()
runConcurrent r (Concurrent c) = runAction r [runCont c $ \_ -> Stop] []

Можно проверить, как работает новая версия, передав в качестве первого аргумента round_robin. Это простая стратегия выполнения, аналогичная тому, как функция runAction работала раньше. Здесь мы просто генерируем бесконечный список и для каждого элемента берем остаток по модулю числа потоков.

round_robin :: Route
round_robin = Route $ map rem [0..]

Запустив вычисления на этих входных данных, мы, скорее всего, быстро получим дедлок – по причине того, что работа нашего примера построена на основе генератора случайных чисел – следовательно, не смотря на то, что входные данные всегда одни и те же, порядок выполнения оказывается случайным.
Если бы наш пример был более детерминирован, нам пришлось бы варьировать входные данные случайным образом, что мы сейчас и сделаем.

main = quickCheck $ monadicIO $ do
    r <- pick arbitrary
    run $ runConcurrent r (runPhil 5)

Мы выбираем произвольный элемент типа Route, используя реализованную нами ранее функцию arbitrary. После чего запускаем наше вычисление на этом входе.
Об остальном позаботится QuickCheck, а именно: запустит наш тест 100 раз, с каждым разом увеличивая размер входных данных.

Запустив программу, мы увидим следующее:

...
3 took left fork
4 put forks
4 is awaiting
5 took left fork
4 took left fork
1 took right fork
1 put forks
1 is awaiting
1 took left fork
2 took left fork
*** Failed! Exception: 'user error (Deadlock)' (after 36 tests):

Что и требовалось получить!

Заключение


Мы научились писать тесты, которые могут обнаруживать состояние дедлока в многопоточном приложении.
В процессе мы видели примеры использования монады Cont, семейств типов, экзистенциальной квантификации и библиотеки QuickCheck.
Кроме того, мы узнали, как можно собрать модель многопоточного выполнения программы из подручных материалов.
Теги:
Хабы:
+39
Комментарии 19
Комментарии Комментарии 19

Публикации

Информация

Сайт
www.jetbrains.com
Дата регистрации
Дата основания
Численность
501–1 000 человек
Местоположение
Россия

Истории