Skip to content

Commit

Permalink
Decouple RequestEventsJsonStream from servlet
Browse files Browse the repository at this point in the history
  • Loading branch information
Matt Jacobs committed Jan 26, 2016
1 parent 523d997 commit 1a30360
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,9 @@
*/
package com.netflix.hystrix.contrib.requests.stream;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.netflix.config.DynamicIntProperty;
import com.netflix.config.DynamicPropertyFactory;
import com.netflix.hystrix.HystrixEventType;
import com.netflix.hystrix.HystrixInvokableInfo;
import com.netflix.hystrix.metric.HystrixRequestEvents;
import com.netflix.hystrix.metric.HystrixRequestEventsStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Subscriber;
Expand All @@ -35,9 +30,7 @@
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -62,7 +55,11 @@ public class HystrixRequestEventsSseServlet extends HttpServlet {
DynamicPropertyFactory.getInstance().getIntProperty("hystrix.requests.stream.maxConcurrentConnections", 5);

private final LinkedBlockingQueue<HystrixRequestEvents> requestQueue = new LinkedBlockingQueue<HystrixRequestEvents>(DEFAULT_QUEUE_DEPTH);
private final JsonFactory jsonFactory = new JsonFactory();
private final RequestEventsJsonStream requestEventsJsonStream;

public HystrixRequestEventsSseServlet() {
requestEventsJsonStream = new RequestEventsJsonStream();
}

/**
* Handle incoming GETs
Expand Down Expand Up @@ -113,59 +110,7 @@ public void destroy() {
super.destroy();
}

private String convertToString(Collection<HystrixRequestEvents> requests) throws IOException {
StringWriter jsonString = new StringWriter();
JsonGenerator json = jsonFactory.createGenerator(jsonString);

json.writeStartArray();
for (HystrixRequestEvents request : requests) {
convertRequestToJson(json, request);
}
json.writeEndArray();
json.close();
return jsonString.getBuffer().toString();
}

private void convertRequestToJson(JsonGenerator json, HystrixRequestEvents request) throws IOException {
json.writeStartObject();
json.writeStringField("request", request.getRequestContext().toString());
json.writeObjectFieldStart("commands");
for (HystrixInvokableInfo<?> execution: request.getExecutions()) {
convertExecutionToJson(json, execution);
}
json.writeEndObject();
json.writeEndObject();
}

private void convertExecutionToJson(JsonGenerator json, HystrixInvokableInfo<?> execution) throws IOException {
json.writeObjectFieldStart(execution.getCommandKey().name());
json.writeNumberField("latency", execution.getExecutionTimeInMilliseconds());
json.writeArrayFieldStart("events");
for (HystrixEventType eventType: execution.getExecutionEvents()) {
switch (eventType) {
case EMIT:
json.writeStartObject();
json.writeNumberField(eventType.name(), execution.getNumberEmissions());
json.writeEndObject();
break;
case FALLBACK_EMIT:
json.writeStartObject();
json.writeNumberField(eventType.name(), execution.getNumberFallbackEmissions());
json.writeEndObject();
break;
case COLLAPSED:
json.writeStartObject();
json.writeNumberField(eventType.name(), execution.getNumberCollapsed());
json.writeEndObject();
break;
default:
json.writeString(eventType.name());
break;
}
}
json.writeEndArray();
json.writeEndObject();
}

/**
* - maintain an open connection with the client
Expand Down Expand Up @@ -199,8 +144,8 @@ private void handleRequest(HttpServletRequest request, final HttpServletResponse

//since the sample stream is based on Observable.interval, events will get published on an RxComputation thread
//since writing to the servlet response is blocking, use the Rx IO thread for the write that occurs in the onNext
requestsSubscription = HystrixRequestEventsStream.getInstance()
.observe()
requestsSubscription = requestEventsJsonStream
.getStream()
.observeOn(Schedulers.io())
.subscribe(new Subscriber<HystrixRequestEvents>() {
@Override
Expand Down Expand Up @@ -238,12 +183,7 @@ public void onNext(HystrixRequestEvents requestEvents) {
} else {
List<HystrixRequestEvents> l = new ArrayList<HystrixRequestEvents>();
requestQueue.drainTo(l);
String requestEventsAsStr = convertToString(l);
//try {
//} catch (IOException ioe) {
// //exception while converting String to JSON
// logger.error("Error converting configuration to JSON ", ioe);
//}
String requestEventsAsStr = RequestEventsJsonStream.convertToJson(l);
if (requestEventsAsStr != null) {
try {
writer.print("data: " + requestEventsAsStr + "\n\n");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/**
* Copyright 2016 Netflix, Inc.
* <p/>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.hystrix.contrib.requests.stream;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.netflix.hystrix.HystrixEventType;
import com.netflix.hystrix.HystrixInvokableInfo;
import com.netflix.hystrix.metric.HystrixRequestEvents;
import com.netflix.hystrix.metric.HystrixRequestEventsStream;
import rx.Observable;

import java.io.IOException;
import java.io.StringWriter;
import java.util.Collection;

public class RequestEventsJsonStream {
private static final JsonFactory jsonFactory = new JsonFactory();

public Observable<HystrixRequestEvents> getStream() {
return HystrixRequestEventsStream.getInstance()
.observe();
}

public static String convertToJson(Collection<HystrixRequestEvents> requests) throws IOException {
StringWriter jsonString = new StringWriter();
JsonGenerator json = jsonFactory.createGenerator(jsonString);

json.writeStartArray();
for (HystrixRequestEvents request : requests) {
convertRequestToJson(json, request);
}
json.writeEndArray();
json.close();
return jsonString.getBuffer().toString();
}

private static void convertRequestToJson(JsonGenerator json, HystrixRequestEvents request) throws IOException {
json.writeStartObject();
json.writeStringField("request", request.getRequestContext().toString());
json.writeObjectFieldStart("commands");
for (HystrixInvokableInfo<?> execution: request.getExecutions()) {
convertExecutionToJson(json, execution);
}
json.writeEndObject();
json.writeEndObject();
}

private static void convertExecutionToJson(JsonGenerator json, HystrixInvokableInfo<?> execution) throws IOException {
json.writeObjectFieldStart(execution.getCommandKey().name());
json.writeNumberField("latency", execution.getExecutionTimeInMilliseconds());
json.writeArrayFieldStart("events");
for (HystrixEventType eventType: execution.getExecutionEvents()) {
switch (eventType) {
case EMIT:
json.writeStartObject();
json.writeNumberField(eventType.name(), execution.getNumberEmissions());
json.writeEndObject();
break;
case FALLBACK_EMIT:
json.writeStartObject();
json.writeNumberField(eventType.name(), execution.getNumberFallbackEmissions());
json.writeEndObject();
break;
case COLLAPSED:
json.writeStartObject();
json.writeNumberField(eventType.name(), execution.getNumberCollapsed());
json.writeEndObject();
break;
default:
json.writeString(eventType.name());
break;
}
}
json.writeEndArray();
json.writeEndObject();
}
}

0 comments on commit 1a30360

Please sign in to comment.