Skip to content

Commit

Permalink
Move websocket performance client and fix bugs (apache#430)
Browse files Browse the repository at this point in the history
  • Loading branch information
Yuki Shiga authored and merlimat committed May 26, 2017
1 parent 6f08d3b commit b47df82
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 15 deletions.
3 changes: 3 additions & 0 deletions bin/pulsar-perf
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ where command is one of:
monitor-brokers Continuously receive broker data and/or load reports
simulation-client Run a simulation server acting as a Pulsar client
simulation-controller Run a simulation controller to give commands to servers
websocket-producer Run a websocket producer
help This help message
Expand Down Expand Up @@ -150,6 +151,8 @@ elif [ "$COMMAND" == "simulation-client" ]; then
exec $JAVA $OPTS com.yahoo.pulsar.testclient.LoadSimulationClient "$@"
elif [ "$COMMAND" == "simulation-controller" ]; then
exec $JAVA $OPTS com.yahoo.pulsar.testclient.LoadSimulationController "$@"
elif [ "$COMMAND" == "websocket-producer" ]; then
exec $JAVA $OPTS com.yahoo.pulsar.proxy.socket.client.PerformanceClient "$@"
elif [ "$COMMAND" == "help" ]; then
pulsar_help;
else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,17 +62,17 @@ static class Arguments {
@Parameter(names = { "--conf-file" }, description = "Configuration file")
public String confFile;

@Parameter(names = { "-u", "--proxy-url" }, description = "Pulsar Proxy URL", required = true)
@Parameter(names = { "-u", "--proxy-url" }, description = "Pulsar Proxy URL, e.g., \"ws://localhost:8080/\"", required = true)
public String proxyURL;

@Parameter(description = "/persistent/my-property/cluster1/my-ns/my-topic", required = true)
public String destination;
public List<String> destinations;

@Parameter(names = { "-r", "--rate" }, description = "Publish rate msg/s across topics")
public int msgRate = 100;

@Parameter(names = { "-s", "--size" }, description = "Message size in byte")
public int msgSize = 1;
public int msgSize = 1024;

@Parameter(names = { "-t", "--num-topic" }, description = "Number of topics")
public int numTopics = 1;
Expand All @@ -99,7 +99,7 @@ static class Arguments {
public Arguments loadArguments(String[] args) {
Arguments arguments = new Arguments();
jc = new JCommander(arguments);
jc.setProgramName("pulsar-websocket-perf-producer");
jc.setProgramName("pulsar-perf-websocket-producer");

try {
jc.parse(args);
Expand All @@ -114,6 +114,12 @@ public Arguments loadArguments(String[] args) {
System.exit(-1);
}

if (arguments.destinations.size() != 1) {
System.err.println("Only one topic name is allowed");
jc.usage();
System.exit(-1);
}

if (arguments.confFile != null) {
Properties prop = new Properties(System.getProperties());

Expand Down Expand Up @@ -148,9 +154,9 @@ public void runPerformanceTest(long messages, long limit, int numOfTopic, int si
String destination) throws InterruptedException, FileNotFoundException {
ExecutorService executor = Executors.newCachedThreadPool(new DefaultThreadFactory("pulsar-perf-producer-exec"));
HashMap<String, Tuple> producersMap = new HashMap<>();
String produceBaseEndPoint = baseUrl + destination;
String produceBaseEndPoint = baseUrl + "ws/producer" + destination;
for (int i = 0; i < numOfTopic; i++) {
String topic = produceBaseEndPoint + "1" + "/";
String topic = numOfTopic > 1 ? produceBaseEndPoint + String.valueOf(i) : produceBaseEndPoint;
URI produceUri = URI.create(topic);

WebSocketClient produceClient = new WebSocketClient(new SslContextFactory(true));
Expand Down Expand Up @@ -183,7 +189,7 @@ public void runPerformanceTest(long messages, long limit, int numOfTopic, int si
while (true) {
for (String topic : producersMap.keySet()) {
if (messages > 0) {
if (totalSent++ >= messages) {
if (totalSent >= messages) {
log.trace("------------------- DONE -----------------------");
Thread.sleep(10000);
System.exit(0);
Expand All @@ -196,9 +202,9 @@ public void runPerformanceTest(long messages, long limit, int numOfTopic, int si
Thread.sleep(10000);
System.exit(0);
}
producersMap.get(topic).getSocket().sendMsg((String) String.valueOf(totalSent), sizeOfMessage);
producersMap.get(topic).getSocket().sendMsg(String.valueOf(totalSent++), sizeOfMessage);
messagesSent.increment();
bytesSent.add(1000);
bytesSent.add(sizeOfMessage);
}
}

Expand All @@ -214,7 +220,7 @@ public void runPerformanceTest(long messages, long limit, int numOfTopic, int si
Histogram reportHistogram = null;

String statsFileName = "perf-websocket-producer-" + System.currentTimeMillis() + ".hgrm";
log.info("Dumping latency stats to %s \n", statsFileName);
log.info("Dumping latency stats to {} \n", statsFileName);

PrintStream histogramLog = new PrintStream(new FileOutputStream(statsFileName), false);
HistogramLogWriter histogramLogWriter = new HistogramLogWriter(histogramLog);
Expand Down Expand Up @@ -264,7 +270,7 @@ public static void main(String[] args) throws Exception {
PerformanceClient test = new PerformanceClient();
Arguments arguments = test.loadArguments(args);
test.runPerformanceTest(arguments.numMessages, arguments.msgRate, arguments.numTopics, arguments.msgSize,
arguments.proxyURL, arguments.destination);
arguments.proxyURL, arguments.destinations.get(0));
}

private class Tuple {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,9 @@ public void onMessage(String msg) throws JsonParseException {
JsonObject json = new Gson().fromJson(msg, JsonObject.class);
long endTimeNs = System.nanoTime();
long startTime = endTimeNs;
if (startTimeMap.get(json.get(CONTEXT)) != null)
startTime = startTimeMap.get(json.get(CONTEXT));
if (startTimeMap.get(json.get(CONTEXT).getAsString()) != null) {
startTime = startTimeMap.get(json.get(CONTEXT).getAsString());
}
long latencyNs = endTimeNs - startTime;
recorder.recordValue(NANOSECONDS.toMicros(latencyNs));
}
Expand All @@ -92,8 +93,7 @@ public void sendMsg(String context, int sizeOfMessage)
throws IOException, JsonParseException, InterruptedException, ExecutionException {
byte[] payload = new byte[sizeOfMessage];
String message = getEncoder().encodeToString(payload);
String timeStamp = "{\"content\": \"" + message + "\",\"context\": \"" + context
+ "\", \"pulsar-properties\" : {\"test\" :[\"test\"]}}";
String timeStamp = "{\"payload\": \"" + message + "\",\"context\": \"" + context + "\"}";
String sampleMsg = new Gson().fromJson(timeStamp, JsonObject.class).toString();
if (this.session != null && this.session.isOpen() && this.session.getRemote() != null) {
startTimeMap.put(context, System.nanoTime());
Expand Down

0 comments on commit b47df82

Please sign in to comment.