Насчет кода, вот тестовый вариант который заработал:
fun <T> Flow<T>.timedBuffer(millis:Long):Flow<List<T>> = channelFlow {
val list = arrayListOf<T>()
launch {
collect { list.add(it) }
}
launch {
while(true) {
delay(millis)
if(list.isNotEmpty()){
send(ArrayList(list))
list.clear()
}
}
}
}