Size: a a a

Scala User Group

2021 January 19

AS

Alex Sh in Scala User Group
Ilya
Ох. Сколько такого пришлось хлебнуть. И в чистой java, и на скале с akka и с zio. И главное что я запомнил из этого опыта - параллельная обработка сообщений в рамках одной партиции почти всегда зло. Особенно если нельзя терять сообщения.
Надо параллелить - добавь ещё партицию.
Ну я в итоге сделал produce(record).flatten как раз чтобы избежать таких отхлёбов 🙂
источник

ЮБ

Юрий Бадальянц... in Scala User Group
Alex Sh
Такая логика не ломает ordering между исходящими сообщениями?
Я прям сча этим же занимаюсь
Точно не ломает. parEvalMap работает асинхронно, но сохраняет порядок. Есть ещё parEvalMapUnordered. Вот в нём порядок теряется.
источник

ЮБ

Юрий Бадальянц... in Scala User Group
Alex Sh
Ну я в итоге сделал produce(record).flatten как раз чтобы избежать таких отхлёбов 🙂
Это подойдёт для тестов, но никак не для прода
источник

ЮБ

Юрий Бадальянц... in Scala User Group
Anton Velichko
Ломает
Это не так
источник

ЮБ

Юрий Бадальянц... in Scala User Group
Alex Sh
Ну я в итоге сделал produce(record).flatten как раз чтобы избежать таких отхлёбов 🙂
Ещё в принципе можно самому батч копить через какой-нибудь groupWithin. И разом весь батч слать. В таком варианте нормально делать flatten, потому что ожидаться будет весь батч. Но это намного странный способ, потому что продюсер и так уже батчует из коробки.
источник

AS

Alex Sh in Scala User Group
Юрий Бадальянц
Ещё в принципе можно самому батч копить через какой-нибудь groupWithin. И разом весь батч слать. В таком варианте нормально делать flatten, потому что ожидаться будет весь батч. Но это намного странный способ, потому что продюсер и так уже батчует из коробки.
Такое видел в доках... Решил начать с наиболее простого варианта
источник

ЮБ

Юрий Бадальянц... in Scala User Group
Ilya
Ох. Сколько такого пришлось хлебнуть. И в чистой java, и на скале с akka и с zio. И главное что я запомнил из этого опыта - параллельная обработка сообщений в рамках одной партиции почти всегда зло. Особенно если нельзя терять сообщения.
Надо параллелить - добавь ещё партицию.
Не путайте параллельную и асинхронную обработку. Да, строго паралелить внутри патриции нельзя. Но асинхронно делать какой-то шаг с парализмом больше 1 вполне можно. Главное, чтобы на входе и выходе этого шага стрима сохранялся порядок. И именно так работает parEvalMap
источник

AS

Alex Sh in Scala User Group
Юрий Бадальянц
Точно не ломает. parEvalMap работает асинхронно, но сохраняет порядок. Есть ещё parEvalMapUnordered. Вот в нём порядок теряется.
Я правильно понимаю, что из этого утверждения следует то, что порядок отправки сообщений в кафку строго определяется порядком добавления этих сообщений в буфер?
источник

ЮБ

Юрий Бадальянц... in Scala User Group
Alex Sh
Я правильно понимаю, что из этого утверждения следует то, что порядок отправки сообщений в кафку строго определяется порядком добавления этих сообщений в буфер?
Да
источник

ЮБ

Юрий Бадальянц... in Scala User Group
Мы там выше с лолкэтом это обсуждали кстати. В 1.2.0 эту гарантию сломали, а в 1.3.0 починили.
источник

AV

Anton Velichko in Scala User Group
Юрий Бадальянц
Это не так
ну как это, порядок результатов гарантируется, но не порядок выполнения, у тебя parEvalMap(6) на выходе даст тот же порядок что на входе но порядок выполнения  io не гарантируется
источник

ЮБ

Юрий Бадальянц... in Scala User Group
Anton Velichko
ну как это, порядок результатов гарантируется, но не порядок выполнения, у тебя parEvalMap(6) на выходе даст тот же порядок что на входе но порядок выполнения  io не гарантируется
Так вопрос не про это был, а про порядок на выходе
источник

AV

Anton Velichko in Scala User Group
ну так порядок отправки тождественен порядку выполнения msg.publish что в итоге не совсем равен порядку на выходе
источник

AV

Anton Velichko in Scala User Group
из parEvalMap
источник

AV

Anton Velichko in Scala User Group
Anton Velichko
ну так порядок отправки тождественен порядку выполнения msg.publish что в итоге не совсем равен порядку на выходе
точнее он может быть равен но без гарантий
источник

AV

Anton Velichko in Scala User Group
это как с mapAsync в акка стримах
источник

AV

Anton Velichko in Scala User Group
порядок событий на выходе равен порядку на входе, но внутри они могут выполняться как им удобно
источник

I

Ilya in Scala User Group
λoλcat
Я в кафку посылаю то, что ждать не обязательно
Ну вот исходя из этого можно просто положится на порядок добавления сообщений в драйвер Кафки. Он сам по себе достаточно умный. Я правда не уверен что зио-кафка так умеет
источник

ЮБ

Юрий Бадальянц... in Scala User Group
Что? Если ты посмотришь на мой сниппет, то msg.publish запускается без параллелизма, а вот ожидание второго F уже делается с парализмом. Я там писал, как это работает и я точно уверен, что это корректно. Я с этим уже давно на проде живу и куча тестов с порядком есть
источник

ЮБ

Юрий Бадальянц... in Scala User Group
Anton Velichko
ну так порядок отправки тождественен порядку выполнения msg.publish что в итоге не совсем равен порядку на выходе
Я вот про это
источник