Size: a a a

ReactiveX - русскоговорящее сообщество

2021 January 13

A

Aleksandr in ReactiveX - русскоговорящее сообщество
Товарищи, а подскажите ка пожалуйста, есть ли весомая разница между BaseSubscriberом и лямбдой, которую мы пихаем в subscribe метод? Я имею в виду с точки зрения создания подписок и тд. Как я вижу, в первом случае мы сами вольны контроллировать саму подписку через cancel/request. И это выходит вся разница?

И вообще зачем люди делают request(1)?
источник

VA

Valentin Avdeev in ReactiveX - русскоговорящее сообщество
всем привет! разбираюсь с rxjava, кейс такой, есть база realm где в каждом объекте есть стринг поле которое содержит предложение, хочу сделать в бекграунд потоке итерацию по объектам реалма и собрать все стринги в один текст. Подскажите как реализовать такое
источник
2021 January 16

A

Aleksandr in ReactiveX - русскоговорящее сообщество
Гайз, а кто знает почему Flux#interval выполняется на шедулере по дефолту?

var generator =  Flux.interval(Duration.ofSeconds(1));
generator.subscribe(i -> System.
out.println("[first subscriber] - " + Thread.currentThread().getName() + ", value - " + i));
generator.subscribe(i -> System.
out.println("[second subscriber] - " + Thread.currentThread().getName() + ", value - " + i));

Ну и я короче вижу следующее
[first subscriber] - parallel-1, value - 0
[first subscriber] - parallel-1, value - 1
[first subscriber] - parallel-1, value - 2
[first subscriber] - parallel-1, value - 3
[second subscriber] - parallel-2, value - 0
[first subscriber] - parallel-1, value - 4
[second subscriber] - parallel-2, value - 1
[first subscriber] - parallel-1, value - 5
[second subscriber] - parallel-2, value - 2


Как-то очень странно, в доках не вижу никаких намёков на это 🤔
источник

AZ

Alexey Zavyalov in ReactiveX - русскоговорящее сообщество
Aleksandr
Гайз, а кто знает почему Flux#interval выполняется на шедулере по дефолту?

var generator =  Flux.interval(Duration.ofSeconds(1));
generator.subscribe(i -> System.
out.println("[first subscriber] - " + Thread.currentThread().getName() + ", value - " + i));
generator.subscribe(i -> System.
out.println("[second subscriber] - " + Thread.currentThread().getName() + ", value - " + i));

Ну и я короче вижу следующее
[first subscriber] - parallel-1, value - 0
[first subscriber] - parallel-1, value - 1
[first subscriber] - parallel-1, value - 2
[first subscriber] - parallel-1, value - 3
[second subscriber] - parallel-2, value - 0
[first subscriber] - parallel-1, value - 4
[second subscriber] - parallel-2, value - 1
[first subscriber] - parallel-1, value - 5
[second subscriber] - parallel-2, value - 2


Как-то очень странно, в доках не вижу никаких намёков на это 🤔
Вот, вроде, в доке об этом и пишут:

Some operators use a specific scheduler from Schedulers by default (and usually give you the option of providing a different one). For instance, calling the Flux.interval(Duration.ofMillis(300)) factory method produces a Flux<Long> that ticks every 300ms. By default, this is enabled by Schedulers.parallel().
источник

A

Aleksandr in ReactiveX - русскоговорящее сообщество
Alexey Zavyalov
Вот, вроде, в доке об этом и пишут:

Some operators use a specific scheduler from Schedulers by default (and usually give you the option of providing a different one). For instance, calling the Flux.interval(Duration.ofMillis(300)) factory method produces a Flux<Long> that ticks every 300ms. By default, this is enabled by Schedulers.parallel().
Мой косяк, я не ту доку прочитал
источник
2021 January 17

A

Aleksandr in ReactiveX - русскоговорящее сообщество
Ещё один вопросик вдогонку. Вот смотрите, допустим у меня есть горячий паблишер:
Flux<Long> hot = Flux.interval(Duration.ofSeconds(1));
hot.share(); // либо publish()


И у меня есть два подписчика, которые создают две подписки на него. В таком случае обработка может чередоваться между этими сабскрайберами. Но по каким именно павилам? Как определить когда один паблишер переключается на второго?
источник

l

lenar in ReactiveX - русскоговорящее сообщество
Aleksandr
Ещё один вопросик вдогонку. Вот смотрите, допустим у меня есть горячий паблишер:
Flux<Long> hot = Flux.interval(Duration.ofSeconds(1));
hot.share(); // либо publish()


И у меня есть два подписчика, которые создают две подписки на него. В таком случае обработка может чередоваться между этими сабскрайберами. Но по каким именно павилам? Как определить когда один паблишер переключается на второго?
нет там никакого чередования. оба подписчика получат значения. и share/publish это не горячий паблишер
источник

A

Aleksandr in ReactiveX - русскоговорящее сообщество
lenar
нет там никакого чередования. оба подписчика получат значения. и share/publish это не горячий паблишер
Почему это не горячий? Как тогда можно объяснить доку “This will effectively turn any type of sequence into a hot sequence”? Плюс речь тут идёт про то, что на этом паблишере делается connect. В таком случае данные будут поступать вот так
clockTicks.subscribe(tick -> System.out.println("clock1 " + tick + "s");

Thread.sleep(2000);

clockTicks.subscribe(tick -> System.out.println("\tclock2 " + tick + "s");


Ну и вывод
clock1 1s
clock1 2s
clock1 3s
   clock2 1s
clock1 4s
   clock2 2s
clock1 5s
   clock2 3s
clock1 6s
   clock2 4s
источник

l

lenar in ReactiveX - русскоговорящее сообщество
connect делает publish горячим, но сам publish ещё не горячий
источник

A

Aleksandr in ReactiveX - русскоговорящее сообщество
Тут речь скорее шла про то, что паблишер уже горячий, с которым я работаю в примере. А publish/share - это просто пример перехода. Ну и главный вопрос отсаётся актуальным - это то, как именно происходит чередование подписчиков на паблишер, как именно они поглощают значения
источник

l

lenar in ReactiveX - русскоговорящее сообщество
Aleksandr
Тут речь скорее шла про то, что паблишер уже горячий, с которым я работаю в примере. А publish/share - это просто пример перехода. Ну и главный вопрос отсаётся актуальным - это то, как именно происходит чередование подписчиков на паблишер, как именно они поглощают значения
повторюсь: оба получат одни и те же значения, там нет чередования значений. а порядок (кому первым придет) я не знаю, возможно в порядке подписки. а может и нет
источник

A

Aleksandr in ReactiveX - русскоговорящее сообщество
lenar
повторюсь: оба получат одни и те же значения, там нет чередования значений. а порядок (кому первым придет) я не знаю, возможно в порядке подписки. а может и нет
>Оба получат одни и те же значения
Имеете в виду то, что будут получатся абсолютно одинаковые значения что для первого, что для второго сабскрайбера?
источник

l

lenar in ReactiveX - русскоговорящее сообщество
Aleksandr
>Оба получат одни и те же значения
Имеете в виду то, что будут получатся абсолютно одинаковые значения что для первого, что для второго сабскрайбера?
да
источник

A

Aleksandr in ReactiveX - русскоговорящее сообщество
Мне кажется что либо вы меня не так поняли, либо я вас
источник

A

Aleksandr in ReactiveX - русскоговорящее сообщество
Потому что вот пример

var hot = Flux
       .
fromIterable(List.of(1, 0, 2, 4, -12, 123))
       .publish();

hot
       .doOnNext(value -> System.
out.println("[subscriber-1] [thread]: " + Thread.currentThread().getName() + ", [value]: " + value))
       .subscribe();

hot.connect();

Thread.
sleep(3000);

hot
       .doOnNext(value -> System.
out.println("[subscriber-2] [thread]: " + Thread.currentThread().getName() + ", [value]: " + value))
       .subscribe();


Получаем:
[subscriber-1] [thread]: main, [value]: 1
[subscriber-1] [thread]: main, [value]: 0
[subscriber-1] [thread]: main, [value]: 2
[subscriber-1] [thread]: main, [value]: 4
[subscriber-1] [thread]: main, [value]: -12
[subscriber-1] [thread]: main, [value]: 123

И вот холодный паблишер
var cold = Flux
       .
fromIterable(List.of(1, 0, 2, 4, -12, 123));

cold
       .doOnNext(value -> System.
out.println("[subscriber-1] [thread]: " + Thread.currentThread().getName() + ", [value]: " + value))
       .subscribe();

Thread.
sleep(3000);

cold
       .doOnNext(value -> System.
out.println("[subscriber-2] [thread]: " + Thread.currentThread().getName() + ", [value]: " + value))
       .subscribe();


И вывод
[subscriber-1] [thread]: main, [value]: 1
[subscriber-1] [thread]: main, [value]: 0
[subscriber-1] [thread]: main, [value]: 2
[subscriber-1] [thread]: main, [value]: 4
[subscriber-1] [thread]: main, [value]: -12
[subscriber-1] [thread]: main, [value]: 123
[subscriber-2] [thread]: main, [value]: 1
[subscriber-2] [thread]: main, [value]: 0
[subscriber-2] [thread]: main, [value]: 2
[subscriber-2] [thread]: main, [value]: 4
[subscriber-2] [thread]: main, [value]: -12
[subscriber-2] [thread]: main, [value]: 123
источник

l

lenar in ReactiveX - русскоговорящее сообщество
как бы это объяснить 😅 тот что hot (просто publish) он псевдо-горячий - пока не сделать connect, он не начнёт эмиттить. и все кто до connect'a подпишутся получат всё, а те кто после connect'a могут что-то получить, а могут ничего не получить, например hot все раньше заэмитит
источник

l

lenar in ReactiveX - русскоговорящее сообщество
в этом примере hot сразу заэмитит все элементы сразу после connect'a. затем задержка 3 сек и вторая подписка. но hot уже давно закончился
источник

A

Aleksandr in ReactiveX - русскоговорящее сообщество
Но допустим если мы сделаем hot до любых подписок, то у нас вообще ничего не заэмитится, это нормально? 😁
источник

l

lenar in ReactiveX - русскоговорящее сообщество
Aleksandr
Но допустим если мы сделаем hot до любых подписок, то у нас вообще ничего не заэмитится, это нормально? 😁
что значит сделаем hot до любых подписок? наверное речь про connect?
источник

l

lenar in ReactiveX - русскоговорящее сообщество
если про него, то да, в данном примере все подписчики после connect ничего не получат, так как hot.connect скорее всего отработает до подписок
источник