Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow full clear of completed jobs #503

Open
wants to merge 18 commits into
base: master
Choose a base branch
from
Open
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@ development.ini
node_modules
*.project
.eggs
.vscode/
.vscode/
.idea/
17 changes: 7 additions & 10 deletions ckanext/harvest/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,26 +108,23 @@ def clear(ctx, id):

@source.command()
@click.argument(u"id", metavar=u"SOURCE_ID_OR_NAME", required=False)
@click.option(
"-k",
"--keep-current",
default=False
)
@click.pass_context
def clear_history(ctx, id, keep_current):
def clear_history(ctx, id):
"""If no source id is given the history for all harvest sources
(maximum is 1000) will be cleared.

Clears all jobs and objects related to a harvest source, but keeps
the source itself. The datasets imported from the harvest source
will NOT be deleted!!! If a source id is given, it only clears
Clears all jobs and out-of-date objects related to a harvest source, but keeps
the source itself and a history of what has been harvested already.
The datasets imported from the harvest source will NOT be deleted!!!

If a source id is given, it only clears
the history of the harvest source with the given source id.

"""
flask_app = ctx.meta["flask_app"]

with flask_app.test_request_context():
result = utils.clear_harvest_source_history(id, bool(keep_current))
result = utils.clear_harvest_source_history(id)
click.secho(result, fg="green")


Expand Down
11 changes: 1 addition & 10 deletions ckanext/harvest/commands/harvester.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,14 +190,6 @@ def __init__(self, name):
will be aborted. You can use comma as a separator to provide multiple source_id's""",
)

self.parser.add_option(
"-k",
"--keep-current",
dest="keep_current",
default=False,
help="Do not delete relevant harvest objects",
)

def command(self):
self._load_config()

Expand Down Expand Up @@ -324,12 +316,11 @@ def create_harvest_source(self):
print(result)

def clear_harvest_source_history(self):
keep_current = bool(self.options.keep_current)
source_id = None
if len(self.args) >= 2:
source_id = unicode_safe(self.args[1])

print(utils.clear_harvest_source_history(source_id, keep_current))
print(utils.clear_harvest_source_history(source_id))

def show_harvest_source(self):

Expand Down
46 changes: 10 additions & 36 deletions ckanext/harvest/logic/action/update.py
Original file line number Diff line number Diff line change
Expand Up @@ -313,16 +313,14 @@ def harvest_abort_failed_jobs(context, data_dict):

def harvest_sources_job_history_clear(context, data_dict):
'''
Clears the history for all active harvest sources. All jobs and objects related to a harvest source will
be cleared, but keeps the source itself.
Clears the history for all active harvest sources. All non-running jobs and non-current harvest objects will
be cleared, but keeps the source itself and the most current harvest objects.
This is useful to clean history of long running harvest sources to start again fresh.
The datasets imported from the harvest source will NOT be deleted!!!

'''
check_access('harvest_sources_clear', context, data_dict)

keep_current = data_dict.get('keep_current', False)

job_history_clear_results = []
# We assume that the maximum of 1000 (hard limit) rows should be enough
result = logic.get_action('package_search')(context, {'fq': '+dataset_type:harvest', 'rows': 1000})
Expand All @@ -331,7 +329,7 @@ def harvest_sources_job_history_clear(context, data_dict):
for data_dict in harvest_packages:
try:
clear_result = get_action('harvest_source_job_history_clear')(
context, {'id': data_dict['id'], 'keep_current': keep_current})
context, {'id': data_dict['id']})
job_history_clear_results.append(clear_result)
except NotFound:
# Ignoring not existent harvest sources because of a possibly corrupt search index
Expand All @@ -343,7 +341,7 @@ def harvest_sources_job_history_clear(context, data_dict):

def harvest_source_job_history_clear(context, data_dict):
'''
Clears all jobs and objects related to a harvest source, but keeps the source itself.
Clears all jobs and out-of-date harvest objects from a harvest source.
This is useful to clean history of long running harvest sources to start again fresh.
The datasets imported from the harvest source will NOT be deleted!!!

Expand All @@ -354,7 +352,6 @@ def harvest_source_job_history_clear(context, data_dict):
check_access('harvest_source_clear', context, data_dict)

harvest_source_id = data_dict.get('id', None)
keep_current = data_dict.get('keep_current', False)

source = HarvestSource.get(harvest_source_id)
if not source:
Expand All @@ -365,49 +362,26 @@ def harvest_source_job_history_clear(context, data_dict):

model = context['model']

if keep_current:
sql = '''BEGIN;
sql = '''BEGIN;
DELETE FROM harvest_object_error WHERE harvest_object_id
IN (SELECT id FROM harvest_object AS obj WHERE harvest_source_id = '{harvest_source_id}'
AND current != true
AND (NOT EXISTS (SELECT id FROM harvest_job WHERE id = obj.harvest_job_id
AND status = 'Running'))
AND (NOT EXISTS (SELECT id FROM harvest_object WHERE harvest_job_id = obj.harvest_job_id
AND current = true))
);
AND status = 'Running')));
DELETE FROM harvest_object_extra WHERE harvest_object_id
IN (SELECT id FROM harvest_object AS obj WHERE harvest_source_id = '{harvest_source_id}'
AND current != true
AND (NOT EXISTS (SELECT id FROM harvest_job WHERE id = obj.harvest_job_id
AND status = 'Running'))
AND (NOT EXISTS (SELECT id FROM harvest_object WHERE harvest_job_id = obj.harvest_job_id
AND current = true))
);
AND status = 'Running')));
DELETE FROM harvest_object AS obj WHERE harvest_source_id = '{harvest_source_id}'
AND current != true
AND (NOT EXISTS (SELECT id FROM harvest_job WHERE id = obj.harvest_job_id
AND status = 'Running'))
AND (NOT EXISTS (SELECT id FROM harvest_object WHERE harvest_job_id = obj.harvest_job_id
AND current = true));
AND status = 'Running'));
DELETE FROM harvest_gather_error WHERE harvest_job_id
IN (SELECT id FROM harvest_job AS job WHERE source_id = '{harvest_source_id}'
AND job.status != 'Running'
AND NOT EXISTS (SELECT id FROM harvest_object WHERE harvest_job_id = job.id));
AND job.status != 'Running');
DELETE FROM harvest_job AS job WHERE source_id = '{harvest_source_id}'
AND job.status != 'Running'
AND NOT EXISTS (SELECT id FROM harvest_object WHERE harvest_job_id = job.id);
COMMIT;
'''.format(harvest_source_id=harvest_source_id)
else:
sql = '''BEGIN;
DELETE FROM harvest_object_error WHERE harvest_object_id
IN (SELECT id FROM harvest_object WHERE harvest_source_id = '{harvest_source_id}');
DELETE FROM harvest_object_extra WHERE harvest_object_id
IN (SELECT id FROM harvest_object WHERE harvest_source_id = '{harvest_source_id}');
DELETE FROM harvest_object WHERE harvest_source_id = '{harvest_source_id}';
DELETE FROM harvest_gather_error WHERE harvest_job_id
IN (SELECT id FROM harvest_job WHERE source_id = '{harvest_source_id}');
DELETE FROM harvest_job WHERE source_id = '{harvest_source_id}';
AND job.status != 'Running';
COMMIT;
'''.format(harvest_source_id=harvest_source_id)

Expand Down
3 changes: 2 additions & 1 deletion ckanext/harvest/model/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
'HarvestObject', 'harvest_object_table',
'HarvestGatherError', 'harvest_gather_error_table',
'HarvestObjectError', 'harvest_object_error_table',
'HarvestObjectExtra', 'harvest_object_extra_table',
'HarvestLog', 'harvest_log_table'
]

Expand Down Expand Up @@ -356,7 +357,7 @@ def define_harvester_tables():
Column('state', types.UnicodeText, default=u'WAITING'),
Column('metadata_modified_date', types.DateTime),
Column('retry_times', types.Integer, default=0),
Column('harvest_job_id', types.UnicodeText, ForeignKey('harvest_job.id')),
Column('harvest_job_id', types.UnicodeText, ForeignKey('harvest_job.id', ondelete='SET NULL'), nullable=True),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changing table structure would require migration script, otherwise it would not be applied to any instance.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that a schema change creates some challenges for existing databases. However, the ckanapi package gives very nice support for exporting and re-importing users, packages, and organizations, so that a fresh database instance with imported users, etc is not very time consuming. I could write up a set of instructions for the Wiki if this is useful.

Column('harvest_source_id', types.UnicodeText, ForeignKey('harvest_source.id')),
Column('package_id', types.UnicodeText, ForeignKey('package.id', deferrable=True),
nullable=True),
Expand Down
107 changes: 14 additions & 93 deletions ckanext/harvest/tests/test_action.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,45 +296,6 @@ def test_harvest_sources_job_history_clear(self):
data_dict['url'] = 'http://another-url'
source_2 = factories.HarvestSourceObj(**data_dict)

job_1 = factories.HarvestJobObj(source=source_1)
dataset_1 = ckan_factories.Dataset()
object_1_ = factories.HarvestObjectObj(job=job_1, source=source_1,
package_id=dataset_1['id'])
job_2 = factories.HarvestJobObj(source=source_2)
dataset_2 = ckan_factories.Dataset()
object_2_ = factories.HarvestObjectObj(job=job_2, source=source_2,
package_id=dataset_2['id'])

# execute
context = {'session': model.Session,
'ignore_auth': True, 'user': ''}
result = get_action('harvest_sources_job_history_clear')(
context, {})

# verify
assert sorted(result, key=lambda item: item['id']) == sorted(
[{'id': source_1.id}, {'id': source_2.id}], key=lambda item: item['id'])
assert harvest_model.HarvestSource.get(source_1.id)
assert harvest_model.HarvestJob.get(job_1.id) is None
assert harvest_model.HarvestObject.get(object_1_.id) is None
dataset_from_db_1 = model.Package.get(dataset_1['id'])
assert dataset_from_db_1
assert dataset_from_db_1.id == dataset_1['id']
assert harvest_model.HarvestSource.get(source_2.id)
assert harvest_model.HarvestJob.get(job_2.id) is None
assert harvest_model.HarvestObject.get(object_2_.id) is None
dataset_from_db_2 = model.Package.get(dataset_2['id'])
assert dataset_from_db_2
assert dataset_from_db_2.id == dataset_2['id']

def test_harvest_sources_job_history_clear_keep_current(self):
# prepare
data_dict = SOURCE_DICT.copy()
source_1 = factories.HarvestSourceObj(**data_dict)
data_dict['name'] = 'another-source'
data_dict['url'] = 'http://another-url'
source_2 = factories.HarvestSourceObj(**data_dict)

job_1 = factories.HarvestJobObj(source=source_1)
dataset_1 = ckan_factories.Dataset()
object_1_ = factories.HarvestObjectObj(job=job_1, source=source_1,
Expand All @@ -353,15 +314,15 @@ def test_harvest_sources_job_history_clear_keep_current(self):
context = {'model': model, 'session': model.Session,
'ignore_auth': True, 'user': ''}
result = get_action('harvest_sources_job_history_clear')(
context, {'keep_current': True})
context, {})

# verify
assert sorted(result, key=lambda item: item['id']) == sorted(
[{'id': source_1.id}, {'id': source_2.id}], key=lambda item: item['id'])

# dataset, related source, object and job still persist!
# dataset, related source, object still persist, job is deleted!
assert harvest_model.HarvestSource.get(source_1.id)
assert harvest_model.HarvestJob.get(job_1.id)
assert not harvest_model.HarvestJob.get(job_1.id)
assert harvest_model.HarvestObject.get(object_1_.id)
dataset_from_db_1 = model.Package.get(dataset_1['id'])
assert dataset_from_db_1
Expand All @@ -372,50 +333,7 @@ def test_harvest_sources_job_history_clear_keep_current(self):
assert not harvest_model.HarvestJob.get(job_2.id)
assert not harvest_model.HarvestObject.get(object_2_.id)

def test_harvest_source_job_history_clear_keep_current(self):
# prepare
source = factories.HarvestSourceObj(**SOURCE_DICT.copy())
job = factories.HarvestJobObj(source=source)
dataset = ckan_factories.Dataset()
object_ = factories.HarvestObjectObj(job=job, source=source,
package_id=dataset['id'])

data_dict = SOURCE_DICT.copy()
data_dict['name'] = 'another-source'
data_dict['url'] = 'http://another-url'
source2 = factories.HarvestSourceObj(**data_dict)
job2 = factories.HarvestJobObj(source=source2)
dataset2 = ckan_factories.Dataset()
object_2_ = factories.HarvestObjectObj(job=job2, source=source2,
package_id=dataset2['id'])

setattr(object_, 'report_status', 'added')
setattr(object_, 'current', True)
model.Session.commit()

# execute
context = {'model': model, 'session': model.Session,
'ignore_auth': True, 'user': ''}
result = get_action('harvest_source_job_history_clear')(
context, {'id': source.id, 'keep_current': True})

# verify
assert result == {'id': source.id}
assert harvest_model.HarvestSource.get(source.id)
assert harvest_model.HarvestJob.get(job.id)
assert harvest_model.HarvestObject.get(object_.id)
dataset_from_db = model.Package.get(dataset['id'])
assert dataset_from_db
assert dataset_from_db.id == dataset['id']
# source2 and related objects are untouched
assert harvest_model.HarvestSource.get(source2.id)
assert harvest_model.HarvestJob.get(job2.id)
assert harvest_model.HarvestObject.get(object_2_.id)
dataset_from_db_2 = model.Package.get(dataset2['id'])
assert dataset_from_db_2
assert dataset_from_db_2.id == dataset2['id']

def test_harvest_source_job_history_clear_keep_current_finished_jobs(self):
def test_harvest_source_job_history_clear_deletes_current_finished_jobs(self):
# prepare
source = factories.HarvestSourceObj(**SOURCE_DICT.copy())
job = factories.HarvestJobObj(source=source)
Expand All @@ -439,7 +357,7 @@ def test_harvest_source_job_history_clear_keep_current_finished_jobs(self):
context = {'model': model, 'session': model.Session,
'ignore_auth': True, 'user': ''}
result = get_action('harvest_source_job_history_clear')(
context, {'id': source.id, 'keep_current': True})
context, {'id': source.id})

# verify
assert result == {'id': source.id}
Expand All @@ -449,8 +367,9 @@ def test_harvest_source_job_history_clear_keep_current_finished_jobs(self):
dataset_from_db = model.Package.get(dataset['id'])
assert dataset_from_db
assert dataset_from_db.id == dataset['id']
# job2 and related objects are untouched
assert harvest_model.HarvestJob.get(job2.id)

# job2 is deleted, but harvest objects are kept
assert not harvest_model.HarvestJob.get(job2.id)
assert harvest_model.HarvestObject.get(object_2_.id)
dataset_from_db_2 = model.Package.get(dataset2['id'])
assert dataset_from_db_2
Expand Down Expand Up @@ -488,17 +407,19 @@ def test_harvest_source_job_history_clear_keep_current_running_job(self):
context = {'model': model, 'session': model.Session,
'ignore_auth': True, 'user': ''}
result = get_action('harvest_source_job_history_clear')(
context, {'id': source.id, 'keep_current': True})
context, {'id': source.id})

# verify that both jobs still exists
# verify first job and non-current objects are deleted, but any current objects are kept
assert result == {'id': source.id}
assert harvest_model.HarvestSource.get(source.id)
assert harvest_model.HarvestJob.get(job1.id)
assert harvest_model.HarvestObject.get(object_1_.id)
assert not harvest_model.HarvestJob.get(job1.id)
assert not harvest_model.HarvestObject.get(object_1_.id)
assert harvest_model.HarvestObject.get(object_2_.id)
dataset_from_db = model.Package.get(dataset1['id'])
assert dataset_from_db
assert dataset_from_db.id == dataset1['id']

# verify that second job still exists and all harvest objects are kept
assert harvest_model.HarvestJob.get(job2.id)
assert harvest_model.HarvestObject.get(object_3_.id)
assert harvest_model.HarvestObject.get(object_4_.id)
Expand Down
14 changes: 4 additions & 10 deletions ckanext/harvest/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ def clear_harvest_source(source_id_or_name):
tk.get_action("harvest_source_clear")(context, {"id": source["id"]})


def clear_harvest_source_history(source_id, keep_current):
def clear_harvest_source_history(source_id):

context = {
"model": model,
Expand All @@ -216,18 +216,12 @@ def clear_harvest_source_history(source_id, keep_current):
if source_id is not None:
tk.get_action("harvest_source_job_history_clear")(context, {
"id": source_id,
"keep_current": keep_current
})
})
return "Cleared job history of harvest source: {0}".format(source_id)
Copy link
Contributor Author

@bonnland bonnland May 18, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note the language used in the return statement. This command is most useful if it clears the "job history", not the entire source history. Perhaps a change in the command name to "clear-job-history" would be better, as it more clearly states the eventual outcome of the command.

else:
# Purge queues, because we clean all harvest jobs and
# objects in the database.
if not keep_current:
purge_queues()
# If source is not given, apply to all sources
cleared_sources_dicts = tk.get_action(
"harvest_sources_job_history_clear")(context, {
"keep_current": keep_current
})
"harvest_sources_job_history_clear")(context)
return "Cleared job history for all harvest sources: {0} source(s)".format(
len(cleared_sources_dicts))

Expand Down