diff --git a/README.rst b/README.rst
index aa510732d..e0029779a 100644
--- a/README.rst
+++ b/README.rst
@@ -253,6 +253,10 @@ The following operations can be run from the command line as described underneat
           import) without involving the web UI or the queue backends. This is useful
           for testing a harvester without having to fire up gather/fetch_consumer
           processes, as is done in production.
+
+      harvester run_test {source-id/name} force-import=guid1,guid2...
+        - In order to force an import of particular datasets, useful to
+          target a dataset for dev purposes or when forcing imports on other environments.
 
       harvester gather_consumer
         - starts the consumer for the gathering queue
diff --git a/ckanext/harvest/commands/harvester.py b/ckanext/harvest/commands/harvester.py
index 906c60c9a..0fb8b3ac2 100644
--- a/ckanext/harvest/commands/harvester.py
+++ b/ckanext/harvest/commands/harvester.py
@@ -434,7 +434,7 @@ def run_test_harvest(self):
         # Determine the job
         try:
             job_dict = get_action('harvest_job_create')(
-                context, {'source_id': source['id']})
+                context, {'source_id': source['id'], 'run': False})
         except HarvestJobExists:
             running_jobs = get_action('harvest_job_list')(
                 context, {'source_id': source['id'], 'status': 'Running'})
@@ -455,6 +455,9 @@ def run_test_harvest(self):
             job_dict = jobs[0]
         job_obj = HarvestJob.get(job_dict['id'])
 
+        if len(self.args) >= 3 and self.args[2].startswith('force-import='):
+            job_obj.force_import = self.args[2].split('=')[-1]
+
         harvester = queue.get_harvester(source['source_type'])
         assert harvester, \
             'No harvester found for type: {0}'.format(source['source_type'])
diff --git a/ckanext/harvest/logic/dictization.py b/ckanext/harvest/logic/dictization.py
index ba8792cf0..e5b041be9 100644
--- a/ckanext/harvest/logic/dictization.py
+++ b/ckanext/harvest/logic/dictization.py
@@ -121,7 +121,6 @@ def _get_source_status(source, context):
         'job_count': 0,
         'next_harvest': '',
         'last_harvest_request': '',
-        'overall_statistics': {'added': 0, 'errors': 0},
     }
 
     if not job_count:
@@ -142,28 +141,7 @@
         .order_by(HarvestJob.created.desc()).first()
 
     if last_job:
-        # TODO: Should we encode the dates as strings?
         out['last_harvest_request'] = str(last_job.gather_finished)
-
-        # Overall statistics
-        packages = model.Session.query(distinct(HarvestObject.package_id),
-                                       Package.name) \
-            .join(Package).join(HarvestSource) \
-            .filter(HarvestObject.source == source) \
-            .filter(
-                HarvestObject.current == True  # noqa: E711
-            ).filter(Package.state == u'active')
-
-        out['overall_statistics']['added'] = packages.count()
-
-        gather_errors = model.Session.query(HarvestGatherError) \
-            .join(HarvestJob).join(HarvestSource) \
-            .filter(HarvestJob.source == source).count()
-
-        object_errors = model.Session.query(HarvestObjectError) \
-            .join(HarvestObject).join(HarvestJob).join(HarvestSource) \
-            .filter(HarvestJob.source == source).count()
-        out['overall_statistics']['errors'] = gather_errors + object_errors
     else:
         out['last_harvest_request'] = 'Not yet harvested'
diff --git a/ckanext/harvest/queue.py b/ckanext/harvest/queue.py
index d12f06270..33f735c4b 100644
--- a/ckanext/harvest/queue.py
+++ b/ckanext/harvest/queue.py
@@ -236,8 +236,12 @@ def __init__(self, redis, routing_key):
     def consume(self, queue):
         while True:
             key, body = self.redis.blpop(self.routing_key)
-            self.redis.set(self.persistance_key(body),
-                           str(datetime.datetime.now()))
+            try:
+                self.redis.set(self.persistance_key(body), str(datetime.datetime.now()))
+            except Exception as e:
+                log.error("Redis Exception: %s", e)
+                continue
+
             yield (FakeMethod(body), self, body)
 
     def persistance_key(self, message):
diff --git a/ckanext/harvest/templates/snippets/job_details.html b/ckanext/harvest/templates/snippets/job_details.html
index 123a4ecc9..3fed10ccf 100644
--- a/ckanext/harvest/templates/snippets/job_details.html
+++ b/ckanext/harvest/templates/snippets/job_details.html
@@ -25,7 +25,7 @@
       {% endif %}
       {{ _('errors') }}
-      {% for action in ['added', 'updated', 'deleted'] %}
+      {% for action in ['added', 'updated', 'deleted', 'not modified'] %}
        {% if action in stats and stats[action] > 0 %}
          {{ stats[action] }}
diff --git a/ckanext/harvest/templates/source/job/list.html b/ckanext/harvest/templates/source/job/list.html
index f7ef1583f..aeeddda55 100644
--- a/ckanext/harvest/templates/source/job/list.html
+++ b/ckanext/harvest/templates/source/job/list.html
@@ -45,7 +45,7 @@
       {% endif %}
-      {% for action in ['added', 'updated', 'deleted'] %}
+      {% for action in ['added', 'updated', 'deleted', 'not modified'] %}
        •
        {% if action in job.stats and job.stats[action] > 0 %}
@@ -66,4 +66,3 @@
     {% endblock %}
-
diff --git a/ckanext/harvest/tests/lib.py b/ckanext/harvest/tests/lib.py
index e40eb8feb..fe021e819 100644
--- a/ckanext/harvest/tests/lib.py
+++ b/ckanext/harvest/tests/lib.py
@@ -1,8 +1,12 @@
+import logging
+
 from ckanext.harvest.tests.factories import HarvestSourceObj, HarvestJobObj
 import ckanext.harvest.model as harvest_model
 from ckanext.harvest import queue
 from ckan.plugins import toolkit
 
+log = logging.getLogger(__name__)
+
 
 def run_harvest(url, harvester, config=''):
     '''Runs a harvest and returns the results.
@@ -41,6 +45,15 @@ def run_harvest_job(job, harvester):
     for obj_id in obj_ids:
         harvest_object = harvest_model.HarvestObject.get(obj_id)
         guid = harvest_object.guid
+
+        # force reimport of datasets
+        if hasattr(job, 'force_import'):
+            if guid in job.force_import:
+                harvest_object.force_import = True
+            else:
+                log.info('Skipping: %s', guid)
+                continue
+
         results_by_guid[guid] = {'obj_id': obj_id}
 
         queue.fetch_and_import_stages(harvester, harvest_object)
diff --git a/ckanext/harvest/tests/test_queue.py b/ckanext/harvest/tests/test_queue.py
index 435f6235f..ebc09785e 100644
--- a/ckanext/harvest/tests/test_queue.py
+++ b/ckanext/harvest/tests/test_queue.py
@@ -1,3 +1,5 @@
+from mock import patch
+
 from ckantoolkit.tests.helpers import reset_db
 import ckanext.harvest.model as harvest_model
 from ckanext.harvest.model import HarvestObject, HarvestObjectExtra
@@ -298,3 +300,49 @@ def test_redis_queue_purging(self):
             assert_equal(redis.llen(queue.get_fetch_routing_key()), 0)
         finally:
             redis.delete('ckanext-harvest:some-random-key')
+
+
+class TestHarvestCorruptRedis(object):
+    @classmethod
+    def setup_class(cls):
+        reset_db()
+        harvest_model.setup()
+
+    @patch('ckanext.harvest.queue.log.error')
+    def test_redis_corrupt(self, mock_log_error):
+        '''
+        Test that corrupt Redis doesn't stop harvest process and still processes other jobs.
+        '''
+        if config.get('ckan.harvest.mq.type') != 'redis':
+            raise SkipTest()
+        redis = queue.get_connection()
+        try:
+            redis.set('ckanext-harvest:some-random-key-2', 'foobar')
+
+            # make sure queues/exchanges are created first and are empty
+            gather_consumer = queue.get_gather_consumer()
+            fetch_consumer = queue.get_fetch_consumer()
+            gather_consumer.queue_purge(queue=queue.get_gather_queue_name())
+            fetch_consumer.queue_purge(queue=queue.get_fetch_queue_name())
+
+            # Create some fake jobs and objects with no harvest_job_id
+            gather_publisher = queue.get_gather_publisher()
+            gather_publisher.send({'harvest_job_id': str(uuid.uuid4())})
+            fetch_publisher = queue.get_fetch_publisher()
+            fetch_publisher.send({'harvest_object_id': None})
+            h_obj_id = str(uuid.uuid4())
+            fetch_publisher.send({'harvest_object_id': h_obj_id})
+
+            # Create some fake objects
+            next(gather_consumer.consume(queue.get_gather_queue_name()))
+            _, _, body = next(fetch_consumer.consume(queue.get_fetch_queue_name()))
+
+            json_obj = json.loads(body)
+            assert json_obj['harvest_object_id'] == h_obj_id
+
+            assert mock_log_error.call_count == 1
+            args, _ = mock_log_error.call_args_list[0]
+            assert "cannot concatenate 'str' and 'NoneType' objects" in args[1]
+
+        finally:
+            redis.delete('ckanext-harvest:some-random-key-2')
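A note on the new command form documented in the README hunk above: with the paster-style CLI this branch targets, a forced import of specific datasets might be invoked roughly as follows. The source name, the GUIDs and the config path are placeholders, not values from the patch:

    paster --plugin=ckanext-harvest harvester run_test my-source force-import=guid-1,guid-2 --config=/etc/ckan/default/production.ini

Because the command code reads self.args[2] verbatim and splits on '=', the GUID list has to be a single comma-separated token with no spaces.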
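The tests/lib.py hunk gives run_harvest_job() an opt-in filter: when the job object carries a force_import attribute, only GUIDs contained in it are fetched and imported, and everything else is logged as skipped. A minimal sketch of how a test might drive this, assuming a working CKAN test setup; the factory arguments, the GUIDs and the `harvester` variable (the plugin instance the test already exercises) are illustrative placeholders:

    from ckanext.harvest.tests.factories import HarvestSourceObj, HarvestJobObj
    from ckanext.harvest.tests.lib import run_harvest_job

    # Hypothetical source/job; real tests configure these to suit the harvester.
    source = HarvestSourceObj(url='http://localhost/test-harvest', source_type='test')
    job = HarvestJobObj(source=source)

    # Only objects whose GUID appears in this string are imported; the rest are
    # skipped with a "Skipping: <guid>" log message.
    job.force_import = 'dataset-guid-1,dataset-guid-2'

    results = run_harvest_job(job, harvester)  # `harvester` is a placeholder here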
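The queue.py change and the new TestHarvestCorruptRedis test target the same failure: a corrupt fetch message (here, one whose harvest_object_id is None) makes the consumer's key-building string concatenation raise, and before this patch that exception could escape consume() and stop the consumer. A small self-contained sketch of the failure mode; the key format below is illustrative rather than the extension's exact persistance_key() implementation:

    import json
    import logging

    logging.basicConfig(level=logging.INFO)
    log = logging.getLogger(__name__)


    def persistance_key(message):
        # Build a Redis key from the message payload, roughly as the consumer does.
        # A None harvest_object_id makes the concatenation raise a TypeError
        # ("cannot concatenate 'str' and 'NoneType' objects" on Python 2).
        body = json.loads(message)
        return 'ckanext-harvest:fetch:' + body['harvest_object_id']


    for raw in (json.dumps({'harvest_object_id': None}),
                json.dumps({'harvest_object_id': 'abc-123'})):
        try:
            key = persistance_key(raw)
        except Exception as e:
            # Mirrors the patched consume() loop: log the error and move on
            # instead of letting the exception kill the consumer.
            log.error("Redis Exception: %s", e)
            continue
        print(key)  # only the well-formed message gets this far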