-
Notifications
You must be signed in to change notification settings - Fork 151
/
Copy pathtest_crud.py
333 lines (263 loc) · 10.5 KB
/
test_crud.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
import random
import pytest
from .models import db, User, UserType, Friendship, Relation, MYSQL_URL
# Module-level pytest mark: applies the ``asyncio`` marker to every test
# function defined in this file.
pytestmark = pytest.mark.asyncio
async def test_create(engine):
    """Create a user with ``User.create`` and verify it can be read back.

    The created instance is returned because other tests in this module
    call this coroutine directly to obtain a persisted ``User``.
    """
    suffix = random.random()
    expected_nickname = "test_create_{}".format(suffix)

    created = await User.create(
        bind=engine,
        timeout=10,
        nickname=expected_nickname,
        age=42,
        type=UserType.USER,
    )
    assert created.id is not None
    assert created.nickname == expected_nickname
    assert created.type == UserType.USER
    assert created.age == 42

    # A second lookup must yield an equal row held by a distinct object.
    fetched = await User.get(created.id, bind=engine, timeout=10)
    assert fetched.id == created.id
    assert fetched.nickname == expected_nickname
    assert fetched.type == UserType.USER
    assert fetched.age == 42
    assert fetched is not created

    return created
async def test_create_from_instance(engine):
    """Persist a pre-built ``User`` instance via its own ``create`` method.

    Attributes are reassigned after construction and before ``create`` is
    awaited; the assertions check that the reassigned values are the ones
    that end up stored.  The instance is returned for reuse by other tests.
    """
    suffix = random.random()
    expected_nickname = "test_create_from_instance_{}".format(suffix)

    pending = User(nickname="will-be-replaced", type=UserType.USER, age=42)
    pending.nickname = expected_nickname
    pending.age = 21
    await pending.create(bind=engine, timeout=10)

    assert pending.id is not None
    assert pending.nickname == expected_nickname
    assert pending.type == UserType.USER
    assert pending.age == 21

    fetched = await User.get(pending.id, bind=engine, timeout=10)
    assert fetched.id == pending.id
    assert fetched.nickname == expected_nickname
    assert fetched.type == UserType.USER
    assert fetched.age == 21
    assert fetched is not pending

    return pending
async def test_get(engine):
    """Load users by primary key and through the instance-level ``query``.

    ``engine`` is used both as ``bind`` and for its ``first`` method;
    ``test_connection_as_bind`` calls this coroutine with an acquired
    connection in place of the engine.
    """
    created = await test_create(engine)

    by_pk = await User.get(created.id, bind=engine, timeout=10)
    assert created.id == by_pk.id
    assert created.nickname == by_pk.nickname
    assert created is not by_pk

    by_query = await engine.first(created.query)
    assert created.id == by_query.id
    assert created.nickname == by_query.nickname
    assert created is not by_query

    # Same check for a user that was persisted from an existing instance.
    from_instance = await test_create_from_instance(engine)
    reloaded = await engine.first(from_instance.query)
    assert from_instance.id == reloaded.id
    assert from_instance.nickname == reloaded.nickname
    assert from_instance is not reloaded
async def test_textual_sql(engine):
    """Load ``User`` instances from textual SQL with bound parameters."""
    created = await test_create(engine)

    # Case 1: a single parameter bound by keyword.
    by_id = (
        db.text("SELECT * FROM gino_users WHERE id = :uid")
        .bindparams(uid=created.id)
        .columns(*User)
        .execution_options(model=User)
    )
    loaded = await engine.first(by_id)
    assert isinstance(loaded, User)
    assert created.id == loaded.id
    assert created.nickname == loaded.nickname
    assert created.type is loaded.type
    assert created is not loaded

    # Case 2: ``utype`` is first declared with an ``Enum(UserType)`` type and
    # then given the enum member ``UserType.USER`` as its value.
    by_id_and_type = (
        db.text("SELECT * FROM gino_users WHERE id = :uid AND type = :utype")
        .bindparams(db.bindparam("utype", type_=db.Enum(UserType)))
        .bindparams(uid=created.id, utype=UserType.USER)
        .columns(*User)
        .execution_options(model=User)
    )
    loaded = await engine.first(by_id_and_type)
    assert isinstance(loaded, User)
    assert created.id == loaded.id
    assert created.nickname == loaded.nickname
    assert created.type is loaded.type
    assert created is not loaded
async def test_select(engine):
    """Select one column both from the model class and from an instance."""
    user = await test_create(engine)

    # Class-level select needs an explicit WHERE clause to target the row.
    class_level = User.select("nickname").where(User.id == user.id)
    selected = await engine.scalar(class_level)
    assert user.nickname == selected

    # Instance-level select is issued without an explicit WHERE clause.
    instance_level = user.select("nickname")
    selected = await engine.scalar(instance_level)
    assert user.nickname == selected
async def test_get_multiple_primary_key(engine):
    """``Friendship.get`` rejects a short key and accepts the full pair."""
    owner = await test_create(engine)
    friend = await test_create(engine)
    await Friendship.create(bind=engine, my_id=owner.id, friend_id=friend.id)

    wrong_arity = "Incorrect number of values as primary key"

    # One-element tuple: too few values for the two-column key.
    with pytest.raises(ValueError, match=wrong_arity):
        await Friendship.get((owner.id,), bind=engine)

    # Bare scalar: also too few values.
    with pytest.raises(ValueError, match=wrong_arity):
        await Friendship.get(owner.id, bind=engine)

    friendship = await Friendship.get((owner.id, friend.id), bind=engine)
    assert friendship
    assert friendship.my_id == owner.id
    assert friendship.friend_id == friend.id
async def test_multiple_primary_key_order():
    """Check how ``Model.get`` maps key values onto a composite primary key.

    Two independent ``Gino`` instances each declare a ``name_cards`` table.
    The first declares the key columns as ``first_name, last_name``; the
    second declares them as ``last_name, first_name``.  The assertions cover:

    * a positional tuple, given in column declaration order;
    * a dict keyed by column name;
    * a dict keyed by positional index, and one mixing index and name;
    * ``ValueError`` for a dict with too many entries;
    * ``KeyError`` for a dict missing one of the key columns.

    Cleanup is nested so that the engine bound by ``set_bind`` is closed even
    when ``create_all()`` or ``drop_all()`` raises.
    """
    import gino

    # --- key declared as (first_name, last_name) ---------------------------
    db1 = gino.Gino()
    await db1.set_bind(MYSQL_URL)
    try:

        class NameCard(db1.Model):
            __tablename__ = "name_cards"

            first_name = db1.Column(db1.Unicode(255), primary_key=True)
            last_name = db1.Column(db1.Unicode(255), primary_key=True)

        await db1.gino.create_all()
        try:
            await NameCard.create(first_name="first", last_name="last")

            nc = await NameCard.get(("first", "last"))
            assert nc.first_name == "first"
            assert nc.last_name == "last"

            with pytest.raises(ValueError, match="expected 2, got 3"):
                await NameCard.get(dict(a=1, first_name="first", last_name="last"))

            with pytest.raises(KeyError, match="first_name"):
                await NameCard.get(dict(first="first", last_name="last"))

            nc = await NameCard.get(dict(first_name="first", last_name="last"))
            assert nc.first_name == "first"
            assert nc.last_name == "last"

            nc = await NameCard.get({0: "first", 1: "last"})
            assert nc.first_name == "first"
            assert nc.last_name == "last"
        finally:
            await db1.gino.drop_all()
    finally:
        # Release the engine regardless of how table setup/teardown went.
        await db1.pop_bind().close()

    # --- key declared as (last_name, first_name) ---------------------------
    # NOTE(review): the URL is passed to the constructor and then bound again
    # with set_bind() -- presumably intentional; confirm.
    db2 = gino.Gino(MYSQL_URL)
    await db2.set_bind(MYSQL_URL)
    try:

        class NameCard(db2.Model):
            __tablename__ = "name_cards"

            last_name = db2.Column(db2.Unicode(255), primary_key=True)
            first_name = db2.Column(db2.Unicode(255), primary_key=True)

        await db2.gino.create_all()
        try:
            await NameCard.create(first_name="first", last_name="last")

            # Positional values now follow the reversed declaration order.
            nc = await NameCard.get(("last", "first"))
            assert nc.first_name == "first"
            assert nc.last_name == "last"

            nc = await NameCard.get(dict(first_name="first", last_name="last"))
            assert nc.first_name == "first"
            assert nc.last_name == "last"

            nc = await NameCard.get({1: "first", "last_name": "last"})
            assert nc.first_name == "first"
            assert nc.last_name == "last"
        finally:
            await db2.gino.drop_all()
    finally:
        # Release the engine regardless of how table setup/teardown went.
        await db2.pop_bind().close()
async def test_connection_as_bind(engine):
    """Re-run ``test_get`` with an acquired connection in place of the engine."""
    async with engine.acquire() as connection:
        await test_get(connection)
async def test_update(engine, random_name):
    """Apply a nickname update and confirm the stored row reflects it."""
    user = await test_create(engine)

    pending_update = user.update(nickname=random_name)
    await pending_update.apply(bind=engine, timeout=10)

    reloaded = await User.get(user.id, bind=engine)
    assert reloaded.nickname == random_name
async def test_update_missing(engine, random_name):
    """Applying an update after the row was deleted raises ``NoSuchRowError``."""
    from gino.exceptions import NoSuchRowError

    user = await test_create(engine)

    # Prepare the update first, then remove the row it targets.
    pending_update = user.update(nickname=random_name)
    await user.delete(bind=engine)

    with pytest.raises(NoSuchRowError):
        await pending_update.apply(bind=engine, timeout=10)
async def test_update_multiple_primary_key(engine):
    """Update both columns of a composite key and find the row under the new key."""
    first = await test_create(engine)
    second = await test_create(engine)
    third = await test_create(engine)

    await Friendship.create(bind=engine, my_id=first.id, friend_id=second.id)
    friendship = await Friendship.get((first.id, second.id), bind=engine)

    # Move the row from (first, second) to (second, third).
    rekey = friendship.update(my_id=second.id, friend_id=third.id)
    await rekey.apply(bind=engine)

    moved = await Friendship.get((second.id, third.id), bind=engine)
    assert moved
async def test_delete(engine):
    """Delete a user with an explicit bind; a later ``get`` returns nothing."""
    user = await test_create(engine)
    await user.delete(bind=engine, timeout=10)

    after_delete = await User.get(user.id, bind=engine)
    assert not after_delete
async def test_delete_bind(bind):
    """Delete a user without passing ``bind`` to ``delete`` or ``get``."""
    user = await test_create(bind)
    await user.delete(timeout=10)

    after_delete = await User.get(user.id)
    assert not after_delete
async def test_delete_multiple_primary_key(engine):
    """Delete a row that has a composite key; the key then resolves to nothing."""
    owner = await test_create(engine)
    friend = await test_create(engine)

    friendship = await Friendship.create(
        bind=engine, my_id=owner.id, friend_id=friend.id
    )
    await friendship.delete(bind=engine)

    after_delete = await Friendship.get((owner.id, friend.id), bind=engine)
    assert not after_delete
async def test_string_primary_key(engine):
    """Create ``Relation`` rows and fetch one by passing its name to ``get``."""
    names = ["Colleagues", "Friends", "Lovers"]
    for name in names:
        await Relation.create(bind=engine, timeout=10, name=name)

    wanted = names[0]
    relation = await Relation.get(wanted, bind=engine, timeout=10)
    assert relation.name == wanted
async def test_lookup_287(bind):
    """Instance-level CRUD on a model that declares no primary key.

    ``Game`` has only a unique column and a defaulted column.  The test
    expects that:

    * building an update on an instance works, but applying it raises
      ``TypeError``;
    * instance-level ``delete``, ``query`` and ``select`` raise
      ``LookupError``;
    * once ``Game.lookup`` is assigned, instance-level updates are applied
      using that condition.

    NOTE(review): "287" presumably refers to an issue number -- confirm.
    """
    from gino.exceptions import NoSuchRowError

    class Game(db.Model):
        __tablename__ = "games"

        game_id = db.Column(db.String(32), unique=True)
        channel_id = db.Column(db.String(1), default="A")

    crud_not_allowed = "Instance-level CRUD operations not allowed"

    def lookup_by_game_id(self):
        # Match the row by the instance's current ``game_id`` value.
        return Game.game_id == self.game_id

    await Game.gino.create()
    try:
        first_game = await Game.create(game_id="1", channel_id="X")
        second_game = await Game.create(game_id="2", channel_id="Y")

        # Building the update request only touches the in-memory instance.
        pending_update = first_game.update(game_id="3")

        # Sending it to the database is what fails.
        with pytest.raises(TypeError, match="Model Game has no table, primary key"):
            await pending_update.apply()

        with pytest.raises(LookupError, match=crud_not_allowed):
            await second_game.delete()
        with pytest.raises(LookupError, match=crud_not_allowed):
            await second_game.query.gino.all()
        with pytest.raises(LookupError, match=crud_not_allowed):
            await second_game.select("game_id")

        # The in-memory change from the earlier update() call is still there...
        assert first_game.game_id == "3"

        # ...while the stored rows are unchanged.
        stored_ids = await Game.select("game_id").gino.all()
        assert stored_ids == [("1",), ("2",)]

        Game.lookup = lookup_by_game_id

        # first_game now carries game_id "3", which matches no stored row.
        with pytest.raises(NoSuchRowError):
            await first_game.update(channel_id="Z").apply()

        await second_game.update(channel_id="Z").apply()
        stored_channels = await Game.select("channel_id").gino.all()
        assert stored_channels == [("X",), ("Z",)]
    finally:
        await Game.gino.drop()
async def test_lookup_custom_name(bind):
    """CRUD on a model whose ``id`` attribute maps to a column named ``other``."""

    class ModelWithCustomColumnNames(db.Model):
        __tablename__ = "gino_test_custom_column_names"

        id = db.Column("other", db.Integer(), primary_key=True)
        field = db.Column(db.Text())

    Model = ModelWithCustomColumnNames  # short local alias

    await Model.gino.create()
    try:
        # create
        first = await Model.create(id=1, field="A")
        await Model.create(id=2, field="B")

        # update
        pending_update = first.update(field="C")
        await pending_update.apply()

        # lookup
        id_rows = await Model.select("id").gino.all()
        assert {tuple(row) for row in id_rows} == {(1,), (2,)}

        second_row = await Model.get(2)
        assert second_row.field == "B"
        first_row = await Model.get(1)
        assert first_row.field == "C"
        missing_row = await Model.get(3)
        assert missing_row is None

        # delete: index 0 of the status is compared with the expected count
        # (0 for the absent id, 1 for the existing one).
        status = await Model.delete.where(Model.id == 3).gino.status()
        assert status[0] == 0

        status = await Model.delete.where(Model.id == 2).gino.status()
        assert status[0] == 1

        id_rows = await Model.select("id").gino.all()
        assert {tuple(row) for row in id_rows} == {(1,)}
    finally:
        await Model.gino.drop()