Тестирование многопоточного и асинхронного кода

Автор оригинала: Jonathan Halterman
  • Перевод
  • Tutorial
Привет! На неделе встала задача написать интеграционный тест для Spring Boot приложения, использующего асинхронное взаимодействие с внешними системами. Освежил много материала про отладку многопоточного кода. Привлекла внимание статья «Testing Multi-Threaded and Asynchronous Code» by Jonathan Halterman, мой перевод которой приведен ниже.

Благодарю shalomman, schroeder и FTOH за важнейшие замечания к коду из оригинальной статьи.

Если вы пишете код достаточно долго а может даже и нет, то, вероятно, столкнулись со сценарием, в котором нужно протестировать многопоточный код. Обычно считается, что потоки и тесты не должны смешиваться. Обычно это получается, т.к. то, что подлежит тестированию, как раз запускается внутри многопоточной системы и может быть протестировано индивидуально без использования потоков. Но что делать, если вы не можете их разделить или более того, если многопоточность — это тот аспект кода, который вы тестируете?

Я здесь, чтобы сказать вам, что, хотя потоки в тестах не сильно распространены, но вполне используются. Программная полиция не арестует вас за запуск потока в модульном тесте, хотя, как на самом деле тестировать многопоточный код — это другой вопрос. Некоторые превосходные асинхронные технологии, такие как Akka и Vert.x, предоставляют тестовые наборы для облегчения этого бремени. Но помимо этого, тестирование многопоточного кода обычно требует иного подхода, чем типичный синхронный модульный тест.

Идем параллельно


Первым шагом является запуск любого многопоточного действия, для которого требуется проверить результат. Например, давайте использовать гипотетический API для регистрации обработчика сообщений на шине сообщений и публикации на шине сообщения, которое будет доставлено нашему обработчику асинхронно в отдельном потоке:

messageBus.registerHandler(message - > {
    System.out.println("Received " + message);
});
messageBus.publish("test");

Выглядит неплохо. Когда запускается тест, шина должна доставить наше сообщение обработчику в другом потоке, но это не очень полезно, так как мы ничего не проверяем. Давайте обновим наш тест, чтобы подтвердить, что шина сообщений доставляет наше сообщение так, как ожидалось:

String msg = "test";
messageBus.registerHandler(message -> {
  System.out.println("Received " + message);
  assertEquals(message, msg);
};
messageBus.publish(msg);

Выглядит лучше. Запускаем наш тест и он зеленый. Круто! Но сообщение Received нигде не напечаталось, что-то где-то неправильно.

Подождите секундочку


В тесте выше, когда сообщение публикуется на шине сообщений, оно доставляется шиной обработчику в другом потоке. Но когда инструмент модульного тестирования, такой как JUnit, выполняет тест, он ничего не знает о потоках шины сообщений. JUnit знает только о главном потоке, в котором он выполняет тест. Таким образом, пока шина сообщений занята, пытаясь доставить сообщение, тест завершает выполнение в основном тестовом потоке и JUnit сообщает об успехе. Как это решить? Нам нужно, чтобы основной тестовый поток ждал, пока шина сообщений доставит наше сообщение. Поэтому давайте добавим оператор sleep:

String msg = "test";
messageBus.registerHandler(message -> {
  System.out.println("Received " + message);
  assertEquals(message, msg);
};
messageBus.publish(msg);
Thread.sleep(1000);

Наш тест зеленый и выражение Received печатается, как и ожидалось. Круто! Но одна секунда сна означает, что наш тест выполняется по меньшей мере одну секунду, и в этом нет ничего хорошего. Мы могли бы уменьшить время сна, но тогда мы рискуем закончить тест до получения сообщения. Нам нужен способ координации между основным тестовым потоком и потоком обработчика сообщений. Глядя на пакет java.util.concurrent, мы обязательно найдем то, что можем использовать. Как насчет CountDownLatch?

String msg = "test";
CountDownLatch latch = new CountDownLatch(1);
messageBus.registerHandler(message -> {
  System.out.println("Received " + message);
  assertEquals(message, msg);
  latch.countDown();
};
messageBus.publish(msg);
latch.await();

В этом подходе мы разделяем (share) CountDownLatch между основным тестовым потоком и потоком обработчика сообщений. Основной поток вынужден ждать на блокере. Тестовый поток освобождает ожидающий основной поток, вызывая countDown() на блокере после получения сообщения. Нам больше не нужно спать одну секунду. Наш тест занимает ровно столько времени, сколько нужно.

Так устраивает?


С нашей новой прелестью CountDownLatch мы начинаем писать многопоточные тесты, как последние модницы. Но довольно быстро мы замечаем, что один из наших тест-кейсов блокируется навсегда и не завершается. Что же происходит? Рассмотрим сценарий шины сообщений: блокер заставляет ждать, но он освобождается только после получения сообщения. Если шина не работает и сообщение никогда не будет доставлено, то тест никогда не завершится. Поэтому давайте добавим таймаут к блокеру:

latch.await(1, TimeUnit.SECONDS);

Тест, который блокировался, завершается неуспешно через 1 секунду с исключением TimeoutException. В конце концов мы найдем проблему и исправим тест, но решаем оставить тайм-ауты на месте. В случае, если это когда-нибудь повторится, мы предпочли бы, чтобы наш тест заблокировался на секунду и упал, чем заблокировался навсегда и не был завершен вообще.
Еще одна проблема, которую мы замечаем при написании тестов, заключается в том, что все они, похоже, проходят даже тогда, когда они, вероятно, не должны этого делать. Как это возможно? Рассмотрим тест обработки сообщений еще раз:

messageBus.registerHandler(message -> {
  assertEquals(message, msg);
  latch.countDown();
};

Мы должны были использовать CountDownLatch для координации завершения нашего теста с основным тестовым потоком, но как насчет проверок (asserts)? Если проверка не удастся, узнает ли об этом JUnit? Оказывается, поскольку мы не выполняем проверку в основном тестовом потоке, любые зафейленные проверки остаются полностью незамеченными JUnit. Давайте попробуем небольшой сценарий, чтобы проверить это:

CountDownLatch latch = new CountDownLatch(1);
new Thread(() -> {
  assertTrue(false);
  latch.countDown();
}).start();
latch.await();

Тест зеленый! Так что нам теперь делать? Нам нужен способ передачи любых ошибок тестирования из потока обработчика сообщений обратно в основной тестовый поток. Если в потоке обработчика сообщений происходит сбой, нам нужно, чтобы он повторно появился в основном потоке, чтобы тест зафейлился, как и ожидалось. Давайте попробуем это сделать:

CountDownLatch latch = new CountDownLatch(1);
AtomicReference<AssertionError> failure = new AtomicReference<>();
new Thread(() -> {
  try {
    assertTrue(false);
  } catch (AssertionError e) {
    failure.set(e);
  }
  latch.countDown();
}).start();
latch.await();
if (failure.get() != null)
  throw failure.get();

Быстрый запуск и да, тест не проходит, как и должен! Теперь мы можем вернуться и добавить CountDownLatches, блоки try/catch и AtomicReference ко всем нашим тестовым случаям. Круто! На самом деле, не круто, выглядит как бойлерплейт.

Вырезаем хлам


В идеале нам нужен API, который позволяет нам координировать ожидание, проверку и возобновление выполнения между потоками, чтобы модульные тесты могли проходить или фейлиться, как ожидалось, независимо от того, где происходит сбой проверки. К счастью, ConcurrentUnit предоставляет облегченную структуру, которая делает именно это: Waiter. Давайте адаптируем тест обработки сообщений выше в последний раз и посмотрим, что Waiter из ConcurrentUnit может сделать для нас:

String msg = "test";
Waiter waiter = new Waiter();
messageBus.registerHandler(message -> {
  waiter.assertEquals(message, msg);
  waiter.resume();
};
messageBus.publish(msg);
waiter.await(1, TimeUnit.SECONDS);

В этом тесте мы видим, что Waiter занял место нашего CountDownLatch и AtomicReference. С помощью Waiter мы блокируем основной тестовый поток, выполняем проверку, затем возобновляем основной тестовый поток, чтобы тест мог завершиться. Если проверка фейлится, то вызов waiter.await автоматически снимет блокировку и выбросит сбой, что приведет к тому, что тест пройдет или зафейлится, как должен, даже если проверка осуществлялась из другого потока.

Еще более параллельно


Теперь, когда мы стали сертифицированными многопоточными тестерами, мы можем захотеть подтвердить, что происходит несколько асинхронных действий. Waiter из ConcurrentUnit делает это просто:

Waiter waiter = new Waiter();
messageBus.registerHandler(message -> {
  waiter.resume();
};
messageBus.publish("one");
messageBus.publish("two");
waiter.await(1, TimeUnit.SECONDS, 2);

Здесь мы публикуем два сообщения в шину и проверяем, что оба сообщения доставлены, заставляя Waiter ждать, пока resume() не будет вызван 2 раза. Если сообщения не доставляются и resume не вызывается дважды в течение 1 секунды, то тест зафейлится с ошибкой TimeoutException.
Один общий совет с этим подходом заключается в том, чтобы убедиться, что ваши тайм-ауты достаточно долгие для завершения любых параллельных действий. В нормальных условиях, когда тестируемая система работает так, как ожидалось, тайм-аут не имеет значения и вступает в действие только в случае отказа системы по какой-либо причине.

Резюме


В этой статье мы узнали, что многопоточное модульное тестирование не является злом и его довольно легко провести. Мы узнали об общем подходе, когда мы блокируем основной тестовый поток, выполняем проверки из некоторых других потоков, а затем возобновляем основной поток. И мы узнали о ConcurrentUnit, который может облегчить эту задачу.
Счастливого тестирования!

Перевод выполнен @middle_java
Поделиться публикацией
AdBlock похитил этот баннер, но баннеры не зубы — отрастут

Подробнее
Реклама

Комментарии 17

    –3

    Что-то я не понял. Waiter это стандартный объект в JUnit? К чему тогда 90% этого текста? Предполагается что читатель про синхронизацию вообще в первый раз слышит?

      0
      Нет. Это ConcurrentUnit.
      0
      >не является злом и его довольно легко провести
      Во-первых, скорее всего у вас в потоках уже есть таймауты (или просто длительная обработка — а иначе и потоки обычно не нужны), и когда вы добавляете еще таймаутов в тестах, будет очень сложно выбрать значения для них. Ложные срабатывания, тесты то зеленые, то красные, в зависимости от погоды.
      Во-вторых, это частное решение не решает одну существенную проблему — он не позволяет строить и проверять предположения о том, в каком порядке происходили события в асинхронной системе. И происходили ли вообще. А при тестировании многопоточной системы это может быть довольно важно.

      Ну то есть, чуть легче-то будет, да. Но легко не станет нифига.
        –3

        "избегайте асинхронных схем" ;)

          0
          Ну почему-же — для UI асинхронность очень полезна, причем в сложных случаях именно ручное управление потоками а не асинхронные вызовы: так быстрее и понятнее.
            0

            ;)
            это отсылка к основному правилу радиотехники, озвученному еще в христоматии Хоровица
            На самом деле, все что вы описываете — это синхронные процессы) которые выполняются последовательно-параллельно и управляются через семафоры или обратные вызовы или другие механизмы обеспечивающие синхронность) Я просто из другой отрасли) и меня улыбает слушать про асинхронные процессы. То что у вас описано, не в полной мере асинхронный процесс.
            Для примера приведу вариант асинхронного процесса в программировании. Представьте, чтоту вас есть функция(поток), которая ищет в некоторой области памяти максимальное значение (в массиве) как только что-то в памяти изменяется, начинает меняться знаяение на выходе функции (в другой области памяти, например, записывается максимум). Вы не можете никак влиять на работу потока, вы только можете знать его архитектуру и максимальное время через которое результат будет достоверен, хотя она может найти результат и раньше. Вопрос, как тестировать эту функцию(поток)?
            Как только вы введете семафор который будет запускать поиск или сообщать о завершении поиска, поток станет синхронный.
            И да, основное преимущество асинхронных схем — это очень высокая скорость, выше чем у синхронных, но… все равно, избегайте асинхронных схем.

              0
              Вы хотите сказать что мои потоки не синхронны а параллельны?
              Но позвольте — вот я из графического потока запускаю поток, который ожидает API вызовов от скрипта сам же их и исполняет, максимум как взаимодействует с основным потоком — через механизм отложенных действий просит что-нибудь отрисовать или считать. И такой поток — не один и не забывайте про потоки скриптов. Если потоки не касаются вычислений — они могут быть асинхронными.
              Или вы про то что все может быть на одноядерной машине — так любые сущности можно виртуализировать.
              Если хотите чтобы я вас понял — приведите определение асинхронного потока.
                +1

                Наоборот, я хочу сказать, что то, что вы описываете это синхронные процессы, вы их синхронизируете тем или иным образом с основным потоком. Запускаемые вами тесты параллельные, но ни асинхронные, как описано в заголовке статьи.
                Если смотреть с точки зрения "основного правила" в статье классический способ избегания асинхронности — синхронизация.
                Опредерение асинхронности в программировании: не блокируемость основого потока, но в примере основной поток блокируется и ждет пока все тесты выполнятся.
                Если вы что-то запустили и оно выполнилось и сообщило вам — это параллельность, но не асинхронность.
                Для примера, еще один вариант классической асинхронности — это нейронная сеть охваченнвя обратной связью, на входе у нее набор данных, как только данные меняются, сеть начинает ее пересчитывать и через некоторое время распространения начнут меняться данные на выходе, но за счет обратной связи вы не знаете, когда завершится изменение данных на выходе и завершиться ли вообще… Это будет пример асинхронного процесса.
                Представьте, что у вас есть возможность запустить на GPU N таких сетей и тестировать. Это будет поточное тестирование асинхронных кодов ))))
                Ладно, извиняюсь, что глубоко копнул, просто то, что пытаются обычно приподнести в качестве асинхронности, на проверку оказывается синхронным процессом и потоком.
                У вас процесс тестирования многопоточный, но синхронный, по моему мнению.

                  0
                  Спасибо, очень интересно (у меня правда не тесты а просто ПО, но все равно) собственно, системные вызовы называются асинхронными — я к тому, что не мы (разрабы) придумали использовать термин асинхронность и тестировать все-равно такие вещи сложно: если потоки основного приложения еще в .NET Framework и могут быть запущены под отладчиком студии, то скрипты-то это отдельные процессы на собственных интерпретаторах.
          0

          Почему в последнем примере waiter будет ждать двух вызовов?

            +5

            пока ждал ответа, почитал официальную документацию.
            приведенный пример не правильный. Вот пример из документации, который сработает.


            @Test
            public void shouldDeliverMessages() throws Throwable {
              final Waiter waiter = new Waiter();
            
              messageBus.registerHandler(message -> {
                waiter.assertEquals(message, "foo");
                waiter.resume();
              };
            
              messageBus.send("foo");
              messageBus.send("foo");
              messageBus.send("foo");
            
              // Wait for resume() to be called 3 times
              waiter.await(1000, 3);
            }

            дополнено:


            А вот пример из оригинальной статьи, токорый тоже будет работать.


            Waiter waiter = new Waiter();
            messageBus.registerHandler(message -> {
              waiter.resume();
            };
            messageBus.publish("one");
            messageBus.publish("two");
            waiter.await(1, TimeUnit.SECONDS, 2);
              0
              shalomman, благодарю за разбор! Заменил оригинальный код вашим кодом. Оригинал у автора сегодня также был исправлен.
            0
            .
              0
              Я надеялся увидеть как юнит тестом race condition поймать, либо подобное.
                +1

                Этот Waiter по поведению очень похож на обычное CompletableFuture

                  0

                  Вот как раз то же самое хотел написать. Сам использую CompletableFuture в подобных случаях, при тестировании асинхронного AMQP.

                  –1
                  Имхо этот пример из какой то далекой далекой галактики, в реальном приложении реальные обработчики, и никто не будет заменять их тестовыми, ибо становиться непонятно что мы тут тестируем. У реальный обработчиков что то да можно замокать, соответственно можно убедиться что сообщение получено. В случае облаков и(или) хайлодов вы скорее всего используете метрики, и Вам не составит труда вытащить из MeterRegistry значения до и после, ничего не мокая. Но подождать в любом случае прийдеться.

                  Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

                  Самое читаемое