Size: a a a

ReactiveX - русскоговорящее сообщество

2020 October 18

P

Papin in ReactiveX - русскоговорящее сообщество
Victor
Здравствуйте, подскажите как можно собрать все exceptions с observable?
То есть нужно чтобы источник не прекращал выдавать данные даже после эксепшна. Он должен продолжить дальше выдавать данные   и при этом мы смогли эти эксепшины потом получить как список

Интересует как это сделать с помощью rx, если так можно конечно
Самый верный варик как по мне это через subject реализовать
Если верно помню он не умрет когда эрор отправляет

Но по-любому можно на observable чёт сделать
источник

P

Papin in ReactiveX - русскоговорящее сообщество
Как минимум онЭрорРезюмНекст и там собирать ошибки и обрабатывать или дуОнЭрор скорее
источник

AA

Aleksei Afanasev in ReactiveX - русскоговорящее сообщество
Victor
Здравствуйте, подскажите как можно собрать все exceptions с observable?
То есть нужно чтобы источник не прекращал выдавать данные даже после эксепшна. Он должен продолжить дальше выдавать данные   и при этом мы смогли эти эксепшины потом получить как список

Интересует как это сделать с помощью rx, если так можно конечно
Есть штука RxRelay - это те же сабджекты, но без вызова onComplete/onError
источник
2020 October 20

v

vitaly in ReactiveX - русскоговорящее сообщество
Вот этот конкатМап не ждёт результата следующего айтема из репозитория. Как сделать правильно?
источник

P

Papin in ReactiveX - русскоговорящее сообщество
vitaly
Вот этот конкатМап не ждёт результата следующего айтема из репозитория. Как сделать правильно?
переписать
источник

P

Papin in ReactiveX - русскоговорящее сообщество
подписка внутри - это досвидание
источник

v

vitaly in ReactiveX - русскоговорящее сообщество
ну, я вроде как это понимаю, но хз как сделать по-человечески, чтобы оно возвращало результат и зиповало его с входным параметром
источник
2020 October 22

НП

Непредставимый Пхы... in ReactiveX - русскоговорящее сообщество
Добрый день!
Если мне надо поменять паблишер, но при этом не потерять сигналы ошибки от предыдущего, нормально ли делать
.flatMap(previousResult -> newMono) или для этого есть лучше подходы?

Сначала написал по глупости через .then(newMono) и долго гадал, куда мои сигналы ошибок теряются
источник

A

Aleksandr in ReactiveX - русскоговорящее сообщество
Непредставимый Пхы
Добрый день!
Если мне надо поменять паблишер, но при этом не потерять сигналы ошибки от предыдущего, нормально ли делать
.flatMap(previousResult -> newMono) или для этого есть лучше подходы?

Сначала написал по глупости через .then(newMono) и долго гадал, куда мои сигналы ошибок теряются
Не уверен до конца, но пробовали ли publishOn?
источник

НП

Непредставимый Пхы... in ReactiveX - русскоговорящее сообщество
Немного контекста:
я если что говорю о вебфлюксе, но механизм должен быть общим, если он есть.

У меня есть Mono с с WebSession внутри
Вначале у меня цепочка flatMap'ов
А затем в зависимости от результата сигналов мне надо подчистить или наоборот записать поля в сессию
источник
2020 October 24

A

Aleksandr in ReactiveX - русскоговорящее сообщество
Непредставимый Пхы
Немного контекста:
я если что говорю о вебфлюксе, но механизм должен быть общим, если он есть.

У меня есть Mono с с WebSession внутри
Вначале у меня цепочка flatMap'ов
А затем в зависимости от результата сигналов мне надо подчистить или наоборот записать поля в сессию
А вы не используете контекст для этих целей? Что мешает передавать ошибки через него?
источник

НП

Непредставимый Пхы... in ReactiveX - русскоговорящее сообщество
Да, я не так давно узнал об этой функциональности. Но пока не разобрался окончательно. Но кажется, что это действительно решит задачу
источник
2020 October 26

AD

Alex D in ReactiveX - русскоговорящее сообщество
Коллеги, добрый день.
Использую Spring WebFlux и Кафка Реактор.
Задача обработать синхронный POST запрос, положить тело запроса в один топик Кафки.
Дождаться ответа в другом топике кафки. И вернуть этот ответ на Post запрос.

public class RHandler {

   private static final String TOPIC = "main";
   private final KafkaSender kafkaSender;
   private final KafkaReceiver<Integer, String> kafkaReceiver;
   private final ConnectableFlux<String> publish;

   public RHandler(KafkaSender kafkaSender, KafkaReceiver<Integer, String> kafkaReceiver) {
       this.kafkaSender = kafkaSender;
       this.kafkaReceiver = kafkaReceiver;
       publish = kafkaReceiver.receive().map(e -> e.value()).publish();
       this.publish.connect();
       this.publish.subscribe(s -> System.out.println("Kafka responce:" + s));
   }

   public Mono<ServerResponse> test(ServerRequest request) {

       Flux<String> fl = this.publish.refCount();

       return kafkaSender.<Integer>send(request.bodyToFlux(String.class)
               .map(s -> SenderRecord.create(new ProducerRecord<>(TOPIC, 1, s), s)))
               .map(e -> ((SenderResult) e).correlationMetadata())
               .map(s -> {return fl.filter(strPair -> s.equals(strPair)); })
               .flatMap(r -> r)
               .flatMap(r -> ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(BodyInserters.fromValue(r)))
               .single();
   }
}  
 
Тело запроса успешно записывается в Кафку.
Слушатель Кафки успешно  принимает ответ из Кафки.
Но POST запрос зависает на ожидании ответа из Кафки.
Вот на этой строчке:
   .map(e -> stringFlux.single().flux())
Пример упростил, сейчас ожидаю вообще любого ответа в Кафке.
И для тестов использую один топик Кафки, что положил то и читаю из одного топика Кафки.  
В чем я не прав? И как мне корректно дождаться ответа в другом топики кафки в одном обработчике.
источник
2020 October 27

v

vitaly in ReactiveX - русскоговорящее сообщество
Ап. Помогите, пожалуйста, я вообще не допираю :)
источник

v

vitaly in ReactiveX - русскоговорящее сообщество
Переслано от vitaly
Вот этот конкатМап не ждёт результата следующего айтема из репозитория. Как сделать правильно?
источник

v

vitaly in ReactiveX - русскоговорящее сообщество
Мне нужно получить оба айтема, то есть зип какбы подходит, но если вот так внутри зазиповать, то оно потом идёт набекрень ожидаемо.
источник

l

lenar in ReactiveX - русскоговорящее сообщество
vitaly
Переслано от vitaly
Вот этот конкатМап не ждёт результата следующего айтема из репозитория. Как сделать правильно?
из репы возвращается Observable или Single? если первое, то zip c just-ом вернёт обс с одним элементом
источник

v

vitaly in ReactiveX - русскоговорящее сообщество
возвращается обзервабл. Оно-то возвращает, но мне надо чтобы вот этот запрос ждал окончания внешнего пайплайна, а он не ждёт
источник

l

lenar in ReactiveX - русскоговорящее сообщество
vitaly
возвращается обзервабл. Оно-то возвращает, но мне надо чтобы вот этот запрос ждал окончания внешнего пайплайна, а он не ждёт
внешний пайплан это запрос в репу?
источник

v

vitaly in ReactiveX - русскоговорящее сообщество
lenar
внешний пайплан это запрос в репу?
не, я имею в виду то, что там дальше происходит, после конката последнего. Там ещё всякие запросы делаются, и вот есть необходимость, чтобы то, что внутри, подождало его завершения, и я не могу понять, как этого добиться. В тесте когда пытаюсь воспроизвести - работает как надо, а здесь всё набекрень идёт...
источник