Pull to refresh

Comments 12

'В первый раз, когда использовали мою функцию, она возвращала значение асинхронно, а во второй раз — синхронно'


А в чем, собственно, проблема тут? Вызывающий код в любом случае получает подписку, ему всё равно, когда код там выполняется по факту

Я немного промазал) Ответил ниже.
Если у нас только подписка, то, наверное, ничего страшного. Но иногда мы можем рассчитывать на определенный контекст выполнения кода по подписке.
Попробую пояснить на примере, возможно, не очень реалистичном, но главное суть.

...  
getUser() {
    get(1).subscribe(user => this.user = user);
    this.user = null;
  }
...


У нас есть функция getUser() получения конкретного юзера. get() это функция из статьи, которая сначала возвращает асинхронно данные, а на второй раз синхронно.
И мы подумали, что на время получения юзера, нужно зачем-то обнулить его данные, может быть там какая логика связанная с этим, решив что get() всегда выполняется в контексте макрозадач.
В первый раз, при вызове getUser() все будет нормально. Во второй раз, сначала выполнится код в subscribe, а потом присвоение null. И это явно не то, чего мы хотели.

У меня вопрос немного не в тему, почему в rxjs нет выбора backpressure стратегии, как в джаве, например? Я даже находил упоминание о ней в документации. Как работать в таком случае, если у меня есть, допустим, бесконечный источник данных и потери данных не допустимы?

В вашем случае должны помочь loss-less операторы — bufferCount, windowCount, bufferTime и т.д.
Я не знаком с тем, как выбирается backpressure стратегия в Java, но в RxJS этим можно управлять через операторы.
Вы можете подробнее об этом прочитать в старой документации.
Строго говоря, bufferCount, windowCount и bufferTime не относятся к backpressure. Backpressure это механизм (говорю своими словами), который учитывает скорость обработки подписчиков. Например,
interval(1).subscribe(...)
взорвется, потому что скорость выдачи результатов почти всегда будет быстрее скорости их обработки. В rxjava же у Observable есть метод
range(int start, int count)
, который учитывает скорость работы подписчика и не дает обвалиться программе.
Что вы имеете ввиду под «взорвётся»? Этот пример с интервалом же практически ничем не отличается от нативного
setInterval(handler, 1)

который непонятно как может взорваться
это значит, что если события эмитируются быстрее, чем их обрабатывают, они будут скапливаться в памяти до тех пор, пока она не закончится.
О, я понял. Почитал немного про RxJava. Там оказывается есть отдельная сущность Flowable для backpressure. Круто!

Нашел интересную дисскусию о backpressure story в RxJS, открытую еще в 2015 году. Вроде как по ходу дисскусии, мейнтейнеры библиотеки решили что имплементировать слишком дорого, а use кейсов не прям много.

Никропостер врывается в комментарии. А в чем разница между контекстом subscribe и контекстом принимаемым им калбеков next, error и complete? Есть какие-то кейсы, чтобы хотя бы проиллюстрировать разницу?

Разница наверное, как в 3ем коменте

Сори за долгий ответ. OlegTar ответил верно ниже. Попробую немного подробнее ответить. Если у нас есть observeOn в пайпе, то он как бы свитчит поток в scheduler в моменте, где он добавлен. А subscribeOn оборачивает весь Observable в выбранный scheduler.

.pipe(
  ... // синхронной работы остается синхронной
  observeOn(asyncScheduler)
  ... // все остальное будет выполняться ассинхронно 
)

.pipe(
  ... // синхронная работа будет выполнена ассинхронно
  subscribeOn(asyncScheduler)
  ... // все остальное будет выполняться ассинхронно 
)

Я создал StackBlitz, где можно наглядно посмотреть, как это работает

Sign up to leave a comment.

Articles