Skip to content

Commit

Permalink
Merge pull request pulp#2920 from dkliban/2496-worker-queryset
Browse files Browse the repository at this point in the history
Problem: Stale worker documents present in the db
  • Loading branch information
dkliban authored Jan 24, 2017
2 parents 2f2635b + d92a1de commit 6782dfe
Show file tree
Hide file tree
Showing 7 changed files with 36 additions and 9 deletions.
2 changes: 1 addition & 1 deletion server/pulp/server/async/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ def _get_unreserved_worker():
"""

# Build a mapping of queue names to Worker objects
workers_dict = dict((worker['name'], worker) for worker in Worker.objects())
workers_dict = dict((worker['name'], worker) for worker in Worker.objects.get_online())
worker_names = workers_dict.keys()
reserved_names = [r['worker_name'] for r in ReservedResource.objects.all()]

Expand Down
5 changes: 3 additions & 2 deletions server/pulp/server/db/model/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@
from pulp.server.db.fields import ISO8601StringField, UTCDateTimeField
from pulp.server.db.model.reaper_base import ReaperMixin
from pulp.server.db.model import base
from pulp.server.db.querysets import CriteriaQuerySet, RepoQuerySet, RepositoryContentUnitQuerySet
from pulp.server.db.querysets import (CriteriaQuerySet, RepoQuerySet, RepositoryContentUnitQuerySet,
WorkerQuerySet)
from pulp.server.managers import factory
from pulp.server.util import Singleton
from pulp.server.webservices.views import serializers
Expand Down Expand Up @@ -411,7 +412,7 @@ class Worker(AutoRetryDocument):
meta = {'collection': 'workers',
'indexes': [], # this is a small collection that does not need an index
'allow_inheritance': False,
'queryset_class': CriteriaQuerySet}
'queryset_class': WorkerQuerySet}

@property
def queue_name(self):
Expand Down
24 changes: 24 additions & 0 deletions server/pulp/server/db/querysets.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
from datetime import datetime, timedelta
from gettext import gettext as _
import operator

from mongoengine import Q
from mongoengine.queryset import DoesNotExist, QuerySetNoCache
from pymongo import ASCENDING

from pulp.common import constants
from pulp.common.dateutils import ensure_tz
from pulp.server import exceptions as pulp_exceptions


Expand Down Expand Up @@ -116,6 +119,27 @@ def get_or_404(self, **kwargs):
raise pulp_exceptions.MissingResource(**kwargs)


class WorkerQuerySet(CriteriaQuerySet):
"""
Custom queryset for workers
"""

def get_online(self):
"""
Returns a queryset with a subset of Worker documents.
The queryset is filtered to remove any Worker document that has not been updated in the
last 25 seconds.
:return: mongoengine queryset object
:rtype: mongoengine.queryset.QuerySet
"""
query_set = self
now = ensure_tz(datetime.utcnow())
oldest_heartbeat_time = now - timedelta(seconds=constants.PULP_PROCESS_TIMEOUT_INTERVAL)
return query_set.filter(last_heartbeat__gte=oldest_heartbeat_time)


class RepoQuerySet(CriteriaQuerySet):
"""
Custom queryset for repositories.
Expand Down
2 changes: 1 addition & 1 deletion server/pulp/server/managers/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def get_workers():
:returns: list of workers with their heartbeats
:rtype: list
"""
return Worker.objects()
return Worker.objects.get_online()


def get_mongo_conn_status():
Expand Down
3 changes: 2 additions & 1 deletion server/test/unit/server/async/test_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -1000,7 +1000,8 @@ def test_reserved_resources_queried_correctly(self, mock_reserved_resource):
@mock.patch('pulp.server.async.tasks.ReservedResource')
def test_worker_returned_when_one_worker_is_not_reserved(self, mock_reserved_resource,
mock_worker_objects):
mock_worker_objects.return_value = [{'name': 'a'}, {'name': 'b'}]
get_online = mock_worker_objects.get_online
get_online.return_value = [{'name': 'a'}, {'name': 'b'}]
mock_reserved_resource.objects.all.return_value = [{'worker_name': 'a'}]
result = tasks._get_unreserved_worker()
self.assertEqual(result, {'name': 'b'})
Expand Down
4 changes: 2 additions & 2 deletions server/test/unit/server/db/test_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from pulp.server.exceptions import PulpCodedException
from pulp.server.db import model
from pulp.server.db.fields import ISO8601StringField
from pulp.server.db.querysets import CriteriaQuerySet
from pulp.server.db.querysets import CriteriaQuerySet, WorkerQuerySet
from pulp.server.webservices.views import serializers


Expand Down Expand Up @@ -707,7 +707,7 @@ def test_meta_inheritance(self):
self.assertEqual(model.Worker._meta['allow_inheritance'], False)

def test_meta_queryset(self):
self.assertEqual(model.Worker._meta['queryset_class'], CriteriaQuerySet)
self.assertEqual(model.Worker._meta['queryset_class'], WorkerQuerySet)
self.assertTrue(issubclass(model.Worker.objects.__class__, QuerySetNoCache))


Expand Down
5 changes: 3 additions & 2 deletions server/test/unit/server/managers/test_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ def test_get_version(self, mock_get_distribution):

@patch('pulp.server.db.model.Worker.objects')
def test_get_workers(self, mock_worker_objects):
mock_worker_objects.return_value = [{"last_heartbeat": "123456", "name": "some_worker_1"},
{"last_heartbeat": "123456", "name": "some_worker_2"}]
get_online = mock_worker_objects.get_online
get_online.return_value = [{"last_heartbeat": "123456", "name": "some_worker_1"},
{"last_heartbeat": "123456", "name": "some_worker_2"}]

self.assertEquals(status_manager.get_workers(), [{"last_heartbeat": "123456",
"name": "some_worker_1"},
Expand Down

0 comments on commit 6782dfe

Please sign in to comment.