Size: a a a

Kotlin Community

2020 August 01

AN

Alexander Nozik in Kotlin Community
Arkadii Ivanov
Как так нельзя?
Утаскивать channel из callbackFlow. Делайте нормальный канал.
источник

AI

Arkadii Ivanov in Kotlin Community
Alexander Nozik
Утаскивать channel из callbackFlow. Делайте нормальный канал.
В реальном случае это интероп с Rx.
источник

AI

Arkadii Ivanov in Kotlin Community
Скажем я в внутри channelFlow {} могу подписаться на внешний источник данных. Далее, многим позже, на главном потоке, я через этот источник выдаю данные и сразу синхронно отменяю Job. И падает. Вот такой пример.
источник

АЕ

Алексей Ершов... in Kotlin Community
а где именно вы ожидаете, что cancellation сработает и отменит выдачу элемента из флоу? cancellation is cooperative же, оно не убьёт моментально всю работу.
источник

AI

Arkadii Ivanov in Kotlin Community
Алексей Ершов
а где именно вы ожидаете, что cancellation сработает и отменит выдачу элемента из флоу? cancellation is cooperative же, оно не убьёт моментально всю работу.
Я так понимаю, что Dispatcher должен это проверять где-то в потрохах. Но видимо это не так работает.
источник

АЕ

Алексей Ершов... in Kotlin Community
а, дошло, вы ожидаете что offer отработает моментально и не завершится пока не будет вызван collect, чтобы isCancelled был false
источник

AI

Arkadii Ivanov in Kotlin Community
Алексей Ершов
а, дошло, вы ожидаете что offer отработает моментально и не завершится пока не будет вызван collect, чтобы isCancelled был false
Наверно не так. Я так понимаю выдача элемента потребителю происходит через асинхронный диспатчер. Т.е. я делаю offer, выдача этого элемента встаёт в очередь в Main диспатчер. Далее я отменяю корутину. И я ожидаю что очередь перестанет обрабатываться.
источник

АЕ

Алексей Ершов... in Kotlin Community
Хотя я всё-таки тоже с трудом понимаю, что вы пытаетесь сделать) Видимо код слишком упрощённый.
источник

АЕ

Алексей Ершов... in Kotlin Community
Arkadii Ivanov
Наверно не так. Я так понимаю выдача элемента потребителю происходит через асинхронный диспатчер. Т.е. я делаю offer, выдача этого элемента встаёт в очередь в Main диспатчер. Далее я отменяю корутину. И я ожидаю что очередь перестанет обрабатываться.
очередь вообще не связана с этой корутиной, кажется, вы же в скоупе на канал не подписываетесь. Там просто присвоение, и непонятно, почему cancel там что-то должен отменить.
источник

AI

Arkadii Ivanov in Kotlin Community
Алексей Ершов
Хотя я всё-таки тоже с трудом понимаю, что вы пытаетесь сделать) Видимо код слишком упрощённый.
Я пытаюсь понять, почему падает. Представьте ситуацию. Есть PublishSubject из Rx. Я на него подписываюсь через channelFlow{} и этот flow слушаю через Dispatcher.main. всё работает. Далее в какой-то момент, в одном колл стеке на главном потоке я: делаю subject.onNext(...) и сразу job.cancel(). У меня подписчик флоу получает этот элемент. Хотя мне кажется, что не должен.
источник

АЕ

Алексей Ершов... in Kotlin Community
А, теперь лучше. Здесь надо смотреть в реализацию каждого элемента этой цепи, вполне может быть что подписчик получает элемент прямо во время выполнения subject.onNext(...), ещё до того, как вы отмену вызвали.
источник

AI

Arkadii Ivanov in Kotlin Community
Алексей Ершов
А, теперь лучше. Здесь надо смотреть в реализацию каждого элемента этой цепи, вполне может быть что подписчик получает элемент прямо во время выполнения subject.onNext(...), ещё до того, как вы отмену вызвали.
Это же невозможно. Ведь всё на главном потоке
источник

АЕ

Алексей Ершов... in Kotlin Community
Почему невозможно? Например, вы вызываете onNext, внутри вызывается сабскрайбер, эмитит элемент во флоу, и он там сразу проваливается в коллект. Это одна гипотеза, вторая - отмена работает не так, как вы ожидаете, надо реально в реализацию флоу лезть.
источник

AI

Arkadii Ivanov in Kotlin Community
Алексей Ершов
Почему невозможно? Например, вы вызываете onNext, внутри вызывается сабскрайбер, эмитит элемент во флоу, и он там сразу проваливается в коллект. Это одна гипотеза, вторая - отмена работает не так, как вы ожидаете, надо реально в реализацию флоу лезть.
Так нет! Я же написал, что Main диспатчер асинхронный. Т.к. главной поток занят, то эмиссия встаёт в очередь. Я это даже проверил.
источник

AI

Arkadii Ivanov in Kotlin Community
Если использовать Main.immediate то всё кстати работает и не падает
источник

АЕ

Алексей Ершов... in Kotlin Community
можно завести issue и мэтры объяснят, где вы неправы) Или в котлин слак.
источник

AI

Arkadii Ivanov in Kotlin Community
Алексей Ершов
можно завести issue и мэтры объяснят, где вы неправы) Или в котлин слак.
Вот тоже так думаю, спасибо!
источник

EP

Eugene P. in Kotlin Community
@ArkaNN1985 David Karnok сделал PublishSubject на flow - может, вам подойдет
источник

AI

Arkadii Ivanov in Kotlin Community
Eugene P.
@ArkaNN1985 David Karnok сделал PublishSubject на flow - может, вам подойдет
Да тут концептуальная проблема. Это я просто как пример привел.
источник

EP

Eugene P. in Kotlin Community
@ArkaNN1985 В rx-java ведь вся цепочка синхронно выполняется, если нет разрыва с помощью observeOn. В flow вся цепочка на suspend методах, вроде.
источник