Merge branch 'alphagov-dgu-fixes'
amercader committed Jan 22, 2020
2 parents 9a07b26 + a5abb6d commit 713a0fb
Showing 8 changed files with 77 additions and 28 deletions.
4 changes: 4 additions & 0 deletions README.rst
@@ -253,6 +253,10 @@ The following operations can be run from the command line as described underneath
import) without involving the web UI or the queue backends. This is
useful for testing a harvester without having to fire up
gather/fetch_consumer processes, as is done in production.
harvester run_test {source-id/name} force-import=guid1,guid2...
- forces an import of the specified datasets only. This is useful for
targeting a dataset during development or when forcing imports on other environments.

harvester gather_consumer
- starts the consumer for the gathering queue
5 changes: 4 additions & 1 deletion ckanext/harvest/commands/harvester.py
@@ -434,7 +434,7 @@ def run_test_harvest(self):
# Determine the job
try:
job_dict = get_action('harvest_job_create')(
context, {'source_id': source['id']})
context, {'source_id': source['id'], 'run': False})
except HarvestJobExists:
running_jobs = get_action('harvest_job_list')(
context, {'source_id': source['id'], 'status': 'Running'})
@@ -455,6 +455,9 @@ def run_test_harvest(self):
job_dict = jobs[0]
job_obj = HarvestJob.get(job_dict['id'])

if len(self.args) >= 3 and self.args[2].startswith('force-import='):
job_obj.force_import = self.args[2].split('=')[-1]

harvester = queue.get_harvester(source['source_type'])
assert harvester, \
'No harvester found for type: {0}'.format(source['source_type'])
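
A minimal sketch (not part of this commit) of how the new force-import flag travels from the CLI into the job: the value after force-import= is stored on job_obj.force_import as a raw comma-separated string, and downstream code checks membership with a substring test. The helper name below is hypothetical.

# Illustrative sketch only; parse_force_import is a hypothetical helper, not
# part of the extension. CLI form from the README above:
#   harvester run_test {source-id/name} force-import=guid1,guid2...
def parse_force_import(arg):
    """Return the comma-separated GUID string after 'force-import=', or None."""
    if arg.startswith('force-import='):
        return arg.split('=')[-1]
    return None

force_import = parse_force_import('force-import=guid1,guid2')
# The command stores this raw string on job_obj.force_import; downstream code
# uses a substring check ("guid in job.force_import"), not an exact match
# against a parsed list.
assert force_import == 'guid1,guid2'
assert 'guid1' in force_import
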
22 changes: 0 additions & 22 deletions ckanext/harvest/logic/dictization.py
@@ -121,7 +121,6 @@ def _get_source_status(source, context):
'job_count': 0,
'next_harvest': '',
'last_harvest_request': '',
'overall_statistics': {'added': 0, 'errors': 0},
}

if not job_count:
@@ -142,28 +141,7 @@ def _get_source_status(source, context):
.order_by(HarvestJob.created.desc()).first()

if last_job:
# TODO: Should we encode the dates as strings?
out['last_harvest_request'] = str(last_job.gather_finished)

# Overall statistics
packages = model.Session.query(distinct(HarvestObject.package_id),
Package.name) \
.join(Package).join(HarvestSource) \
.filter(HarvestObject.source == source) \
.filter(
HarvestObject.current == True # noqa: E711
).filter(Package.state == u'active')

out['overall_statistics']['added'] = packages.count()

gather_errors = model.Session.query(HarvestGatherError) \
.join(HarvestJob).join(HarvestSource) \
.filter(HarvestJob.source == source).count()

object_errors = model.Session.query(HarvestObjectError) \
.join(HarvestObject).join(HarvestJob).join(HarvestSource) \
.filter(HarvestJob.source == source).count()
out['overall_statistics']['errors'] = gather_errors + object_errors
else:
out['last_harvest_request'] = 'Not yet harvested'

8 changes: 6 additions & 2 deletions ckanext/harvest/queue.py
@@ -236,8 +236,12 @@ def __init__(self, redis, routing_key):
def consume(self, queue):
while True:
key, body = self.redis.blpop(self.routing_key)
self.redis.set(self.persistance_key(body),
str(datetime.datetime.now()))
try:
self.redis.set(self.persistance_key(body), str(datetime.datetime.now()))
except Exception as e:
log.error("Redis Exception: %s", e)
continue

yield (FakeMethod(body), self, body)

def persistance_key(self, message):
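
Why the new try/except is needed: a corrupt message body makes persistance_key() raise before the message is yielded, which previously killed the whole consume loop. The real body of persistance_key() is not shown in this diff; the standalone sketch below assumes it joins the routing key with the matching field of the JSON message, which is consistent with the TypeError asserted in the new test further down.

import json

routing_key = 'harvest_object_id'

def persistance_key(message):
    # hypothetical reconstruction for illustration only
    decoded = json.loads(message)
    return routing_key + ':' + decoded[routing_key]

try:
    persistance_key('{"harvest_object_id": null}')  # corrupt message body
except TypeError as e:
    # with the change above, the consumer logs this and continues with the
    # next message instead of crashing
    print('Redis Exception: %s' % e)
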
2 changes: 1 addition & 1 deletion ckanext/harvest/templates/snippets/job_details.html
@@ -25,7 +25,7 @@
{% endif %}
{{ _('errors') }}
</span>
{% for action in ['added', 'updated', 'deleted'] %}
{% for action in ['added', 'updated', 'deleted', 'not modified'] %}
<span class="label" data-diff="{{ action }}">
{% if action in stats and stats[action] > 0 %}
{{ stats[action] }}
3 changes: 1 addition & 2 deletions ckanext/harvest/templates/source/job/list.html
@@ -45,7 +45,7 @@ <h3 class="dataset-heading">
</span>
</li>
{% endif %}
{% for action in ['added', 'updated', 'deleted'] %}
{% for action in ['added', 'updated', 'deleted', 'not modified'] %}
<li>
<span class="label" data-diff="{{ action }}" title="{{ _(action) }}">
{% if action in job.stats and job.stats[action] > 0 %}
@@ -66,4 +66,3 @@ <h3 class="dataset-heading">

</div>
{% endblock %}

13 changes: 13 additions & 0 deletions ckanext/harvest/tests/lib.py
@@ -1,8 +1,12 @@
import logging

from ckanext.harvest.tests.factories import HarvestSourceObj, HarvestJobObj
import ckanext.harvest.model as harvest_model
from ckanext.harvest import queue
from ckan.plugins import toolkit

log = logging.getLogger(__name__)


def run_harvest(url, harvester, config=''):
'''Runs a harvest and returns the results.
@@ -41,6 +45,15 @@ def run_harvest_job(job, harvester):
for obj_id in obj_ids:
harvest_object = harvest_model.HarvestObject.get(obj_id)
guid = harvest_object.guid

# force reimport of datasets
if hasattr(job, 'force_import'):
if guid in job.force_import:
harvest_object.force_import = True
else:
log.info('Skipping: %s', guid)
continue

results_by_guid[guid] = {'obj_id': obj_id}

queue.fetch_and_import_stages(harvester, harvest_object)
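
A minimal standalone sketch (not from this commit) of the skip logic added to run_harvest_job(), assuming job.force_import holds the comma-separated GUID string set by the CLI command; FakeJob stands in for a real HarvestJob.

import logging

logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)


class FakeJob(object):
    # illustrative stand-in for a HarvestJob marked by the CLI
    force_import = 'guid-1,guid-2'


job = FakeJob()
for guid in ('guid-1', 'guid-3'):
    if hasattr(job, 'force_import') and guid not in job.force_import:
        log.info('Skipping: %s', guid)       # guid-3 is skipped
        continue
    log.info('Force importing: %s', guid)    # guid-1 gets re-imported
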
48 changes: 48 additions & 0 deletions ckanext/harvest/tests/test_queue.py
@@ -1,3 +1,5 @@
from mock import patch

from ckantoolkit.tests.helpers import reset_db
import ckanext.harvest.model as harvest_model
from ckanext.harvest.model import HarvestObject, HarvestObjectExtra
@@ -298,3 +300,49 @@ def test_redis_queue_purging(self):
assert_equal(redis.llen(queue.get_fetch_routing_key()), 0)
finally:
redis.delete('ckanext-harvest:some-random-key')


class TestHarvestCorruptRedis(object):
@classmethod
def setup_class(cls):
reset_db()
harvest_model.setup()

@patch('ckanext.harvest.queue.log.error')
def test_redis_corrupt(self, mock_log_error):
'''
Test that a corrupt Redis message does not stop the harvest process and that the remaining jobs are still processed.
'''
if config.get('ckan.harvest.mq.type') != 'redis':
raise SkipTest()
redis = queue.get_connection()
try:
redis.set('ckanext-harvest:some-random-key-2', 'foobar')

# make sure queues/exchanges are created first and are empty
gather_consumer = queue.get_gather_consumer()
fetch_consumer = queue.get_fetch_consumer()
gather_consumer.queue_purge(queue=queue.get_gather_queue_name())
fetch_consumer.queue_purge(queue=queue.get_fetch_queue_name())

# Send a fake gather job and two fetch messages, one without a harvest_object_id
gather_publisher = queue.get_gather_publisher()
gather_publisher.send({'harvest_job_id': str(uuid.uuid4())})
fetch_publisher = queue.get_fetch_publisher()
fetch_publisher.send({'harvest_object_id': None})
h_obj_id = str(uuid.uuid4())
fetch_publisher.send({'harvest_object_id': h_obj_id})

# Consume the queues; the corrupt fetch message is logged and skipped
next(gather_consumer.consume(queue.get_gather_queue_name()))
_, _, body = next(fetch_consumer.consume(queue.get_fetch_queue_name()))

json_obj = json.loads(body)
assert json_obj['harvest_object_id'] == h_obj_id

assert mock_log_error.call_count == 1
args, _ = mock_log_error.call_args_list[0]
assert "cannot concatenate 'str' and 'NoneType' objects" in args[1]

finally:
redis.delete('ckanext-harvest:some-random-key-2')
