Size: a a a

2021 November 25

TS

Tair Sab in Astana JKUG
@KafkaListener юзаете?
источник

ES

Eugene Svalukhin in Astana JKUG
да
источник

TS

Tair Sab in Astana JKUG
ну вот там же можно передать Acknowledgement и вручную подтверждать прием?
источник

ES

Eugene Svalukhin in Astana JKUG
не смотрел этот момент
источник

SS

Shamil Sabirov in Astana JKUG
можно, но там ручками надо будет поработать. и скорее всего связываться еще с TransactionManager кафки в спринге. если же вычитываем пачку из какого топик/partition то можно сразу кафке сказать, что это мы уже прочитали. и проставить соответствующий offset для него
источник

TS

Tair Sab in Astana JKUG
public class Listener {

   List<String> buffer = new ArrayList<>();
   Instant time = Instant.now();

   @KafkaListener(..., batch="true", concurrency="1")
   public void listen(@Payload List<String> data, Acknowledgment ack) {
       buffer.addAll(data);
       Instant now = Instant.now();
       if (Duration.between(time, now).getSeconds() > 10 || buffer.size() > 10000) {
           process(buffer);
           buffer.clear();
           time = now;
           ack.acknowledge();
       }
   }

   private void process(List<String> messages) {
       // send to ClickHouse
   }

}
источник

SS

Shamil Sabirov in Astana JKUG
ну а если не получилось - то не обновляем offset
источник

TS

Tair Sab in Astana JKUG
тут 10 сек и 10_000 сообщений уже на ваш вкус можно подкрутить
источник

TS

Tair Sab in Astana JKUG
тут только косяк есть ))
источник

SS

Shamil Sabirov in Astana JKUG
ack.acknowledge() - только на посл сообщение из пакета отработает?
источник

TS

Tair Sab in Astana JKUG
если сообщения перестануть приходить, то ни таймаут, ни пороговое значение не сработает ))
источник

TS

Tair Sab in Astana JKUG
нужно будет какой-то таймер еще добавить, потом это синхронить и тд и тп
источник

TS

Tair Sab in Astana JKUG
ага, в кафке это норм
источник

TS

Tair Sab in Astana JKUG
акк на последнее сообщение автоматом аккает все предыдущие
источник

TS

Tair Sab in Astana JKUG
с батчами АККается весь батч
источник

SS

Shamil Sabirov in Astana JKUG
ну ето ведь не очень масштабируемо. если допустим в топике есть сообщения с офсетом 10, 11, 12. и допустим 2 сервиса(2 инстанса) будут читать. первый прочитал 10, 11 и  чтото не так пошло. он их в итоге не обработал. но второй инстанс прочитал сообщение 12 и у него все нормально. он сделал acknowledge. и по итогу офсет будет 12. а 10 и 11 сообщения потеряем
источник

TS

Tair Sab in Astana JKUG
кафка масшатабируется по другому
источник

TS

Tair Sab in Astana JKUG
кафка это не раббит
источник

TS

Tair Sab in Astana JKUG
в кафке нельзя акнуть 10-ое сообщение, при этом вернуть 9-ое в очередь
источник

TS

Tair Sab in Astana JKUG
На каждый “инстанс” тебе нужно дать отдельную партицию, если они даолжны читать разные сообщения
источник