Skip to content

Commit

Permalink
Fewer functions, more parameterizations 💯
Browse files Browse the repository at this point in the history
An improved approach to writing API tests, which vastly reduces the amount of handwritten test code. The key change is to write one test function per API endpoint, with both request and response cases parameterized, instead of creating separate test functions for different kinds of responses.

Implemented so far for the keyword update and delete endpoints.
  • Loading branch information
marksparkza committed Dec 22, 2024
1 parent 82ec98e commit 4dc3798
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 239 deletions.
4 changes: 2 additions & 2 deletions odp/api/routers/keyword.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ async def update_keyword(
keyword_in: KeywordModelAdmin,
auth: Authorized = Depends(Authorize(ODPScope.KEYWORD_ADMIN)),
_=Depends(validate_keyword_input),
) -> KeywordModel:
) -> KeywordModel | None:
"""
Update a keyword. Requires scope `odp.keyword:admin`.
"""
Expand All @@ -311,7 +311,7 @@ async def update_keyword(
AuditCommand.update,
)

return output_keyword_model(keyword)
return output_keyword_model(keyword)


@router.delete(
Expand Down
297 changes: 66 additions & 231 deletions test/api/test_keyword.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from sqlalchemy import select

from odp.const import ODPScope
from odp.db.models import Keyword, KeywordAudit, Schema
from odp.db.models import Keyword, KeywordAudit
from test import TestSession
from test.api import assert_conflict, assert_empty_result, assert_forbidden, assert_new_timestamp, assert_not_found, assert_unprocessable
from test.factories import FactorySession, KeywordFactory, create_keyword_data, create_keyword_key, fake
Expand Down Expand Up @@ -565,290 +565,125 @@ def _test_create_keyword_invalid_data(


@pytest.mark.require_scope(ODPScope.KEYWORD_ADMIN)
@pytest.mark.parametrize('change', ['key', 'data', 'status', 'parent_id', None])
@pytest.mark.parametrize('change', [
None, 'key', 'data', 'status', 'parent_id',
])
@pytest.mark.parametrize('error', [
None, 'not_found', 'vocab_not_found', 'parent_not_found', 'invalid_data',
])
def test_update_keyword(
api,
scopes,
keyword_batch,
change,
error,
):
keywords_top, keywords_flat = keyword_batch
authorized = ODPScope.KEYWORD_ADMIN in scopes
keywords_top, keywords_flat = keyword_batch

old_kw = keywords_flat[-1]
old_ix = randint(0, len(keywords_flat) - 1)
old_kw = keywords_flat[old_ix]
new_kw_args = dict(
key=old_kw.key,
data=old_kw.data.copy(),
status=old_kw.status,
parent=old_kw.parent,
parent_id=old_kw.parent_id,
)
changed = True
if change == 'key':
new_kw_args['data']['key'] = new_kw_args['key'] = create_keyword_key(old_kw, -1)
elif change == 'data':
new_kw_args['data'] = create_keyword_data(old_kw, -1)
new_kw_args['data'] = create_keyword_data(old_kw, -1, invalid=error == 'invalid_data')
elif change == 'status':
new_kw_args['status'] = 'rejected' if old_kw.status == 'proposed' else 'proposed'
elif change == 'parent_id':
new_kw_args['parent'] = old_kw.parent.parent if old_kw.parent else None
new_kw_args['parent_id'] = old_kw.parent.parent_id if old_kw.parent else None
changed = new_kw_args['parent_id'] != old_kw.parent_id
else:
changed = False
new_kw_args['parent_id'] = 0 if error == 'parent_not_found' else old_kw.parent.parent_id if old_kw.parent else None
if new_kw_args['parent_id'] == old_kw.parent_id:
change = None

kw_id = 0 if error == 'not_found' else old_kw.id
vocab_id = 'foo' if error == 'vocab_not_found' else old_kw.vocabulary_id
new_kw = keyword_build(
id=old_kw.id,
vocabulary=old_kw.vocabulary,
vocabulary_id=old_kw.vocabulary_id,
**new_kw_args
)

r = api(scopes).put(f'/keyword/{old_kw.vocabulary_id}/{old_kw.id}', json=dict(
r = api(scopes).put(f'/keyword/{vocab_id}/{kw_id}', json=dict(
key=new_kw.key,
data=new_kw.data,
status=new_kw.status,
parent_id=new_kw.parent_id,
))

if authorized:
changed = False
if not authorized:
assert_forbidden(r)
elif error == 'not_found':
assert_not_found(r)
elif error == 'vocab_not_found':
assert_not_found(r, 'Vocabulary not found')
elif error == 'parent_not_found' and change == 'parent_id':
assert_not_found(r, 'Parent keyword not found')
elif error == 'invalid_data' and change == 'data':
assert_unprocessable(r, valid=False)
elif change:
assert_json_result(r, r.json(), new_kw)
if changed:
assert_db_state(keywords_flat[:-1] + [new_kw])
assert_audit_log(api.grant_type, dict(command='update', keyword=new_kw))
else:
assert_db_state(keywords_flat)
assert_no_audit_log()
assert_db_state(keywords_flat[:old_ix] + [new_kw] + keywords_flat[old_ix + 1:])
assert_audit_log(api.grant_type, dict(command='update', keyword=new_kw))
changed = True
else:
assert_forbidden(r)
assert_db_state(keywords_flat)
assert_no_audit_log()


@pytest.mark.require_scope(ODPScope.KEYWORD_ADMIN)
def test_set_keyword_old(
api,
scopes,
keyword_batch,
):
keywords_top, keywords_flat = keyword_batch
authorized = ODPScope.KEYWORD_ADMIN in scopes
client = api(scopes)

keywords_flat.reverse()
audit = []
for n in range(4):
create = n == 0 or randint(0, 1)
for i, kw in enumerate(keywords_flat):
if kw.id.count('/') == n:
data = {'abbr': fake.word(), 'title': fake.company()}
child_schema = FactorySession.execute(select(Schema)).scalars().first() if randint(0, 1) else None

if create:
kw_in = keyword_build(
parent_id=kw.id,
data=data,
child_schema=child_schema,
)
audit += [dict(command='insert', keyword=kw_in)]
else:
kw_in = keyword_build(
parent_id=kw.parent_id,
id=kw.id,
data=data,
child_schema=child_schema,
)
audit += [dict(command='update', keyword=kw_in, replace=i)]

r = client.put(f'/keyword/{kw_in.id}', json=dict(
parent_id=kw_in.parent_id,
data=kw_in.data,
status=kw_in.status,
child_schema_id=kw_in.child_schema_id,
))

if authorized:
assert_json_result(r, r.json(), kw_in)
else:
assert_forbidden(r)
break
assert_empty_result(r)

if authorized:
replace_indices = [entry.pop('replace', None) for entry in audit]
for replace_index in reversed(sorted(filter(lambda i: i is not None, replace_indices))):
keywords_flat.pop(replace_index)
assert_db_state(keywords_flat + [entry.get('keyword') for entry in audit])
assert_audit_log(api.grant_type, *audit)

else:
if not changed:
assert_db_state(keywords_flat)
assert_no_audit_log()


@pytest.mark.require_scope(ODPScope.KEYWORD_ADMIN)
def test_set_keyword_parent_not_found(
@pytest.mark.parametrize('error', [
None, 'not_found', 'vocab_not_found',
])
def test_delete_keyword(
api,
scopes,
keyword_batch,
error,
):
keywords_top, keywords_flat = keyword_batch
authorized = ODPScope.KEYWORD_ADMIN in scopes
client = api(scopes)

r = client.put('/keyword/foo/bar', json=dict(
data={'abbr': fake.word(), 'title': fake.company()},
parent_id='foo',
))
if authorized:
assert_not_found(r, "Parent keyword 'foo' does not exist")
else:
assert_forbidden(r)

r = client.put(f'/keyword/{(parent_id := keywords_top[2].id)}/foo/bar', json=dict(
data={'abbr': fake.word(), 'title': fake.company()},
parent_id=f'{parent_id}/foo',
))
if authorized:
assert_not_found(r, f"Parent keyword '{parent_id}/foo' does not exist")
else:
assert_forbidden(r)

assert_db_state(keywords_flat)
assert_no_audit_log()


@pytest.mark.require_scope(ODPScope.KEYWORD_ADMIN)
@pytest.mark.parametrize('create', [False, True])
def test_set_keyword_invalid_parent(
api,
scopes,
keyword_batch,
create,
):
keywords_top, keywords_flat = keyword_batch
authorized = ODPScope.KEYWORD_ADMIN in scopes
client = api(scopes)

# top-level keyword (vocab) cannot be created/updated via API
kw_id = 'foo' if create else keywords_top[0].id
r = client.put(f'/keyword/{kw_id}', json=dict(
data={'abbr': fake.word(), 'title': fake.company()},
parent_id='',
))
if authorized:
assert_unprocessable(r)
else:
assert_forbidden(r)

r = client.put(f'/keyword/{kw_id}/bar', json=dict(
data={'abbr': fake.word(), 'title': fake.company()},
parent_id='bar',
))
if authorized:
assert_unprocessable(r, f"'bar' cannot be a parent of '{kw_id}/bar'")
else:
assert_forbidden(r)

# self or sibling cannot be parent
for n in range(1, 4):
for kw in reversed(keywords_flat):
if kw.id.count('/') == n:
kw_id = f'{kw.parent.id}/foo' if create else kw.id
r = client.put(f'/keyword/{kw_id}', json=dict(
data={'abbr': fake.word(), 'title': fake.company()},
parent_id=f'{kw.id}',
))
if authorized:
assert_unprocessable(r, f"'{kw.id}' cannot be a parent of '{kw_id}'")
else:
assert_forbidden(r)
break

assert_db_state(keywords_flat)
assert_no_audit_log()

old_ix = randint(0, len(keywords_flat) - 1)
old_kw = keywords_flat[old_ix]
deleted_kw = KeywordFactory.stub(
vocabulary_id=old_kw.vocabulary_id,
id=old_kw.id,
key=old_kw.key,
data=old_kw.data.copy(),
status=old_kw.status,
parent_id=old_kw.parent_id,
has_children=True if old_kw.children else False,
)
kw_id = 0 if error == 'not_found' else old_kw.id
vocab_id = 'foo' if error == 'vocab_not_found' else old_kw.vocabulary_id

@pytest.mark.require_scope(ODPScope.KEYWORD_ADMIN)
def test_set_keyword_invalid_data(
api,
scopes,
keyword_batch,
):
keywords_top, keywords_flat = keyword_batch
authorized = ODPScope.KEYWORD_ADMIN in scopes
r = api(scopes).delete(f'/keyword/{vocab_id}/{kw_id}')

r = api(scopes).put(f'/keyword/{keywords_top[2].id}/foo', json=dict(
data={'abbr': fake.word()}, # missing required property 'title'
parent_id=keywords_top[2].id,
))
if authorized:
assert_unprocessable(r, valid=False)
else:
changed = False
if not authorized:
assert_forbidden(r)

assert_db_state(keywords_flat)
assert_no_audit_log()


@pytest.mark.require_scope(ODPScope.KEYWORD_ADMIN)
def test_delete_keyword(
api,
scopes,
keyword_batch,
):
keywords_top, keywords_flat = keyword_batch
authorized = ODPScope.KEYWORD_ADMIN in scopes
client = api(scopes)

keywords_flat.reverse()
audit = []
for n in range(4):
for i, kw in enumerate(keywords_flat):
if kw.id.count('/') == n:
if can_delete := not kw.children:
audit += [dict(command='delete', keyword=kw, index=i)]

r = client.delete(f'/keyword/{kw.id}')

if authorized:
if can_delete:
assert_empty_result(r)
else:
assert_unprocessable(r, f"Keyword '{kw.id}' cannot be deleted as it has sub-keywords")
else:
assert_forbidden(r)
break

if authorized:
delete_indices = [entry.pop('index') for entry in audit]
for delete_index in reversed(sorted(delete_indices)):
keywords_flat.pop(delete_index)
assert_db_state(keywords_flat)
assert_audit_log(api.grant_type, *audit)
elif error in ('not_found', 'vocab_not_found'):
assert_not_found(r)
elif deleted_kw.has_children:
assert_unprocessable(r, f"Keyword '{deleted_kw.id}' has child keywords")
else:
assert_empty_result(r)
assert_db_state(keywords_flat[:old_ix] + keywords_flat[old_ix + 1:])
assert_audit_log(api.grant_type, dict(command='delete', keyword=deleted_kw))
changed = True

if not changed:
assert_db_state(keywords_flat)
assert_no_audit_log()


@pytest.mark.require_scope(ODPScope.KEYWORD_ADMIN)
def test_delete_keyword_not_found(
api,
scopes,
keyword_batch,
):
keywords_top, keywords_flat = keyword_batch
authorized = ODPScope.KEYWORD_ADMIN in scopes
client = api(scopes)

for n in range(2):
for kw in reversed(keywords_flat):
if kw.id.count('/') == n:
r = client.delete(f'/keyword/{kw.id}/foo')
if authorized:
assert_not_found(r, f"Keyword '{kw.id}/foo' does not exist")
else:
assert_forbidden(r)
break

assert_db_state(keywords_flat)
assert_no_audit_log()
Loading

0 comments on commit 4dc3798

Please sign in to comment.