Реактивное программирование со Spring Boot 2. Часть 1


    Не так давно вышла новая версия самого популярного фреймворка на Java: Spring Framework 5. Новая версия принесла много нового. Одно из самых больших нововведений — модель реактивного программирования. Совсем скоро выйдет Spring Boot 2, который существенно упростит создание микросервисов c данным подходом.

    Если вы, как и я, хотите разобраться подробнее, что это такое и как это используется, то добро пожаловать под кат. Статья делится на две части — теоретическую и практическую. Сейчас мы постараемся разобраться, что значит быть реактивным. После чего попробуем использовать полученные знания для написания собственного микросервиса(часть 2).

    Что такое реактивность?


    Для начала рассмотрим понятие реактивности. И тут нужно сделать сразу четкое разраничение в определениях.

    Реактивная система


    Реактивная система — архитектурный паттерн, который удовлетворяет некоторому набору правил(reactive manifesto). Данный монифест был разработан в 2013 году для устранения неопределенности. Дело в том, что на тот момент в Европе и США термин «reactive» являлся слишком избыточным. Каждый понимал по-своему, какую систему можно назвать реактивной. Это рождало огромную путаницу, и в итоге был создан манифест, который устанавливает четкие критерии реактивной системы.

    Посмотрим на картинку из манифеста и разберем более подробно, что означает каждый пункт:

    image

    • Responsive. Данный принцип говорит нам о том, что разрабатываемая система должна отвечать быстро и за определенное заранее заданное время. Кроме того система должна быть достаточно гибкой для самодиагностики и починки.

      Что это значит на практикте? Традиционно при запросе некоторого сервиса мы идем в базу данных, вынимаем необходимый объем информации и отдаем ее пользователю. Здесь все хорошо, если наша система достаточно быстрая и база данных не очень большая. Но что, если время формирования ответа гораздно больше ожидаемого? Кроме того, у пользователя мог пропасть интернет на несколько миллисекунд. Тогда все усилия по выборке данных и формированию ответа пропадают. Вспомните gmail или facebook. Когда у вас плохой интернет, вы не получаете ошибку, а просто ждете результат больше обычного. Кроме того, этот пункт говорит нам о том, что ответы и запросы должны быть упорядочены и последовательны.
    • Resilient. Система остается в рабочем состоянии даже, если один из компонентов отказал. Другими словами, компоненты нашей системы должны быть досточно гибкими и изолированными друг от друга. Достигается это путем репликаций. Если, например, одна реплика PostgreSQL отказала, необходимо сделать так, чтобы всегда была доступна другая. Кроме того, наше приложение должно работать во множестве экземпляров.
    • Elastic. Данный принцип говорит о том, что система должна занимать оптимальное количество ресурсов в каждый промежуток времени. Если у нас высокая нагрузка, то необходимо увеличить количество экзепляров приложения. В случае малой нагрузки ресурсы свободных машин должны быть очищены. Типичный инструменты реализации данного принципа: Kubernetes.
    • Message Driven. Здесь начинается наиболее важный пункт для Java-разработчика. Именно этим вопросом должно озаботиться наше приложение. Общение между сервисами должно происходить через асинхронные сообщения. Это значит, что каждый элемент системы запрашивает информацию из другого элемента, но не ожидает получение результата сразу же. Вместо этого он продолжает выполняеть свои задачи. Это позволяет увеличить пользу от системных ресурсов и управлять более гибко возникающими ошибками. Обычно такой результат достигается через реактивное программирование.

    Реактивное программирование


    Если верить википедии, то реактивное программирование — парадигма программирования, ориентированная на потоки данных. Очень скоро мы разберем, как это работает на практике. Но вначале посмотрим, на чем основана данная парадигма.

    Основная концепция реактивного программирования базируется на неблокирующем вводе/ввыоде. Обычно при обращении к некоторому ресурсу(базе данных, файле на диске, удаленному серверу и т.д.) мы получаем результат сразу же(часто в той же строчке). При неблокирующем обращении к ресурсу наш поток не останавливается на обращении и продолжает выполнение. Результат мы получаем позже и при необходимости.

    Практика


    Отлично! Теперь приступим к реализации реактивного программирования в Java. Единственное, следует заметить, что мы будем использовать Spring WebFlux. Это новый фреймворк для реактивного программирования. Возникает вопрос, почему команда Spring не использовала для этих целей Spring Web MVC? Дело в том, что далеко не все модули в этом фреймворке можно использовать для работы в реактивном режиме. Остается много кода и сторонних библитек, например, Tomcat, которые основаны на декларативном программировании и потоках.

    В процессе работы над фреймворком была разработана небольшая спецификация для асинхронной работы. В дальнейшем эту спецификацию решили включить в Java 9. Однако я буду использовать Java 8 и Spring Boot 2 для простоты.

    Основные концепции


    В новом подходе у нас есть два основных класса для работы в реактивном режиме:

    • Mono
      Класс Mono нужен для работы с единственным объектом. Давайте посмотрим, как будет выглядеть простое приложение с использованием Mono. Для этого создадим проект и сущность User в нем(все настройки и примеры можно найти на моем профиле в github):

      @Data
      @NoArgsConstructor
      @AllArgsConstructor
      public class User {
          private String firstName;
          private String lastName;
      }

      Далее создадим класс с тестами и подготовленными пользователями:

      public class HabrreactiveApplicationTests {
      
      	private User peter = new User("Peter", "Griffin");
      	private User lois = new User("Lois", "Griffin");
      	private User brain = new User("Brain", "Griffin");
      }

      Напишем тест:
      	@Test
      	public void mono() {
      		// Создаем объект
      		Mono<User> monoPeter = Mono.just(peter);
      
      		// Блокируем текущий поток до тех пор пока не получим объект
      		User peter2 = monoPeter.block();
      
      		// Проверяем, что мы получили ожидаемый объект
      		assertEquals(peter, peter2);
      	}

      Как видно из примера, использовать реактивный подход довольно просто.

      Кроме того, у класса Mono есть множество методов на любой случай жизни. Например, есть всем известный метод map для преобразования одного типа в другой:

      	@Test
      	public void blockMono() {
      		Mono<User> monoPeter = Mono.just(peter);
      
      		// Блокируем текущий поток до тех пока мы не получим и не обработаем данные
      		String name = monoPeter.map(User::getFirstName).block();
      		assertEquals(name, "Peter");
      	}

    • Flux

      Данный класс схож с Mono, но предоставляет возможность асинхронной работы со множеством объектов:

      	@Test
      	public void flux() {
      		// Создаем поток данных для выгрузки наших
      		Flux<User> fluxUsers = Flux.just(peter, lois, brain);
      
      		// Получаем данные и обрабатываем по мере поступления
      		fluxUsers.subscribe(System.out::println);
      	}

      Как и в случае с Mono у Flux есть набор полезных методов:
      	@Test
      	public void fluxFilter() {
      		Flux<User> userFlux = Flux.just(peter, lois, brain);
      
      		// Фильтруем и оставляем одного Питера
      		userFlux
      				.filter(user -> user.getFirstName().equals("Peter"))
      				.subscribe(user -> assertEquals(user, peter));
      	}
      
      	@Test
      	public void fluxMap() {
      		Flux<User> userFlux = Flux.just(peter, lois, brain);
      
      		// Преобразуем тип User в String
      		userFlux
      				.map(User::getFirstName)
      				.subscribe(System.out::println);
      	}

      Здесь следует подчеркнуть одну особенность. В отличае от стандартных(не демонов) потоков при завершении работы основного потока выполнения сбор наших данных останавливается, и программа завершается. Это можно легко продемострировать. Следующий код ничего не выведет на консоль:

             @Test
      	public void fluxDelayElements() {
      		Flux<User> userFlux = Flux.just(peter, lois, brain);
      
      		// Ожидаем получение данных 1 секунду и только после этого производим обработку событий
      		userFlux.delayElements(Duration.ofSeconds(1))
      				.subscribe(System.out::println);
      	}

      Этого можно избежать с помощью класса CountDownLatch:
      	@Test
      	public void fluxDelayElementsCountDownLatch() throws Exception {
      		// Создаем счечик и заводим его на единицу
      		CountDownLatch countDownLatch = new CountDownLatch(1);
      
      		Flux<User> userFlux = Flux.just(peter, lois, brain);
      
      		// Запускаем userFlux со срабатыванием по прошествию одной секунды
      		// и устанавлием сбрасывание счетчика при завершении
      		userFlux
      				.delayElements(Duration.ofSeconds(1))
      				.doOnComplete(countDownLatch::countDown)
      				.subscribe(System.out::println); // вывод каждую секунду
      
      		// Ожидаем сброса счетчика
      		countDownLatch.await();
      	}


    Все это очень просто и эффективно по ресурсам. Представьте, чего можно достить при комбинировании вызовов методов стрима.

    В данной статье мы рассмотрели понятие рективностивной системы и реактивного программирования. Кроме того, мы поняли, как связаны эти понятия. В следующей части мы пойдем дальше и попробуем построить свой сервис на основе полученных знаний.

    P.S. Предлагаю разобрать систему обмена сообщениями от mail.ru. Как вы считаете, такое приложение можно назвать системой сообщений согласно манифесту? Пишите свои мысли в комментариях. Очень интересно.
    • +10
    • 31.1k
    • 7
    Share post
    AdBlock has stolen the banner, but banners are not teeth — they will be back

    More
    Ads

    Comments 7

      0

      Спасибо за статью! Хотя я не очень понимаю целевую аудиторию — совсем новичкам в реактивном программировании она ничего не объяснит, а людям, знакомым с RxJava или Reactor ничего нового не даст.


      Пара вещей, которые стоит отметить, говоря про реактивный Spring:


      • Spring MVC не использован т.к. он построен на спецификации сервлетов, которые (пока) не особо реактивные
      • Помимо WebFlux реактивный Spring включает еще и реактивный доступ к данным (Mongo, Redis), безопасность (Spring Reactive Security) — ведь если все контроллеры реактивные, но доступ к базе блокирующий — то смысла в реактивности особо нет

      И важный момент насчет тестов — тестирование кода фреймворка или вообще любого внешнего кода это не всегда хорошая идея. Хотя в статье это иллюстрация работы Mono / Flux, но в целом тестировать функции just и map не нужно вне контекста вашего кода.

        0
        Она для людей, которые работают со Spring Boot, но пока не видели Spring Boot 2.
        0
        "// Блокируем текущий поток до тех пор пока не получим объект"

        ничего, что это чуть ли не главная проблема, которую реактивный подход пытается решить? :) а вы прямо под этим примером гордо пишете «использовать реактивный подход довольно просто»
          –1
          Мне кажется, для первого примера неплохо. :)
          0
          Это все здорово, но к сожалению пока Spring Data не умеет работать асинхронно с JPA/jdbc. И это во многих случаях делает бессмысленными все дальнейшие реактивные манипуляции.
            +1
            Пока в первой статье непонятно, чем это лучше Stream
              0

              Я правильно понимаю, что это для микросервисной архитектуры? То есть пока у нас была БД, как поставщик данных, то она редко была нетерпимо медленной, если же была, то выставлялся кэш в памяти. Теперь данные идут от микросервисов, которые могут быть довольно тормознутыми. Чтоб не городить потоки, делают более простой способ организации параллельной работы — асинхронный, он же реактивный.

              Only users with full accounts can post comments. Log in, please.