Если хотите бесконечный поток данных (push based), то я не вижу другого выхода. Кто-то ведь должен вызывать emitter.next у FluxSink. С точки зрения реактора можно воспользоваться шедулером.
Вариант проще — pull based подход через fluxSink.onRequest. Тут и нить выполнения можно изменить через subscribeOn. Правда, в этом случае сложно организовать «бесконечную» работу.
Более элегантного варианта я не знаю, благо есть гиттер, где в т.ч. отвечают оперативней.
Не до конца понял вопрос, но попробую ответить. Какого-то универсального рецепта нет, все упирается в данные. Если метод возвращает весь необходимый список объектов единовременно (не потоком) — вероятно это не Flux, а Mono. Hot Stream это скорее про какие-нибудь Event Listener на UI или системные ивенты, вебсокеты. Что-то, вызов чего мы не контролируем.
Оба варианта: publishOn() и subscribeOn() позволяют изменять поток выполнения. В случае с Schedulers.elastic(), можно сказать что он похож на стандартный ExecutorService newCachedThreadPool().
Перед RxJava 1? Очевидно есть. Перед RxJava 2? Отсутствие поддержки Java 6 + Android 2.3 + наследия RxJava 1, в связи с чем более "чистая" семантика. Лучшая поддержка фишек Java 8. Spring Boot 2, по умолчанию, предлагает использовать Reactor для реактивности, хотя это же Spring, поэтому можно подключить альтернативное решение.
По поводу Flux.range. Там нет ошибки, просто так устроен оператор:
/**
* @param start the first integer to be emit
* @param count the total number of incrementing values to emit, including the first value
* @return a ranged {@link Flux}
*/
public static Flux<Integer> range(int start, int count) {}
Он выводит значения начиная со start и по count-1.
Это из-за особенности устройства Flux (и работы оператора flatMap()). Flux.empty() будет передан дальше по цепочке вызовов, а потом подхватится оператором switchIfEmpty(). Вместо тысячи слов:
Вариант проще — pull based подход через fluxSink.onRequest. Тут и нить выполнения можно изменить через subscribeOn. Правда, в этом случае сложно организовать «бесконечную» работу.
Более элегантного варианта я не знаю, благо есть гиттер, где в т.ч. отвечают оперативней.
Да, в плане работы с JDBC — все очень плохо.
Но в случае чего, для работы с блокирующим источником, документация предлагает следующую конструкцию:
У нас в одном из сервисов есть вот такой метод:
Оба варианта: publishOn() и subscribeOn() позволяют изменять поток выполнения. В случае с Schedulers.elastic(), можно сказать что он похож на стандартный ExecutorService newCachedThreadPool().
Перед RxJava 1? Очевидно есть. Перед RxJava 2? Отсутствие поддержки Java 6 + Android 2.3 + наследия RxJava 1, в связи с чем более "чистая" семантика. Лучшая поддержка фишек Java 8. Spring Boot 2, по умолчанию, предлагает использовать Reactor для реактивности, хотя это же Spring, поэтому можно подключить альтернативное решение.
Еще есть вот такой забавный твит, от "project lead of RxJava".
По поводу Flux.range. Там нет ошибки, просто так устроен оператор:
Он выводит значения начиная со start и по count-1.
Это из-за особенности устройства Flux (и работы оператора flatMap()). Flux.empty() будет передан дальше по цепочке вызовов, а потом подхватится оператором switchIfEmpty(). Вместо тысячи слов:
Выведет грустный смайлик