TS
Size: a a a
TS
ES
TS
ES
SS
TS
/**
 * Kafka batch listener that accumulates incoming payloads in memory and hands them to
 * {@code process} in larger chunks.
 *
 * <p>A flush happens when the buffer has been accumulating for longer than
 * {@code MAX_BUFFER_AGE} or holds more than {@code MAX_BUFFER_SIZE} messages. Both conditions
 * are evaluated only inside {@code listen}, i.e. when a new batch is delivered, so buffered
 * messages stay unflushed and unacknowledged for as long as no further batch arrives.
 *
 * <p>The mutable state is not synchronized. NOTE(review): this presumably relies on
 * {@code concurrency="1"} giving a single consumer thread - confirm against the container
 * configuration.
 */
public class Listener {

    /** Flush once the buffer has been accumulating for longer than this. */
    private static final Duration MAX_BUFFER_AGE = Duration.ofSeconds(10);

    /** Flush once the buffer holds more than this many messages. */
    private static final int MAX_BUFFER_SIZE = 10000;

    /** Messages received since the last flush. */
    private final List<String> buffer = new ArrayList<>();

    /** Instant of the last flush (initially the construction time of this listener). */
    private Instant time = Instant.now();

    /**
     * Buffers the delivered batch and flushes the buffer when it is too old or too large.
     *
     * <p>On a flush the buffer is processed, cleared, the flush timestamp is reset and the
     * current batch is acknowledged. When no flush happens, {@code ack} is not acknowledged at
     * all. If {@code process} throws, the buffer is neither cleared nor acknowledged, so the
     * same messages are part of the next flush attempt.
     *
     * <p>NOTE(review): only the acknowledgment of the batch that triggers the flush is used;
     * confirm that, under the configured ack mode, this also commits the offsets of the earlier
     * batches that were buffered without being acknowledged.
     *
     * @param data payloads of the delivered batch
     * @param ack  acknowledgment handle of the delivered batch
     */
    @KafkaListener(..., batch="true", concurrency="1")
    public void listen(@Payload List<String> data, Acknowledgment ack) {
        buffer.addAll(data);
        Instant now = Instant.now();

        // Compare the full Duration: getSeconds() drops the sub-second part, which would
        // postpone the time-based flush until at least 11 seconds have elapsed.
        boolean tooOld = Duration.between(time, now).compareTo(MAX_BUFFER_AGE) > 0;
        boolean tooLarge = buffer.size() > MAX_BUFFER_SIZE;

        if (tooOld || tooLarge) {
            process(buffer);
            buffer.clear();
            time = now;
            ack.acknowledge();
        }
    }

    /**
     * Handles one flushed chunk of messages.
     *
     * <p>The list passed in is the live buffer, which {@code listen} clears right after this
     * method returns; an implementation must not keep a reference to it.
     *
     * @param messages all messages accumulated since the previous flush
     */
    private void process(List<String> messages) {
        // send to ClickHouse
    }
}
SS
TS
TS
SS
TS
TS
TS
TS
TS
SS
TS
TS
TS
TS