Custom CursorIterator #1208
Comments
Which behaviors do you intend to implement in the custom class? Anything that just makes sense to integrate directly?
I implemented an iterator that kicks off the next fetch before yielding the current result (see my concerns related to it here):

```python
from asyncio import CancelledError, Task, create_task
from collections.abc import AsyncIterable


async def iter_fast[T](iterable: AsyncIterable[T]):
    iterator = aiter(iterable)
    pending: Task[T] | None = None
    while True:
        try:
            if pending is None:
                item = await anext(iterator)
            else:
                item = await pending
        except StopAsyncIteration:
            break
        else:
            pending = create_task(anext(iterator))
            yield item
```

It can't be used as is, because the current iterator fetches rows in batches but yields individual rows. So I iterate over batches and flatten them afterwards:

```python
async def flatten[T](iterable: AsyncIterable[list[T]]) -> AsyncIterable[T]:
    async for batch in iterable:
        for item in batch:
            yield item
```

So the end result looks somewhat like this:

```python
iterator = flatten(
    iter_fast(
        BatchIterator(
            conn, query, stmt._state, args, stmt._state.record_class,
            prefetch=15_000, timeout=timeout,
        )
    )
)
```

It yields a performance improvement, but not as impressive as I hoped.
I'd like to implement my own CursorIterator, but I can't find a straightforward way of getting it done via asyncpg's public interface.
Currently it's created by CursorFactory, which in turn is created by PreparedStatement.cursor.
I can see two options here:
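As a point of comparison, a batch-yielding iterator can be sketched without touching `stmt._state`, assuming the awaitable form of the cursor described in the asyncpg docs: awaiting `stmt.cursor(*args)` gives a cursor object, and `fetch(n)` returns up to `n` rows. `iter_batches`, `FakeStatement`, `FakeCursor`, and `demo` are hypothetical names for this example. The fakes exist only so the block runs without a database, and with a real connection the call would have to happen inside a transaction. This is not a replacement for a custom CursorIterator, only an illustration of what the documented interface seems to allow.

```python
import asyncio
from collections.abc import AsyncIterator
from typing import Any


async def iter_batches(stmt: Any, *args: Any, batch_size: int = 1000) -> AsyncIterator[list]:
    """Yield lists of rows, one cursor fetch per batch.

    Assumes `stmt` behaves like an asyncpg PreparedStatement and that the
    caller holds an open transaction.
    """
    cursor = await stmt.cursor(*args)  # awaiting the factory opens the cursor
    while True:
        batch = await cursor.fetch(batch_size)
        if not batch:
            break
        yield batch


class FakeCursor:
    """Hypothetical stand-in for a cursor, backed by an in-memory list."""

    def __init__(self, rows: list) -> None:
        self._rows = rows
        self._pos = 0

    async def fetch(self, n: int) -> list:
        chunk = self._rows[self._pos:self._pos + n]
        self._pos += len(chunk)
        return chunk


class FakeStatement:
    """Hypothetical stand-in for a prepared statement."""

    def __init__(self, rows: list) -> None:
        self._rows = rows

    def cursor(self, *args: Any):
        # Returns an awaitable, mirroring the awaitable cursor factory.
        return self._open()

    async def _open(self) -> FakeCursor:
        return FakeCursor(self._rows)


async def demo() -> list[list[int]]:
    stmt = FakeStatement(list(range(5)))
    return [batch async for batch in iter_batches(stmt, batch_size=2)]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

On its own this issues one fetch at a time, so it does not overlap fetching with processing. It would have to be wrapped in something like the `iter_fast` generator quoted in the comments to get that effect.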