-
Notifications
You must be signed in to change notification settings - Fork 418
/
Copy pathtest_cursor.py
151 lines (114 loc) · 5.33 KB
/
test_cursor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
# Copyright (C) 2016-present the asyncpg authors and contributors
# <see AUTHORS file>
#
# This module is part of asyncpg and is released under
# the Apache 2.0 License: http://www.apache.org/licenses/LICENSE-2.0
import asyncpg
import inspect
from asyncpg import _testbase as tb
class TestIterableCursor(tb.ConnectedTestCase):
    """Tests for cursors that are consumed with ``async for``."""

    async def _start_iteration(self, cursor):
        # Return the async iterator for the object produced by cursor().
        # __aiter__() may hand back an awaitable rather than the iterator
        # itself; when it does, await it first.
        iterator = cursor.__aiter__()
        if inspect.isawaitable(iterator):
            iterator = await iterator
        return iterator

    async def test_cursor_iterable_01(self):
        # For every prefetch size from 1 to 24, iterating the cursor inside
        # a transaction must yield the same rows as a plain fetch().
        stmt = await self.con.prepare('SELECT generate_series(0, 20)')
        expected = await stmt.fetch()

        for prefetch in range(1, 25):
            with self.subTest(prefetch=prefetch):
                collected = []
                async with self.con.transaction():
                    async for row in stmt.cursor(prefetch=prefetch):
                        collected.append(row)

                self.assertEqual(
                    collected, expected,
                    'result != expected for prefetch={}'.format(prefetch))

    async def test_cursor_iterable_02(self):
        # A cursor declared WITHOUT HOLD cannot be created when no
        # transaction is active.
        declare_stmt = await self.con.prepare(
            'DECLARE t BINARY CURSOR WITHOUT HOLD FOR SELECT 1')
        with self.assertRaises(asyncpg.NoActiveSQLTransactionError):
            await declare_stmt.fetch()

        # The iterator obtained from statement.cursor() must likewise
        # refuse to advance outside of a transaction.
        stmt = await self.con.prepare('SELECT generate_series(0, 20)')
        cursor_iter = await self._start_iteration(stmt.cursor(prefetch=5))

        with self.assertRaisesRegex(asyncpg.NoActiveSQLTransactionError,
                                    'cursor cannot be created.*transaction'):
            await cursor_iter.__anext__()

    async def test_cursor_iterable_03(self):
        # Marking the statement closed after the iterator has been obtained
        # must make the iteration fail with InterfaceError.
        stmt = await self.con.prepare('SELECT generate_series(0, 20)')
        cursor_iter = await self._start_iteration(stmt.cursor())
        stmt._state.mark_closed()

        with self.assertRaisesRegex(asyncpg.InterfaceError,
                                    'statement is closed'):
            async for _ in cursor_iter:  # NOQA
                pass

    async def test_cursor_iterable_04(self):
        # Marking the statement closed before cursor() is called must make
        # the iteration fail with InterfaceError as well.
        stmt = await self.con.prepare('SELECT generate_series(0, 20)')
        stmt._state.mark_closed()

        with self.assertRaisesRegex(asyncpg.InterfaceError,
                                    'statement is closed'):
            async for _ in stmt.cursor():  # NOQA
                pass

    async def test_cursor_iterable_05(self):
        # Prefetch values of -1 and 0 must be rejected when iterating.
        stmt = await self.con.prepare('SELECT generate_series(0, 20)')

        for prefetch in (-1, 0):
            with self.subTest(prefetch=prefetch):
                with self.assertRaisesRegex(asyncpg.InterfaceError,
                                            'must be greater than zero'):
                    async for _ in stmt.cursor(prefetch=prefetch):  # NOQA
                        pass

    async def test_cursor_iterable_06(self):
        # Connection.cursor() with a query argument can be iterated
        # directly inside a transaction.
        rows = []
        expected = [(n,) for n in range(11)]

        async with self.con.transaction():
            async for row in self.con.cursor(
                    'SELECT generate_series(0, $1::int)', 10):
                rows.append(row)

        self.assertEqual(rows, expected)
class TestCursor(tb.ConnectedTestCase):
    """Tests for cursors that are awaited and then driven explicitly."""

    def _assert_cursor_repr(self, cur, *, exhausted):
        # Check the repr() of a cursor: the class prefix, presence or
        # absence of the ' exhausted ' marker, and the quoted query start.
        text = repr(cur)
        self.assertTrue(text.startswith('<asyncpg.Cursor '))
        if exhausted:
            self.assertIn(' exhausted ', text)
        else:
            self.assertNotIn(' exhausted ', text)
        self.assertIn('"SELECT generate', text)

    async def test_cursor_01(self):
        # Awaiting statement.cursor() outside of a transaction must fail.
        stmt = await self.con.prepare('SELECT generate_series(0, 20)')

        with self.assertRaisesRegex(asyncpg.NoActiveSQLTransactionError,
                                    'cursor cannot be created.*transaction'):
            await stmt.cursor()

    async def test_cursor_02(self):
        # Walk a 21-row result set with fetch(), fetchrow() and forward(),
        # checking returned rows and the cursor repr before and after the
        # rows run out.
        stmt = await self.con.prepare('SELECT generate_series(0, 20)')

        async with self.con.transaction():
            cur = await stmt.cursor()

            # fetch() must reject counts of -1 and 0.
            for bad_count in (-1, 0):
                with self.assertRaisesRegex(asyncpg.InterfaceError,
                                            'greater than zero'):
                    await cur.fetch(bad_count)

            first_two = await cur.fetch(2)
            self.assertEqual(first_two, [(0,), (1,)])

            third = await cur.fetchrow()
            self.assertEqual(third, (2,))

            self._assert_cursor_repr(cur, exhausted=False)

            # Skip rows 3..7, so the next row fetched is 8.
            skipped = await cur.forward(5)
            self.assertEqual(skipped, 5)

            after_skip = await cur.fetchrow()
            self.assertEqual(after_skip, (8,))

            # Asking for more rows than remain returns only what is left.
            remainder = await cur.fetch(100)
            self.assertEqual(remainder, [(n,) for n in range(9, 21)])

            # Nothing is left: fetchrow() gives None, fetch() gives [].
            self.assertIsNone(await cur.fetchrow())
            self.assertEqual(await cur.fetch(5), [])

            self._assert_cursor_repr(cur, exhausted=True)

    async def test_cursor_03(self):
        # Passing prefetch to a cursor that is awaited (rather than
        # iterated) must be rejected.
        stmt = await self.con.prepare('SELECT generate_series(0, 20)')

        async with self.con.transaction():
            with self.assertRaisesRegex(asyncpg.InterfaceError,
                                        'prefetch argument can only'):
                await stmt.cursor(prefetch=10)

    async def test_cursor_04(self):
        # After forward(42) on a cursor opened straight from the
        # connection, the next row fetched is 42.
        async with self.con.transaction():
            cur = await self.con.cursor('SELECT generate_series(0, 100)')
            await cur.forward(42)
            self.assertEqual(await cur.fetchrow(), (42,))