Skip to content

Commit

Permalink
Merge branch '3.0-dev' into temp/20161122135630
Browse files Browse the repository at this point in the history
  • Loading branch information
daviddavis authored Dec 1, 2016
2 parents 09b2a6b + 624635f commit 264178e
Show file tree
Hide file tree
Showing 49 changed files with 289 additions and 253 deletions.
27 changes: 27 additions & 0 deletions app/pulp/app/models/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,31 @@ class ReservedResource(Model):
worker = models.ForeignKey("Worker", on_delete=models.CASCADE, related_name="reservations")


class WorkerManager(models.Manager):

def get_unreserved_worker(self):
"""
Randomly selects an unreserved :class:`~pulp.app.models.Worker`
Return the Worker instance that has no :class:`~pulp.app.models.ReservedResource`
associated with it. If all workers have ReservedResource relationships, a
:class:`pulp.app.models.Worker.DoesNotExist` exception is raised.
This method also provides randomization for worker selection.
:raises Worker.DoesNotExist: If all workers have ReservedResource entries associated with
them.
:returns: A randomly-selected Worker instance that has no ReservedResource
entries associated with it.
:rtype: pulp.app.models.Worker
"""
free_workers_qs = self.annotate(models.Count('reservations')).filter(reservations__count=0)
if free_workers_qs.count() == 0:
raise self.model.DoesNotExist()
return free_workers_qs.order_by('?').first()


class Worker(Model):
"""
Represents a worker
Expand All @@ -44,6 +69,8 @@ class Worker(Model):
name (models.TextField): The name of the worker, in the format "worker_type@hostname"
last_heartbeat (models.DateTimeField): A timestamp of this worker's last heartbeat
"""
objects = WorkerManager()

name = models.TextField(db_index=True, unique=True)
last_heartbeat = models.DateTimeField(auto_now=True)

Expand Down
5 changes: 3 additions & 2 deletions app/pulp/app/response.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ class OperationPostponedResponse(Response):
This response object should be used by views that dispatch asynchronous tasks. The most common
use case is for sync and publish operations. When JSON is requested, the response will look
like the following:
like the following::
[
{
Expand All @@ -25,7 +25,8 @@ class OperationPostponedResponse(Response):
def __init__(self, task_results):
"""
Args:
task_results (list): List of AsyncResult objects used to generate the response.
task_results (list): List of :class:`celery.result.AsyncResult` objects used to
generate the response.
"""
tasks = []
for result in task_results:
Expand Down
1 change: 1 addition & 0 deletions app/pulp/app/serializers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from pulp.app.serializers.fields import (ContentRelatedField, RepositoryRelatedField) # NOQA
from pulp.app.serializers.generic import (ConfigKeyValueRelatedField, # NOQA
NotesKeyValueRelatedField, ScratchpadKeyValueRelatedField) # NOQA
from pulp.app.serializers.catalog import DownloadCatalogSerializer # NOQA
from pulp.app.serializers.consumer import ConsumerSerializer # NOQA
from pulp.app.serializers.content import ContentSerializer, ArtifactSerializer # NOQA
from pulp.app.serializers.progress import ProgressReportSerializer # NOQA
Expand Down
29 changes: 29 additions & 0 deletions app/pulp/app/serializers/catalog.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from gettext import gettext as _

from rest_framework import serializers

from pulp.app import models
from pulp.app.serializers import ModelSerializer, DetailRelatedField


class DownloadCatalogSerializer(ModelSerializer):
url = serializers.CharField(
help_text=_("The URL used to download the related artifact."),
allow_blank=True, read_only=True,
)

artifact = serializers.HyperlinkedRelatedField(
help_text=_("The artifact that is expected to be present at url"),
queryset=models.Artifact.objects.all(),
view_name="artifact-details"
)

importer = DetailRelatedField(
help_text=_("The importer that contains the configuration necessary to access url."),
queryset=models.Importer.objects.all(),
view_name="importer-details"
)

class Meta:
model = models.DownloadCatalog
fields = ModelSerializer.Meta.fields + ("artifact", "importer", "url",)
1 change: 1 addition & 0 deletions app/pulp/app/tasks/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from pulp.app.tasks import importer, publisher # noqa
17 changes: 17 additions & 0 deletions app/pulp/app/tasks/importer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from celery import shared_task

from pulp.app import models
from pulp.tasking.tasks import UserFacingTask


@shared_task(base=UserFacingTask)
def delete(repo_name, importer_name):
"""
Delete an :class:`~pulp.app.models.Importer`
:param repo_name: the name of a repository
:type repo_name: str
:param importer_name: the name of an importer
:type importer_name: str
"""
models.Importer.objects.filter(name=importer_name, repository__name=repo_name).delete()
17 changes: 17 additions & 0 deletions app/pulp/app/tasks/publisher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from celery import shared_task

from pulp.app import models
from pulp.tasking.tasks import UserFacingTask


@shared_task(base=UserFacingTask)
def delete(repo_name, publisher_name):
"""
Delete a :class:`~pulp.app.models.Publisher`
:param repo_name: the name of a repository
:type repo_name: str
:param publisher_name: the name of a publisher
:type publisher_name: str
"""
models.Publisher.objects.filter(name=publisher_name, repository__name=repo_name).delete()
2 changes: 1 addition & 1 deletion app/pulp/app/viewsets/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from pulp.app.serializers import TaskSerializer, WorkerSerializer
from pulp.app.viewsets import NamedModelViewSet
from pulp.app.viewsets.custom_filters import CharInFilter
from pulp.tasking.base import cancel as cancel_task
from pulp.tasking.util import cancel as cancel_task

from rest_framework.decorators import detail_route
from rest_framework.response import Response
Expand Down
2 changes: 1 addition & 1 deletion docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@
# Show class inheritance, and group class members together by type (attr, method, etc)
autodoc_default_flags = ['members', 'undoc-members', 'show-inheritance']
autodoc_member_order = 'groupwise'

autoclass_content = 'both'

# -- Options for HTML output ---------------------------------------------------

Expand Down
1 change: 1 addition & 0 deletions docs/contributing/platform_api/app/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@ pulp.app

fields
models
response
serializers
viewsets
6 changes: 6 additions & 0 deletions docs/contributing/platform_api/app/response.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pulp.app.response
=================

All response objects documented here should be imported directly from the ``pulp.app.response`` namespace.

.. automodule:: pulp.app.response
3 changes: 3 additions & 0 deletions docs/contributing/platform_api/app/viewsets.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ pulp.app.viewsets

All viewsets documented here should be imported directly from the ``pulp.app.viewsets`` namespace.

Viewsets that dispatch tasks that are descendants of :class:`~pulp.tasking.tasks.UserFacingTask`
should return an :class:`~pulp.app.response.OperationPostponedResponse`.

.. automodule:: pulp.app.viewsets

pulp.app.viewsets.base
Expand Down
45 changes: 24 additions & 21 deletions docs/contributing/platform_api/tasking.rst
Original file line number Diff line number Diff line change
@@ -1,14 +1,9 @@

pulp.tasking
===============
============

.. automodule:: pulp.tasking

pulp.tasking.base
-----------------

.. automodule:: pulp.tasking.base

pulp.tasking.celery_app
-----------------------

Expand All @@ -24,27 +19,35 @@ pulp.tasking.constants

.. automodule:: pulp.tasking.constants

pulp.tasking.manage_workers
---------------------------
pulp.tasking.services.manage_workers
------------------------------------

.. automodule:: pulp.tasking.manage_workers
.. automodule:: pulp.tasking.services.manage_workers

pulp.tasking.registry
---------------------
pulp.tasking.services.scheduler
-------------------------------

.. automodule:: pulp.tasking.registry
.. automodule:: pulp.tasking.services.scheduler

pulp.tasking.scheduler
----------------------
pulp.tasking.services.storage
-----------------------------

.. automodule:: pulp.tasking.services.storage

.. automodule:: pulp.tasking.scheduler
pulp.tasking.services.worker_watcher
------------------------------------

pulp.tasking.storage
--------------------
.. automodule:: pulp.tasking.services.worker_watcher

pulp.tasking.tasks
------------------

.. automodule:: pulp.tasking.tasks


pulp.tasking.util
-----------------

.. automodule:: pulp.tasking.storage
.. automodule:: pulp.tasking.util

pulp.tasking.worker_watcher
---------------------------

.. automodule:: pulp.tasking.worker_watcher
2 changes: 1 addition & 1 deletion plugin/pulp/plugin/tasking.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from pulp.app import models
from pulp.exceptions import exception_to_dict
from pulp.tasking import get_current_task_id
from pulp.tasking.util import get_current_task_id


class Task(object):
Expand Down
2 changes: 1 addition & 1 deletion server/pulp/plugins/conduits/mixins.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from pulp.server import exceptions as pulp_exceptions
import pulp.plugins.conduits._common as common_utils
import pulp.server.managers.factory as manager_factory
from pulp.tasking import get_current_task_id
from pulp.tasking.util import get_current_task_id


_logger = logging.getLogger(__name__)
Expand Down
2 changes: 1 addition & 1 deletion server/pulp/server/controllers/content.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from pulp.plugins.util.publish_step import Step
from pulp.server.content.sources.container import ContentContainer
from pulp.server.exceptions import PulpCodedTaskException
from pulp.tasking import UserFacingTask
from pulp.tasking.tasks import UserFacingTask

_logger = getLogger(__name__)

Expand Down
43 changes: 1 addition & 42 deletions server/pulp/server/controllers/distributor.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from pulp.server import exceptions
from pulp.server.db import model
from pulp.server.managers import factory as managers
from pulp.tasking import UserFacingTask
from pulp.tasking.tasks import UserFacingTask


_logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -110,47 +110,6 @@ def queue_delete(distributor):
return async_result


@celery.task(base=UserFacingTask, name='pulp.server.tasks.repository.distributor_delete')
def delete(repo_id, dist_id):
"""
Removes a distributor from a repository and unbinds any bound consumers.
:param distributor: distributor to be deleted
:type distributor: pulp.server.db.model.Distributor
"""

distributor = model.Distributor.objects.get_or_404(repo_id=repo_id, distributor_id=dist_id)
managers.repo_publish_schedule_manager().delete_by_distributor_id(repo_id, dist_id)

# Call the distributor's cleanup method
dist_instance, plugin_config = plugin_api.get_distributor_by_id(distributor.distributor_type_id)

call_config = PluginCallConfiguration(plugin_config, distributor.config)
repo = model.Repository.objects.get_repo_or_missing_resource(repo_id)
dist_instance.distributor_removed(repo.to_transfer_repo(), call_config)
distributor.delete()

unbind_errors = []
additional_tasks = []
options = {}

bind_manager = managers.consumer_bind_manager()
for bind in bind_manager.find_by_distributor(repo_id, dist_id):
try:
report = bind_manager.unbind(bind['consumer_id'], bind['repo_id'],
bind['distributor_id'], options)
if report:
additional_tasks.extend(report.spawned_tasks)
except Exception, e:
unbind_errors.append(e)

if unbind_errors:
bind_error = exceptions.PulpCodedException(PLP0003, repo_id=repo_id,
distributor_id=dist_id)
bind_error.child_exceptions = unbind_errors
raise bind_error


def queue_update(distributor, config, delta):
"""
Dispatch a task to update a distributor.
Expand Down
25 changes: 1 addition & 24 deletions server/pulp/server/controllers/importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from pulp.server import exceptions
from pulp.server.db import model
from pulp.server.managers import factory as manager_factory
from pulp.tasking import UserFacingTask
from pulp.tasking.tasks import UserFacingTask


_logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -170,29 +170,6 @@ def validate_importer_config(repo_obj, importer_type_id, config):
raise exceptions.PulpDataException(message)


@celery.task(base=UserFacingTask, name='pulp.server.managers.repo.importer.remove_importer')
def remove_importer(repo_id):
"""
Removes an importer from a repository.
:param repo_id: identifies the repo
:type repo_id: str
"""
repo_obj = model.Repository.objects.get_repo_or_missing_resource(repo_id)
repo_importer = model.Importer.objects.get_or_404(repo_id=repo_id)

# remove schedules
sync_manager = manager_factory.repo_sync_schedule_manager()
sync_manager.delete_by_importer_id(repo_id, repo_importer.importer_type_id)

# Call the importer's cleanup method
importer_instance, plugin_config = plugin_api.get_importer_by_id(repo_importer.importer_type_id)

call_config = PluginCallConfiguration(plugin_config, repo_importer.config)
transfer_repo = repo_obj.to_transfer_repo()
importer_instance.importer_removed(transfer_repo, call_config)
repo_importer.delete()


def queue_remove_importer(repo_id, importer_type_id):
"""
Expand Down
3 changes: 2 additions & 1 deletion server/pulp/server/controllers/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@
from pulp.server.lazy import URL, Key
from pulp.server.managers import factory as manager_factory
from pulp.server.util import InvalidChecksumType
from pulp.tasking import PulpTask, UserFacingTask, get_current_task_id
from pulp.tasking.tasks import PulpTask, UserFacingTask
from pulp.tasking.util import get_current_task_id
from pulp.tasking.storage import get_working_directory

_logger = logging.getLogger(__name__)
Expand Down
2 changes: 1 addition & 1 deletion server/pulp/server/db/reaper.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from pulp.server import config as pulp_config
from pulp.server.db import model
from pulp.server.db.model import celery_result, consumer, repo_group, repository
from pulp.tasking import PulpTask, UserFacingTask
from pulp.tasking.tasks import PulpTask, UserFacingTask


# Add collections to reap here. The keys in this datastructure are the Model classes that represent
Expand Down
2 changes: 1 addition & 1 deletion server/pulp/server/maintenance/monthly.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from pulp.common.tags import action_tag
from pulp.server.managers.consumer.applicability import RepoProfileApplicabilityManager

from pulp.tasking import PulpTask, UserFacingTask
from pulp.tasking.tasks import PulpTask, UserFacingTask


@task(base=PulpTask)
Expand Down
Loading

0 comments on commit 264178e

Please sign in to comment.