diff --git a/dashboard/Dockerfile b/dashboard/Dockerfile
index 01c84ea625aa3..6af19c4947b41 100644
--- a/dashboard/Dockerfile
+++ b/dashboard/Dockerfile
@@ -25,7 +25,7 @@ RUN apt-get update
RUN apt-get -y install postgresql python sudo nginx supervisor
# Python dependencies
-RUN pip install uwsgi 'Django<2.0' psycopg2 pytz requests
+RUN pip install uwsgi 'Django<2.0' psycopg2 pytz requests hexdump
# Postgres configuration
COPY conf/postgresql.conf /etc/postgresql/9.6/main/
diff --git a/dashboard/django/stats/templates/stats/peek.html b/dashboard/django/stats/templates/stats/peek.html
index ebe4821ad3e10..17d66b0c0076e 100644
--- a/dashboard/django/stats/templates/stats/peek.html
+++ b/dashboard/django/stats/templates/stats/peek.html
@@ -19,4 +19,7 @@
-->
-
{{ message_body }}
\ No newline at end of file
+
diff --git a/dashboard/django/stats/views.py b/dashboard/django/stats/views.py
index b49994be79925..d0a7af4917a54 100644
--- a/dashboard/django/stats/views.py
+++ b/dashboard/django/stats/views.py
@@ -18,12 +18,15 @@
#
import logging
+import struct
+
from django.shortcuts import render, get_object_or_404, redirect
from django.template import loader
from django.urls import reverse
from django.views import generic
from django.db.models import Q, IntegerField
from dashboard import settings
+import hexdump
import requests, json, re
from django.http import HttpResponseRedirect, HttpResponse
@@ -363,13 +366,51 @@ def messages(request, topic_name, subscription_name):
'subtitle' : subscription_name,
})
+
+def message_skip_meta(message_view):
+ if not message_view or len(message_view) < 4:
+ raise ValueError("invalid message")
+ meta_size = struct.unpack(">I", message_view[:4])
+ message_index = 4 + meta_size[0]
+ if len(message_view) < message_index:
+ raise ValueError("invalid message")
+ return message_view[message_index:]
+
+
+def get_message_from_http_response(response):
+ if response.status_code != 200:
+ return "ERROR", "status_code=%d" % response.status_code
+ message_view = memoryview(response.content)
+ if 'X-Pulsar-num-batch-message' in response.headers:
+ batch_size = int(response.headers['X-Pulsar-num-batch-message'])
+ if batch_size == 1:
+ message_view = message_skip_meta(message_view)
+ else:
+ # TODO: can not figure out multi-message batch for now
+ return "Batch(size=%d)" % batch_size, ""
+
+ try:
+ text = str(message_view,
+ encoding=response.encoding or response.apparent_encoding,
+ errors='replace')
+ if not text.isprintable():
+ return "Hex", hexdump.hexdump(message_view, result='return')
+ except (LookupError, TypeError):
+ return "Hex", hexdump.hexdump(message_view, result='return')
+ try:
+ return "JSON", json.dumps(json.loads(text),
+ ensure_ascii=False, indent=4)
+ except json.JSONDecodeError:
+ return "Text", text
+
+
def peek(request, topic_name, subscription_name, message_number):
url = settings.SERVICE_URL + '/admin/v2/' + topic_name + '/subscription/' + subscription_name + '/position/' + message_number
response = requests.get(url)
- message = response.text
- message = message[message.index('{'):]
+ message_type, message = get_message_from_http_response(response)
context = {
- 'message_body' : json.dumps(json.loads(message), indent=4),
+ 'message_type': message_type,
+ 'message_body': message,
}
return render(request, 'stats/peek.html', context)