# Lazily reads `query` in batches, paging by the `@all_stream_pk_id` field
# (keyset pagination) instead of SQL OFFSET.
#
# Options:
#   * `:chunk_size` - maximum number of rows fetched per batch (default 500).
#   * `:offset`     - initial lower bound for the `@all_stream_pk_id` field
#                     (default 0). The bound is exclusive: only rows whose
#                     key is strictly greater than this value are fetched.
#                     It is a key value, not a row count.
#
# Returns a stream in which every element is one whole batch (a list of
# rows), not an individual row. The stream halts on the first empty batch.
#
# Each fetched row must be a map/struct exposing the `@all_stream_pk_id`
# key, because the key of the last row is the cursor for the next batch.
def all_stream(query, opts \\ []) do
  chunk_size = Keyword.get(opts, :chunk_size, 500)
  offset = Keyword.get(opts, :offset, 0)

  Stream.resource(
    # The accumulator is {base query, last key seen so far}.
    fn -> {query, offset} end,
    fn {base_query, cursor} ->
      page_query =
        base_query
        |> where([row], field(row, ^@all_stream_pk_id) > ^cursor)
        |> limit(^chunk_size)
        |> order_by([row], asc: field(row, ^@all_stream_pk_id))

      rows = all(page_query)

      case List.last(rows) do
        # An empty batch means there is nothing left to read.
        nil ->
          {:halt, {base_query, cursor}}

        # Emit the batch as a single element and advance the cursor to the
        # key of its last row.
        %{@all_stream_pk_id => next_cursor} ->
          {[rows], {base_query, next_cursor}}
      end
    end,
    # Nothing to clean up once the stream finishes.
    fn _state -> [] end
  )
end