Привет, Хабр! На связи Андрей Зяблин, Java разработчик компании «Магнит». Несколько лет назад, продляя дочери подписку на Netflix, я заинтересовался, как они поддерживают обслуживание сотен миллионов клиентов. Изучив вопрос, я увлёкся реактивным подходом в программировании. Через некоторое время это увлечение принесло практическую пользу при разработке сервисов в «Магните». В статье я расскажу про три решения, которые позволяют реализовать потоковый обмен данными из БД между распределёнными приложениями:
Реализация с использованием hibernate
Реализация с использованием mybatis
Ограничение скорости обмена с использованием механизма обратного давления («backpressure») и библиотеки Bucket4j
Постановка задачи
Наша команда разрабатывала приложение для интеграции различных сервисов сети, а также для интеграции с государственными сервисами в области ветеринарии «ВетИС».
Упрощенно структура приложения выглядит так:
Бизнес‑запросы — запросы в формате json, предназначенные для обмена на уровне бизнес‑данных (операции, позиции операций, пр.) и раскладывающиеся в более низкоуровневые запросы в форматах, поддерживающихся госсервисами.
ЦСМ (Централизованный сервис сети «Меркурий») — разрабатываемое нами приложение интеграции.
BDSM‑CLIENT — работа с бизнес‑запросами БД СМ (базы данных сети магазинов).
MERCURY — обеспечивает взаимодействие со шлюзом к «ВетИС».
PROCESSING — выполняет обработку бизнес‑запросов до отправки запросов в шлюз.
Проблемным местом оказалась передача данных из ЦСМ в БД СМ: объёмы передаваемых данных огромны, а на стороне БД СМ напрашивался тонкий клиент, пишущий непосредственно в базу.
Требования к обмену:
Данные извлекаются из БД
Объём данных — миллионы записей за сеанс
Передача ведётся через интернет по http
Необходимо использовать ORM
Изучив различные технологии, мы пришли к мнению, что в нашем случае оптимально использовать реактивность.
Доработки на стороне ЦСМ и БД СМ:
В синхронный код ЦСМ встраивается реактивный код обмена
Для приёма данных на стороне БД СМ пишется отдельное реактивное приложение
Используемые технологии:
Java streams
Hibernate streamResult
Reactive streams
Spring webflux(reactor)
Spring webclient
Rsocket (для демонстрации)
Граничные условия:
Не всегда имеется возможность использовать R2DBC. На момент разработки для Oracle не было стабильного реактивного драйвера.
Hibernate streamResult требует активную транзакцию на всё время фетча. Нужно отслеживать завершение фетча, ошибки, закрытие соединения, чтобы завершить транзакцию. Стандартные решения в виде try‑catch не подходят в данной ситуации.
Решение с использованием hibernate
Основная сложность реализации заключалась в корректной работе с транзакциями в асинхронном режиме. Для этого пришлось создавать отдельный механизм, благо Reactor это позволяет. Основу решения составляют:
Stateless session
StreamResult
Генерация Flux из Stream
Далее рассмотрим код.
Сущности
Customer — справочник заказчиков.
Operation — операции с заказчиками, например, продажи.
public class Customer {
@Id
@GeneratedValue(strategy = GenerationType.SEQUENCE, generator = "sequence")
@SequenceGenerator(name = "sequence", sequenceName = "SQ_CUSTOMER", allocationSize = 1)
private Long id;
@Column(name = "NAME")
private String name;
}
public class Operation {
@Id
@GeneratedValue(strategy = GenerationType.SEQUENCE, generator = "sequence")
@SequenceGenerator(name = "sequence", sequenceName = "SQ_OPERATION", allocationSize = 1)
private Long id;
@Column(name = "TOTAL_SUM")
private Long totalSum;
@ManyToOne
@JoinColumn(name = "customer_id")
private Customer customer;
@ManyToMany
@JoinTable(
name = "operation_detail",
joinColumns = @JoinColumn(name = "operation_id"),
inverseJoinColumns = @JoinColumn(name = "product_id"))
private List<Product> products;
}
Загрузка данных
Загрузка данных достаточно проста и не совсем имеет отношение к теме статьи, здесь демонстрируется мощь функционального программирования. На данном примере показано, насколько функциональна библиотека reactor и как она может облегчить разработку.
Класс com.magnit.flux.common.service.InitDataService
private void fillCustomer() {
Flux.range(0, CUSTOMER_RECORD_LIMIT)
.map(i -> Customer.builder().name("Customer " + i).build())
.subscribe(customerRepository::save);
}
private void fillProduct() {
Flux.range(0, PRODUCT_RECORD_LIMIT)
.map(i -> Product.builder().name("Product " + i).build())
.subscribe(productRepository::save);
}
private void fillOperation() {
Supplier<List<Product>> productSupplier = () -> new Random().ints(5, 1, 8)
.boxed().distinct()
.map(productId -> Product.builder().id((long) productId).build())
.collect(Collectors.toList());
List<Customer> customers = (List<Customer>) customerRepository.findAll();
Flux.range(0, OPERATION_RECORD_LIMIT)
.map(i -> Operation.builder()
.totalSum((long) random.nextInt(100))
.customer(customers.get(random.nextInt(customers.size() - 1)))
.products(productSupplier.get())
.build())
.buffer(1000)
.subscribe(operationRepository::saveAll);
}
Использование hibernate для стриминга
Класс выполняет основную работу по управлению транзакциями в асинхронном режиме. Это что‑то вроде AutoCloseable в синхронных приложениях.
Класс com.magnit.flux.hibernate.dao.HibernateFluxResultProducer
@RequiredArgsConstructor
public class HibernateFluxResultProducer<T> {
private final EntityManagerFactory entityManagerFactory;
private StatelessSession statelessSession;
/**
* Функция выполняет hql запрос и создаёт Flux из Result Stream
*/
public Flux<T> execute(String qlString, Class<T> resultClass) {
Optional.ofNullable(statelessSession).ifPresent(e -> {
throw new RuntimeException("Cursor already open");
});
val sessionFactory = entityManagerFactory.unwrap(SessionFactory.class);
//Использование Stateless Session позволяет ускорить чтение за счёт использования detached объектов
statelessSession = sessionFactory.openStatelessSession();
statelessSession.getTransaction().begin();
val query = statelessSession.createQuery(qlString, resultClass);
//Используется возможность hibernate для потокового чтения из БД - getResultStream
return Flux.fromStream(query.getResultStream());
}
public Mono<Void> commit() {
return close(true);
}
public Mono<Void> rollback() {
return close(false);
}
public Mono<Void> close(boolean commitTran) {
if (statelessSession.getTransaction().isActive()) {
if (commitTran) {
statelessSession.getTransaction().commit();
} else {
statelessSession.getTransaction().rollback();
}
}
statelessSession.close();
return Mono.empty();
}
}
Данный класс стартует транзакцию и выполняет hql запрос. Также он имеет методы commit и rollback.
Контроллер, работающий с hibernate
Основной функционал работы с потоками реализован в приватном методе getOperationFlux. Генерация потока осуществляется вызовом метода Flux.usingWhen, который можно назвать асинхронным AutoCloseable. Контроллер предоставляет два эндпоинта:
getOperationsStream — json стриминг
getOperationsWs — веб-сокет
Класс com.magnit.flux.hibernate.controller.HibernateOperationController
public class HibernateOperationController {
private final ObjectFactory<HibernateFluxResultProducer<Operation>> streamResultProducerObjectFactory;
@GetMapping(path = "/operations-stream", produces = "application/stream+json")
public Flux<Operation> getOperationsStream() {
return getOperationFlux();
}
@MessageMapping("operations")
public Flux<Operation> getOperationsWs() {
return getOperationFlux();
}
private Flux<Operation> getOperationFlux() {
Mono<HibernateFluxResultProducer<Operation>> streamResultExecutorMono = Mono
.just(streamResultProducerObjectFactory.getObject());
return Flux.usingWhen(streamResultExecutorMono,
se -> se.execute("select o from Operation o JOIN FETCH o.customer c", Operation.class),
HibernateFluxResultProducer::commit);
}
}
Результат работы первого метода можно посмотреть непосредственно в браузере:
Можно останавить загрузки или даже закрыть браузер. Можно убедиться, что метод close всегда вызывается.
Второй метод — более интерсный. На его основе можно реализовать обмен по протоколу Rsocket. Для тестирования Rsocket клиент реализован на javascript. Также данный пример демонстрирует использование механизма backpressure (обратное давление).
statis/index.js
client.connect().subscribe({
onError: error => console.error(error),
onSubscribe: cancel => {
},
onComplete: socket => {
socket.requestStream({
data: null,
metadata: String.fromCharCode("operations".length) + "operations"
})
.subscribe({
onComplete: () => console.log("requestStream done"),
onError: error => {
console.log("got error with requestStream");
console.error(error);
},
onNext: value => {
document.getElementById(
"operation").innerText = "Operation ID: "
+ value.data.id + ". Customer: " + value.data.customer.name;
//Управление обратным давлением вручную. Можно поставить на паузу (функция pause), можно возобновить чтение (функция suspend)
if (!paused) {
subscription.request(1);
}
},
onSubscribe: sub => {
subscription = sub;
subscription.request(1);
}
});
}
});
Результаты работы также можно посмотреть в браузере.
Кнопки Pause и Suspend позволяют остановить и возобновить загрузку потока.
Отношения один ко многим и многие ко многим (OneToMany &ManyToMany)
Теперь рассмотрим достаточно частую ситуацию, когда со стороны заказчика выдвигаются новые требования. В данном случае упрощенный пример: заказчик выдвинул новые требования к обмену: теперь нужно выгружать товары, содержащиеся в операциях. Теоретически это можно сделать и с hibernate, но возникают проблемы с производительностью. Как альтернативу мы рассмотрели MyBatis. Ниже сравниваем MyBatis и hiberabte.
MyBatis как более быстрая альтернатива hibernate
MyBatis | Hibernate |
Простая разработка, так как включает в основном написание SQL запросов | Разработка более сложная, так как hibernate более громоздкий и сложный для понимания |
MyBatis использует SQL, который зависит от БД | Hibernate в общем случае не зависит от БД |
Очень просто использовать хранимые процедуры, особенно с курсорами в качестве out параметров | В некоторых случаях хранимые процедуры использовать непросто |
Так как SQL зависит от БД, базу данных сменить очень сложно | Hibernate позволяет легко изменить БД |
Механизмы кэширования в Mybatis не развиты | Hibernate имеет очень хорошие механизмы кэширования |
MyBatis позволяет обрабатывать отношения «один ко многим» и «многие ко многим» в стримах | Hibernate не позволяет обрабатывать отношения один ко многим и многие ко многим в стримах |
В нашем случае важна способность MyBatis обрабатывать отношения один ко многим и многие ко многим в стримах.
Стриминг данных в MyBatis
В MyBatis 3.4.0 добавлен тип курсор — org.apache.ibatis.cursor.Cursor, аналогичный классу Resultset в JDBC
Курсор реализует интерфейс итератора, поэтому использовать его очень просто
Курсор позволяет использовать коллекции в результатах, поэтому можно реализовать отображение отношений one-to-many и many-to-many
Изменения в сущностях
Product — справочник продукции
Operation — добавилась связь @ManyToMany со справочником продукции
@Entity
@Table(name = "PRODUCT")
@Getter
@Setter
@Builder
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class Product {
@Id
@GeneratedValue(strategy = GenerationType.SEQUENCE, generator = "sequence")
@SequenceGenerator(name = "sequence", sequenceName = "SQ_OPERATION_DETAIL", allocationSize = 1)
private Long id;
@Column(name = "NAME")
private String name;
}
@ManyToMany
@JoinTable(
name = "operation_detail",
joinColumns = @JoinColumn(name = "operation_id"),
inverseJoinColumns = @JoinColumn(name = "product_id"))
private List<Product> products;
Маппинг сущностей
Основная тонкость работы с MyBatis заключается в маппинге .
<mapper namespace="com.magnit.flux.mybatis.mapper.OperationMapper">
<select id="getAllOperations" resultMap="operationResult" resultOrdered="true">
select d.operation_id id, d.product_id, p.name product_name from operation_detail d inner
join product p on p.id = d.product_id order by d.operation_id
</select>
<resultMap id="operationResult" type="com.magnit.flux.model.entity.Operation">
<id property="id" column="id"/>
<collection property="products" ofType="com.magnit.flux.model.entity.Product">
<id property="id" column="product_id"/>
<result property="name" column="product_name"/>
</collection>
</resultMap>
</mapper>
Здесь getAllOperations определяет отсортированный набор данных (resultOrdered=”true”), а MyBatis автоматом группирует данные по operation.id и возвращает операции с вложенными коллекциями продукции.
Использование MyBatis для стриминга
Класс аналогичный HibernateFluxResultProducer. Он имеет пару отличий:
В качестве параметра передаётся не hql запрос, а лямбда-выражение, возвращающее курсор
В данном подходе не используются долгоиграющие транзакции, что даёт небольшое преимущество перед hibernate. По крайней мере DBA будут спокойнее).
Класс com.magnit.flux.mybatis.dao.MyBatisFluxResultProducer
public class MyBatisFluxResultProducer<T> {
private final SqlSessionFactory sqlSessionFactory;
private Cursor<T> cursor;
private SqlSession sqlSession;
/**
* @param cursorFunction функция для продюсирования курсора
* @return Flux соответсвующего типа
*/
public Flux<T> execute(Function<SqlSession, Cursor<T>> cursorFunction) {
Optional.ofNullable(sqlSession).ifPresent(e -> {
throw new RuntimeException("Cursor already open");
});
sqlSession = sqlSessionFactory.openSession();
cursor = cursorFunction.apply(sqlSession);
return Flux.fromStream(StreamSupport.stream(cursor.spliterator(), false));
}
@SneakyThrows
public Mono<Void> close() {
if (cursor.isOpen()) {
cursor.close();
}
sqlSession.close();
return Mono.empty();
}
}
Контроллер, работающий с MyBatis
Контроллер выглядит практически аналогично контроллеру, работающему с hibrernate.
Класс com.magnit.flux.mybatis.controller.MyBatisOperationController
public class MyBatisOperationController {
private final SqlSessionFactory sqlSessionFactory;
private final ObjectFactory<MyBatisFluxResultProducer<Operation>> streamResultProducerObjectFactory;
@GetMapping(path = "/operations-stream", produces = "application/stream+json")
public Flux<Operation> getOperationsStream() {
return getOperationFlux();
}
private Flux<Operation> getOperationFlux() {
Function<SqlSession, Cursor<Operation>> cursorFunction = sqlSession -> sqlSession
.getMapper(OperationMapper.class).getAllOperations();
Mono<MyBatisFluxResultProducer<Operation>> streamResultExecutorMono = Mono
.just(streamResultProducerObjectFactory.getObject());
return Flux.usingWhen(streamResultExecutorMono,
se -> se.execute(cursorFunction),
MyBatisFluxResultProducer::close);
}
}
Результат также можно посмотреть в браузере.
Проблема приёма данных на стороне БД СМ
Проблема с передачей больших объёмов данных была решена, но возникла проблема перегрузки БД запросами. Необходимо было ограничить количество запросов на запись в БД. На помощь приходит backpressure (обратное давление).
Использование backet4j для реализации backpressure
Bucket4j — это библиотека ограничения скорости, основанная на алгоритме token-bucket
Bucket4j — это потокобезопасная библиотека, которую можно использовать как в автономном приложении JVM, так и в кластерной среде
Bucket4j поддерживает кэширование в памяти или распределенное кэширование с помощью спецификации JCache (JSR107)
Алгоритм Token Bucket
Создаётся корзина, вместимость которого определяется количеством токенов, которое она может вместить
Всякий раз, когда потребитель хочет получить доступ к конечной точке API, он должен получить токен из корзины
Токен из корзины удаляется, если он доступен, и запрос принимается
Если в корзине нет токенов, то запрос отклоняется
Токены пополняются с фиксированной скоростью, которая и является ограничением
Подробнее о bucket4j здесь.
Bucket4j и Flux
В данной статье представлены два варианта реализации связки Bucket4j и Flux
Блокирующая с использованием методов Flux.delayUntil и Bucket.asBlocking
Асинхронная, использующая механизм управления подписчиком (Subscription) и Bucket.asScheduler
Реактивный клиента на java с использованием Bucket4j
В классе RateLimitService реализованы два соответствующих метода, которые с заданной скоростью фетчат записи из реактивного http-сервиса, который описан выше.
Использование блокирующего API bucket4j для backpressure
Класс com.magnit.fluex.client.service.RateLimitService.executeBlocking
public void executeBlocking(long rate) {
AtomicLong counter = new AtomicLong(0);
val limit = Bandwidth.simple(rate, Duration.ofMinutes(1));
val bucket = Bucket.builder().addLimit(limit).build();
val flux = client.get().retrieve().bodyToFlux(Operation.class);
flux.delayUntil(operation -> {
try {
bucket.asBlocking().consume(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return Mono.just("two");
}).subscribe(value -> System.out.println(counter.incrementAndGet() + " " + value));
}
Основу логики составляет метод Flux.delayUntil в связке bucket.asBlocking.
При вызове bucket.asBlocking токен либо возвращается, либо поток блокируется до пополнения корзины.
Использование шедулинга bucket4j для backpressure
Класс com.magnit.flux.client.service.RateLimitService.executeScheduling
public void executeScheduling(long rate) {
AtomicLong counter = new AtomicLong(0);
val service = Executors.newSingleThreadScheduledExecutor();
val limit = Bandwidth.simple(rate, Duration.ofMinutes(1));
val bucket = Bucket.builder().addLimit(limit).build();
final long watTime = 50;
BaseSubscriber<Operation> subscriber = new BaseSubscriber<Operation>() {
private void handleFutureResult(Boolean result) {
if (result) {
this.request(1);
} else {
service.schedule(() -> asyncRequest(), watTime, TimeUnit.NANOSECONDS);
}
}
@SneakyThrows
private void asyncRequest() {
val future = bucket.asScheduler().tryConsume(50, watTime, service);
if (future.isDone()) {
handleFutureResult(future.get());
} else {
future.thenAcceptAsync(value -> {
handleFutureResult(value);
});
}
}
@Override
protected void hookOnSubscribe(Subscription subscription) {
asyncRequest();
}
@Override
protected void hookOnNext(Operation value) {
System.out.println(counter.incrementAndGet() + " " + value);
asyncRequest();
}
};
val flux = client.get().retrieve().bodyToFlux(Operation.class);
flux.subscribe(subscriber);
}
В данном методе основной поток не блокируется. Создаётся подписчик, наследованный от класса из библиотеки reactor — BaseSubscriber. Токен получается методом bucket.asScheduler, который возвращает объект CompletableFuture. Если токен удалось получить (CompletableFuture .isDone() == true), тогда фетчится очередная запись, иначе фетч запускается после пополнения корзины. Ожидание пополнения происходит асинхронно — метод CompletableFuture.thenAcceptAsync.
Выводы
В сервисе нам удалось совместить блокирующую и реактивную архитектуры
В выборе между hibernate и mybatis мы остановились на hibernate, так как, во-первых, приложение было уже написано с использованием hibernate, а во-вторых, нам важно кеширование данных.
Помимо реактивности, мы получили возможность использовать в приложении весь богатый функционал flux.