Skip to content

Commit

Permalink
[dashboard] fix admin path (apache#5030)
Browse files Browse the repository at this point in the history
access v1/v2 admin api by namespace name pattern
  • Loading branch information
yittg authored and sijie committed Aug 26, 2019
1 parent ff41871 commit 4fa87a4
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 25 deletions.
6 changes: 3 additions & 3 deletions dashboard/django/stats/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
urlpatterns = [
url(r'^property/(?P<property_name>.+)/$', views.property, name='property'),
url(r'^namespace/(?P<namespace_name>.+)/$', views.namespace, name='namespace'),
url(r'^deleteNamespace/(?P<namespace_name>.+)$', views.deleteNamespace, name='deleteNamespace'),
url(r'^deleteNamespace/(?P<namespace_name>.+)$', views.delete_namespace, name='deleteNamespace'),


url(r'^brokers/$', views.brokers, name='brokers'),
Expand All @@ -35,7 +35,7 @@
url(r'^topic/(?P<topic_name>.+)/$', views.topic, name='topic'),

url(r'^clusters/$', views.clusters, name='clusters'),
url(r'^clearSubscription/(?P<topic_name>.+)/(?P<subscription_name>.+)$', views.clearSubscription, name='clearSubscription'),
url(r'^deleteSubscription/(?P<topic_name>.+)/(?P<subscription_name>.+)$', views.deleteSubscription, name='deleteSubscription'),
url(r'^clearSubscription/(?P<topic_name>.+)/(?P<subscription_name>.+)$', views.clear_subscription, name='clearSubscription'),
url(r'^deleteSubscription/(?P<topic_name>.+)/(?P<subscription_name>.+)$', views.delete_subscription, name='deleteSubscription'),
url(r'^messages/(?P<topic_name>.+)/(?P<subscription_name>.+)$', views.messages, name='messages'),
]
102 changes: 80 additions & 22 deletions dashboard/django/stats/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,68 @@
logger = logging.getLogger(__name__)


class AdminPath:
v1 = "/admin"
v2 = "/admin/v2"

@staticmethod
def get(is_v2):
return AdminPath.v2 if is_v2 else AdminPath.v1


class TopicName:
def __init__(self, topic_name):
try:
self.scheme, self.path = topic_name.split("://", 1)
except ValueError:
self.scheme, self.path = topic_name.split("/", 1)
self.namespace_path, self.name = self.path.rsplit("/", 1)
self.namespace = NamespaceName(self.namespace_path)

def is_v2(self):
return self.namespace.is_v2()

def url_name(self):
return "/".join([self.scheme, self.path])

def full_name(self):
return "://".join([self.scheme, self.path])

def short_name(self):
return self.name

def is_global(self):
return self.namespace.is_global()

def admin_path(self):
b = AdminPath.get(self.is_v2())
return settings.SERVICE_URL + b + "/" + self.url_name()


class NamespaceName:

def __init__(self, namespace_name):
self.path = namespace_name.strip("/")
path_parts = self.path.split("/")
if len(path_parts) == 2:
self.tenant, self.namespace = path_parts
self.cluster = None
elif len(path_parts) == 3:
self.tenant, self.cluster, self.namespace = path_parts
else:
raise ValueError("invalid namespace:" + namespace_name)

def is_v2(self):
return self.cluster is None

def is_global(self):
return self.cluster == "global"

def admin_path(self):
b = AdminPath.get(self.is_v2())
return settings.SERVICE_URL + b + "/namespaces/" + self.path


def get_timestamp():
try:
return LatestTimestamp.objects.get(name='latest').timestamp
Expand Down Expand Up @@ -139,18 +201,19 @@ def namespace(request, namespace_name):
})


def deleteNamespace(request, namespace_name):
url = settings.SERVICE_URL + '/admin/v2/namespaces/' + namespace_name
def delete_namespace(request, namespace_name):
url = NamespaceName(namespace_name).admin_path()
response = requests.delete(url)
status = response.status_code
logger.debug("Delete namespace " + namespace_name + " status - " + str(status))
if status == 204:
Namespace.objects.filter(name=namespace_name, timestamp=get_timestamp()).update(deleted=True)
return redirect('property', property_name=namespace_name.split('/', 1)[0])


def topic(request, topic_name):
timestamp = get_timestamp()
topic_name = extract_topic_db_name(topic_name)
topic_name = TopicName(topic_name).full_name()
cluster_name = request.GET.get('cluster')
clusters = []

Expand Down Expand Up @@ -314,12 +377,12 @@ def clusters(request):
})


def clearSubscription(request, topic_name, subscription_name):
url = settings.SERVICE_URL + '/admin/v2/' + topic_name + '/subscription/' + subscription_name + '/skip_all'
def clear_subscription(request, topic_name, subscription_name):
url = "%s/subscription/%s/skip_all" % (TopicName(topic_name).admin_path(), subscription_name)
response = requests.post(url)
if response.status_code == 204:
ts = get_timestamp()
topic_db_name = extract_topic_db_name(topic_name)
topic_db_name = TopicName(topic_name).full_name()
topic = Topic.objects.get(name=topic_db_name, timestamp=ts)
subscription = Subscription.objects.get(name=subscription_name, topic=topic, timestamp=ts)
topic.backlog = topic.backlog - subscription.msgBacklog
Expand All @@ -328,13 +391,14 @@ def clearSubscription(request, topic_name, subscription_name):
subscription.save(update_fields=['msgBacklog'])
return redirect('topic', topic_name=topic_name)

def deleteSubscription(request, topic_name, subscription_name):
url = settings.SERVICE_URL + '/admin/v2/' + topic_name + '/subscription/' + subscription_name

def delete_subscription(request, topic_name, subscription_name):
url = "%s/subscription/%s" % (TopicName(topic_name).admin_path(), subscription_name)
response = requests.delete(url)
status = response.status_code
if status == 204:
ts = get_timestamp()
topic_db_name = extract_topic_db_name(topic_name)
topic_db_name = TopicName(topic_name).full_name()
topic = Topic.objects.get(name=topic_db_name, timestamp=ts)
deleted_subscription = Subscription.objects.get(name=subscription_name, topic=topic, timestamp=ts)
deleted_subscription.deleted = True
Expand All @@ -353,7 +417,8 @@ def deleteSubscription(request, topic_name, subscription_name):


def messages(request, topic_name, subscription_name):
topic_db_name = extract_topic_db_name(topic_name)
topic_name_obj = TopicName(topic_name)
topic_db_name = topic_name_obj.full_name()
timestamp = get_timestamp()
cluster_name = request.GET.get('cluster')

Expand All @@ -371,7 +436,7 @@ def messages(request, topic_name, subscription_name):
message = None
message_position = request.GET.get('message-position')
if message_position and message_position.isnumeric():
message = peek_message(topic_obj, subscription_name, message_position)
message = peek_message(topic_name_obj, subscription_name, message_position)
if not isinstance(message, list):
message = [message]

Expand Down Expand Up @@ -485,19 +550,12 @@ def get_message_from_http_response(response):
response.content)


def peek_message(topic_obj, subscription_name, message_position):
def peek_message(topic_name, subscription_name, message_position):
if not isinstance(topic_name, TopicName):
topic_name = TopicName(topic_name)
peek_url = "%s/subscription/%s/position/%s" % (
topic_path(topic_obj), subscription_name, message_position)
topic_name.admin_path(), subscription_name, message_position)
peek_response = requests.get(peek_url)
if peek_response.status_code != 200:
return {"ERROR": "%s(%d)" % (peek_response.reason, peek_response.status_code)}
return get_message_from_http_response(peek_response)


def topic_path(topic_obj):
admin_base = "/admin/v2/" if topic_obj.is_v2() else "/admin/"
return settings.SERVICE_URL + admin_base + topic_obj.url_name()


def extract_topic_db_name(topic_name):
return 'persistent://' + topic_name.split('persistent/', 1)[1]

0 comments on commit 4fa87a4

Please sign in to comment.