Size: a a a

ReactiveX - русскоговорящее сообщество

2020 October 27

v

vitaly in ReactiveX - русскоговорящее сообщество
чёт дело походу не во вложенном зипВиз. Я поменял, и всё равно то, что до этого оператора выполняется не дожидаясь окончания цепочки, как будто я flatMap юзаю, а не конкат
источник

v

vitaly in ReactiveX - русскоговорящее сообщество
как так может быть, что вот эта вот часть не ждёт, пока завершится вся цепочка, а сразу обрабатывает следующий айтем из итерабла?
источник

AG

Artem Gilmudinov in ReactiveX - русскоговорящее сообщество
vitaly
как так может быть, что вот эта вот часть не ждёт, пока завершится вся цепочка, а сразу обрабатывает следующий айтем из итерабла?
вангую, что если сделать doOnSubscribe на поток на 5 строчке, то лог в doOnSubscribe будет вызываться последовательно.
источник

v

vitaly in ReactiveX - русскоговорящее сообщество
Artem Gilmudinov
вангую, что если сделать doOnSubscribe на поток на 5 строчке, то лог в doOnSubscribe будет вызываться последовательно.
Что это значит?
источник

AG

Artem Gilmudinov in ReactiveX - русскоговорящее сообщество
vitaly
Что это значит?
я бы проверил так ли это или нет вначале. Если это так, то значит concatMap просто получает список потоков из concatMap а потом подписывается на них последовательно (дожидаясь окончания предыдущего).
источник

v

vitaly in ReactiveX - русскоговорящее сообщество
Artem Gilmudinov
я бы проверил так ли это или нет вначале. Если это так, то значит concatMap просто получает список потоков из concatMap а потом подписывается на них последовательно (дожидаясь окончания предыдущего).
Спасибо, буду проверять.
источник
2020 October 28

C

Clara in ReactiveX - русскоговорящее сообщество
Изучаем, как правильно выезжать из России в США и возвращаться обратно в Россию.

https://t.me/from_ru_to_the_usa
источник

A

Aleksandr in ReactiveX - русскоговорящее сообщество
Привет, столкнулся тут с проблемкой заполнения буффера
io.netty.util.ResourceLeakDetector - LEAK: ByteBuf.release() was not called before it's garbage-collected. See http://netty.io/wiki/reference-counted-objects.html for more information.


В общем-то подозреваю, что ошибка возникает в том месте, где я использую реактивный WebClient для http вызовов. Как выглядит флоу:

va
l params = mapOf(…)

webClient
 .post()
 .uri { it.build(params) }
 .body(BodyInserters.fromObject(mapOf(...)))
 .exchange()
 .flatMap { resp ->
   val statusCode = resp.statusCode()
   when {
    statusCode.is2xxSuccessful -> Mono.empty<Unit>()
    statusCode.is5xxServerError -> throw SomeServiceError()
    else -> checkError(resp, statusCode)
   }
 }
 
...

fun checkError(resp: ClientResponse, statusCode: HttpStatus): Mono<Unit> {
return resp.body(BodyExtractors.toMono(String::class.java)).flatMap {
 val e = SomeBusinessException(objectMapper.readValue(it, Map::class.java))
 
 if (...) {
  Mono.empty<Unit>()
 } else {
  Mono.error(e)
 }
}
}

Хорошо бы зарелизить тело когда я обрабатываю статус ответа, но проблема в том что в библиотеке я не нашел банального releaseBody() метода, только эти:

/**
* Set the body of the response. Calling this methods will
* {@linkplain org.springframework.core.io.buffer.DataBufferUtils#release(DataBuffer) release}
* the existing body of the builder.
* @param body the new body.
* @return this builder
*/
Buil
@linkplainkplain org.springframework.core.io.buffer.DataBufferUtils#release(DataBuffer) release}
* the existing body of the builder.
* @paramam body the new body.
* @returnurn this builder
*/
Builder body(Flux<DataBuffer> body);

/**
* Set the body of the response to the UTF-8 encoded bytes of the given string.
* Calling this methods will
* {@linkplain org.springframework.core.io.buffer.DataBufferUtils#release(DataBuffer) release}
* the existing body of the builder.
* @param body the new body.
* @return this builder
*/
Buil
@linkplainkplain org.springframework.core.io.buffer.DataBufferUtils#release(DataBuffer) release}
* the existing body of the builder.
* @paramam body the new body.
* @returnurn this builder
*/
Builder body(String body);

Ка
к юы лучше это организовать? Может кто сталкивался?
источник
2020 October 29

I

Ivan in ReactiveX - русскоговорящее сообщество
Правильно понимаю, что разницы особо нет никакой между

...
.subscribeOn(Schedulers.io())
.switchMap {
   repo.getSingle()
   
   .subscribeOn(Schedulers.io())
     .applyMyRetryWhen()
     .onErrorReturnItem(MyObj())
}


и

...
.subscribeOn(Schedulers.io())
.switchMap { repo.getSingle()) }
.applyMyRetryWhen()
.onErrorReturnItem(MyObj())
источник

l

lenar in ReactiveX - русскоговорящее сообщество
Ivan
Правильно понимаю, что разницы особо нет никакой между

...
.subscribeOn(Schedulers.io())
.switchMap {
   repo.getSingle()
   
   .subscribeOn(Schedulers.io())
     .applyMyRetryWhen()
     .onErrorReturnItem(MyObj())
}


и

...
.subscribeOn(Schedulers.io())
.switchMap { repo.getSingle()) }
.applyMyRetryWhen()
.onErrorReturnItem(MyObj())
есть, и не одна. одна связана с шедулерами, а вторая с обработкой ошибок (их в первом варианте нет для случая, когда первая часть фейлится)
источник

I

Ivan in ReactiveX - русскоговорящее сообщество
lenar
есть, и не одна. одна связана с шедулерами, а вторая с обработкой ошибок (их в первом варианте нет для случая, когда первая часть фейлится)
А можно чуть подробнее про скудулеры? Я вижу, только то, что retry в первом случае будет только к repo.getSingle() применён
источник

l

lenar in ReactiveX - русскоговорящее сообщество
Ivan
А можно чуть подробнее про скудулеры? Я вижу, только то, что retry в первом случае будет только к repo.getSingle() применён
без второго скедулера все вычисления будут идти в одном потоке, последовательно. это для случая вызов в репо сам тоже без потока
источник

l

lenar in ReactiveX - русскоговорящее сообщество
разница в том что во первом случае ты точно запустишь на потоке отличном от потока первой части (до свитча). это тоже для случая когда вызов репо без своего потока
источник
2020 October 30

I

Ivan in ReactiveX - русскоговорящее сообщество
lenar
без второго скедулера все вычисления будут идти в одном потоке, последовательно. это для случая вызов в репо сам тоже без потока
Понял, спасибо
источник
2020 November 05

A

Aleksandr in ReactiveX - русскоговорящее сообщество
Привет, читаю книжку и там нашёл следующее, цитирую:

println(“Before”);
Observable
         .range(5, 3)
         .subscribe(i -> println(i));
println(“After”);


We are intrested in is the thread that executed each log statement:
m
ain: Before
main: 5
main: 6
main: 7
main: After

The order of println statement is also relevant. It is not a surprise that Before and After messages printed by the main client thread. However, notice that subscription also happened in the client thread and subscribe() actually blocked the client thread until all events were received.

Вот и сразу напрашивается вопрос. Как правильно понимать что текущий поток будет заблочен до тех пор пока все ивэнты не будут получены? В чём тогда координальное отличие subscribe от block в данном контексте?
источник

l

lenar in ReactiveX - русскоговорящее сообщество
Aleksandr
Привет, читаю книжку и там нашёл следующее, цитирую:

println(“Before”);
Observable
         .range(5, 3)
         .subscribe(i -> println(i));
println(“After”);


We are intrested in is the thread that executed each log statement:
m
ain: Before
main: 5
main: 6
main: 7
main: After

The order of println statement is also relevant. It is not a surprise that Before and After messages printed by the main client thread. However, notice that subscription also happened in the client thread and subscribe() actually blocked the client thread until all events were received.

Вот и сразу напрашивается вопрос. Как правильно понимать что текущий поток будет заблочен до тех пор пока все ивэнты не будут получены? В чём тогда координальное отличие subscribe от block в данном контексте?
имеется в виду что именно эта цепочка не на отдельном шедулере (потоке выполнения), а на том же где и вызывается подписка. и получается, что пока не отработает цепочка, код дальше subscribe не будет выполняться
источник

l

lenar in ReactiveX - русскоговорящее сообщество
а если бы был отдельный шедулер, то цепочка и код после подписки выполнялись бы паралельно
источник

l

lenar in ReactiveX - русскоговорящее сообщество
а если block (а не subscribe), то и отдельный шедулер не спасет =)
источник

A

Aleksandr in ReactiveX - русскоговорящее сообщество
А, ну да, логично. Я просто в начале не так их понял.

Собственно этим и поясняют создание цепочки
1. Subscribe
2. OnNext
3. OnComplete
😅
источник
2020 November 08

E

Evgeniy in ReactiveX - русскоговорящее сообщество
драсце
источник