Skip to content

Commit

Permalink
Fewer tests, more parameterizations 💯
Browse files Browse the repository at this point in the history
  • Loading branch information
marksparkza committed Dec 22, 2024
1 parent 82ec98e commit 783830d
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 186 deletions.
4 changes: 2 additions & 2 deletions odp/api/routers/keyword.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ async def update_keyword(
keyword_in: KeywordModelAdmin,
auth: Authorized = Depends(Authorize(ODPScope.KEYWORD_ADMIN)),
_=Depends(validate_keyword_input),
) -> KeywordModel:
) -> KeywordModel | None:
"""
Update a keyword. Requires scope `odp.keyword:admin`.
"""
Expand All @@ -311,7 +311,7 @@ async def update_keyword(
AuditCommand.update,
)

return output_keyword_model(keyword)
return output_keyword_model(keyword)


@router.delete(
Expand Down
209 changes: 31 additions & 178 deletions test/api/test_keyword.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from sqlalchemy import select

from odp.const import ODPScope
from odp.db.models import Keyword, KeywordAudit, Schema
from odp.db.models import Keyword, KeywordAudit
from test import TestSession
from test.api import assert_conflict, assert_empty_result, assert_forbidden, assert_new_timestamp, assert_not_found, assert_unprocessable
from test.factories import FactorySession, KeywordFactory, create_keyword_data, create_keyword_key, fake
Expand Down Expand Up @@ -565,17 +565,27 @@ def _test_create_keyword_invalid_data(


@pytest.mark.require_scope(ODPScope.KEYWORD_ADMIN)
@pytest.mark.parametrize('change', ['key', 'data', 'status', 'parent_id', None])
@pytest.mark.parametrize('change', [
None, 'key', 'data', 'status', 'parent_id',
])
@pytest.mark.parametrize('error', [
None, 'not_found', 'vocab_not_found', 'parent_not_found', 'invalid_data',
])
def test_update_keyword(
api,
scopes,
keyword_batch,
change,
error,
):
keywords_top, keywords_flat = keyword_batch
authorized = ODPScope.KEYWORD_ADMIN in scopes
keywords_top, keywords_flat = keyword_batch

old_kw = keywords_flat[-1]
vocab_id = 'foo' if error == 'vocab_not_found' else old_kw.vocabulary_id
kw_id = 0 if error == 'not_found' else old_kw.id
route = f'/keyword/{vocab_id}/{kw_id}'

new_kw_args = dict(
key=old_kw.key,
data=old_kw.data.copy(),
Expand All @@ -587,12 +597,12 @@ def test_update_keyword(
if change == 'key':
new_kw_args['data']['key'] = new_kw_args['key'] = create_keyword_key(old_kw, -1)
elif change == 'data':
new_kw_args['data'] = create_keyword_data(old_kw, -1)
new_kw_args['data'] = create_keyword_data(old_kw, -1, invalid=error == 'invalid_data')
elif change == 'status':
new_kw_args['status'] = 'rejected' if old_kw.status == 'proposed' else 'proposed'
elif change == 'parent_id':
new_kw_args['parent'] = old_kw.parent.parent if old_kw.parent else None
new_kw_args['parent_id'] = old_kw.parent.parent_id if old_kw.parent else None
new_kw_args['parent_id'] = 0 if error == 'parent_not_found' else old_kw.parent.parent_id if old_kw.parent else None
changed = new_kw_args['parent_id'] != old_kw.parent_id
else:
changed = False
Expand All @@ -604,192 +614,35 @@ def test_update_keyword(
**new_kw_args
)

r = api(scopes).put(f'/keyword/{old_kw.vocabulary_id}/{old_kw.id}', json=dict(
r = api(scopes).put(route, json=dict(
key=new_kw.key,
data=new_kw.data,
status=new_kw.status,
parent_id=new_kw.parent_id,
))

if authorized:
if not authorized:
assert_forbidden(r)
elif error == 'not_found':
assert_not_found(r)
elif error == 'vocab_not_found':
assert_not_found(r, 'Vocabulary not found')
elif error == 'parent_not_found' and change == 'parent_id':
assert_not_found(r, 'Parent keyword not found')
elif error == 'invalid_data' and change == 'data':
assert_unprocessable(r, valid=False)
elif changed:
assert_json_result(r, r.json(), new_kw)
if changed:
assert_db_state(keywords_flat[:-1] + [new_kw])
assert_audit_log(api.grant_type, dict(command='update', keyword=new_kw))
else:
assert_db_state(keywords_flat)
assert_no_audit_log()
assert_db_state(keywords_flat[:-1] + [new_kw])
assert_audit_log(api.grant_type, dict(command='update', keyword=new_kw))
else:
assert_forbidden(r)
assert_db_state(keywords_flat)
assert_no_audit_log()


@pytest.mark.require_scope(ODPScope.KEYWORD_ADMIN)
def test_set_keyword_old(
api,
scopes,
keyword_batch,
):
keywords_top, keywords_flat = keyword_batch
authorized = ODPScope.KEYWORD_ADMIN in scopes
client = api(scopes)
assert_empty_result(r)

keywords_flat.reverse()
audit = []
for n in range(4):
create = n == 0 or randint(0, 1)
for i, kw in enumerate(keywords_flat):
if kw.id.count('/') == n:
data = {'abbr': fake.word(), 'title': fake.company()}
child_schema = FactorySession.execute(select(Schema)).scalars().first() if randint(0, 1) else None

if create:
kw_in = keyword_build(
parent_id=kw.id,
data=data,
child_schema=child_schema,
)
audit += [dict(command='insert', keyword=kw_in)]
else:
kw_in = keyword_build(
parent_id=kw.parent_id,
id=kw.id,
data=data,
child_schema=child_schema,
)
audit += [dict(command='update', keyword=kw_in, replace=i)]

r = client.put(f'/keyword/{kw_in.id}', json=dict(
parent_id=kw_in.parent_id,
data=kw_in.data,
status=kw_in.status,
child_schema_id=kw_in.child_schema_id,
))

if authorized:
assert_json_result(r, r.json(), kw_in)
else:
assert_forbidden(r)
break

if authorized:
replace_indices = [entry.pop('replace', None) for entry in audit]
for replace_index in reversed(sorted(filter(lambda i: i is not None, replace_indices))):
keywords_flat.pop(replace_index)
assert_db_state(keywords_flat + [entry.get('keyword') for entry in audit])
assert_audit_log(api.grant_type, *audit)

else:
if not changed:
assert_db_state(keywords_flat)
assert_no_audit_log()


@pytest.mark.require_scope(ODPScope.KEYWORD_ADMIN)
def test_set_keyword_parent_not_found(
api,
scopes,
keyword_batch,
):
keywords_top, keywords_flat = keyword_batch
authorized = ODPScope.KEYWORD_ADMIN in scopes
client = api(scopes)

r = client.put('/keyword/foo/bar', json=dict(
data={'abbr': fake.word(), 'title': fake.company()},
parent_id='foo',
))
if authorized:
assert_not_found(r, "Parent keyword 'foo' does not exist")
else:
assert_forbidden(r)

r = client.put(f'/keyword/{(parent_id := keywords_top[2].id)}/foo/bar', json=dict(
data={'abbr': fake.word(), 'title': fake.company()},
parent_id=f'{parent_id}/foo',
))
if authorized:
assert_not_found(r, f"Parent keyword '{parent_id}/foo' does not exist")
else:
assert_forbidden(r)

assert_db_state(keywords_flat)
assert_no_audit_log()


@pytest.mark.require_scope(ODPScope.KEYWORD_ADMIN)
@pytest.mark.parametrize('create', [False, True])
def test_set_keyword_invalid_parent(
api,
scopes,
keyword_batch,
create,
):
keywords_top, keywords_flat = keyword_batch
authorized = ODPScope.KEYWORD_ADMIN in scopes
client = api(scopes)

# top-level keyword (vocab) cannot be created/updated via API
kw_id = 'foo' if create else keywords_top[0].id
r = client.put(f'/keyword/{kw_id}', json=dict(
data={'abbr': fake.word(), 'title': fake.company()},
parent_id='',
))
if authorized:
assert_unprocessable(r)
else:
assert_forbidden(r)

r = client.put(f'/keyword/{kw_id}/bar', json=dict(
data={'abbr': fake.word(), 'title': fake.company()},
parent_id='bar',
))
if authorized:
assert_unprocessable(r, f"'bar' cannot be a parent of '{kw_id}/bar'")
else:
assert_forbidden(r)

# self or sibling cannot be parent
for n in range(1, 4):
for kw in reversed(keywords_flat):
if kw.id.count('/') == n:
kw_id = f'{kw.parent.id}/foo' if create else kw.id
r = client.put(f'/keyword/{kw_id}', json=dict(
data={'abbr': fake.word(), 'title': fake.company()},
parent_id=f'{kw.id}',
))
if authorized:
assert_unprocessable(r, f"'{kw.id}' cannot be a parent of '{kw_id}'")
else:
assert_forbidden(r)
break

assert_db_state(keywords_flat)
assert_no_audit_log()


@pytest.mark.require_scope(ODPScope.KEYWORD_ADMIN)
def test_set_keyword_invalid_data(
api,
scopes,
keyword_batch,
):
keywords_top, keywords_flat = keyword_batch
authorized = ODPScope.KEYWORD_ADMIN in scopes

r = api(scopes).put(f'/keyword/{keywords_top[2].id}/foo', json=dict(
data={'abbr': fake.word()}, # missing required property 'title'
parent_id=keywords_top[2].id,
))
if authorized:
assert_unprocessable(r, valid=False)
else:
assert_forbidden(r)

assert_db_state(keywords_flat)
assert_no_audit_log()


@pytest.mark.require_scope(ODPScope.KEYWORD_ADMIN)
def test_delete_keyword(
api,
Expand Down
12 changes: 6 additions & 6 deletions test/factories.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,17 +100,17 @@ def create_metadata(record, n):
return metadata


def create_keyword_key(kw, n):
def create_keyword_key(kw, n, invalid=False):
if kw.vocabulary.schema.uri.endswith('institution'):
return fake.word() + str(n)
return -1 if invalid else fake.word() + str(n)
elif kw.vocabulary.schema.uri.endswith('sdg'):
if kw.parent_id is None:
return str(fake.pyint())
return str(fake.pyfloat(min_value=0))
return '' if invalid else str(fake.pyint())
return '' if invalid else str(fake.pyfloat(min_value=0))


def create_keyword_data(kw, n):
data = {'key': kw.key}
def create_keyword_data(kw, n, invalid=False):
data = {'foo': 'bar'} if invalid else {'key': kw.key}
if kw.vocabulary.schema.uri.endswith('institution'):
data |= {'abbr': fake.word() + str(n)}
elif kw.vocabulary.schema.uri.endswith('sdg'):
Expand Down

0 comments on commit 783830d

Please sign in to comment.