Skip to content

Commit

Permalink
Moved non-core data streams into hystrix-data-streams
Browse files Browse the repository at this point in the history
  • Loading branch information
Matt Jacobs committed Jun 22, 2016
1 parent e712bb7 commit 151a4a0
Show file tree
Hide file tree
Showing 32 changed files with 332 additions and 282 deletions.
2 changes: 1 addition & 1 deletion hystrix-contrib/hystrix-metrics-event-stream/build.gradle
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
dependencies {
compile project(':hystrix-core')
compile 'com.fasterxml.jackson.core:jackson-core:2.5.2'
compile project(':hystrix-data-stream')
provided 'javax.servlet:servlet-api:2.5'
testCompile 'junit:junit-dep:4.10'
testCompile 'org.mockito:mockito-all:1.9.5'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,10 @@
* the code below may reference a HystrixEventType that does not exist in hystrix-core. If this happens,
* a j.l.NoSuchFieldError occurs. Since this data is not being generated by hystrix-core, it's safe to count it as 0
* and we should log an error to get users to update their dependency set.
*
* @deprecated Prefer {@link com.netflix.hystrix.metric.consumer.HystrixDashboardStream}
*/
@Deprecated //since 1.5.4
public class HystrixMetricsPoller {

static final Logger logger = LoggerFactory.getLogger(HystrixMetricsPoller.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,14 @@

import com.netflix.config.DynamicIntProperty;
import com.netflix.config.DynamicPropertyFactory;
import com.netflix.hystrix.contrib.sample.stream.HystrixSampleSseServlet;
import com.netflix.hystrix.metric.consumer.HystrixDashboardStream;
import com.netflix.hystrix.metric.serial.SerialHystrixDashboardData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

/**
Expand All @@ -51,186 +48,47 @@
* </servlet-mapping>
* } </pre>
*/
public class HystrixMetricsStreamServlet extends HttpServlet {
public class HystrixMetricsStreamServlet extends HystrixSampleSseServlet<HystrixDashboardStream.DashboardData> {

private static final long serialVersionUID = -7548505095303313237L;

private static final Logger logger = LoggerFactory.getLogger(HystrixMetricsStreamServlet.class);

/* used to track number of connections and throttle */
private static AtomicInteger concurrentConnections = new AtomicInteger(0);
private static DynamicIntProperty maxConcurrentConnections = DynamicPropertyFactory.getInstance().getIntProperty("hystrix.stream.maxConcurrentConnections", 5);
private static DynamicIntProperty defaultMetricListenerQueueSize = DynamicPropertyFactory.getInstance().getIntProperty("hystrix.stream.defaultMetricListenerQueueSize", 1000);
private static DynamicIntProperty maxConcurrentConnections =
DynamicPropertyFactory.getInstance().getIntProperty("hystrix.config.stream.maxConcurrentConnections", 5);

private static volatile boolean isDestroyed = false;

/**
* WebSphere won't shutdown a servlet until after a 60 second timeout if there is an instance of the servlet executing
* a request. Add this method to enable a hook to notify Hystrix to shutdown. You must invoke this method at
* shutdown, perhaps from some other serverlet's destroy() method.
*/
public static void shutdown() {
isDestroyed = true;
public HystrixMetricsStreamServlet() {
super(HystrixDashboardStream.getInstance().observe());
}

@Override
public void init() throws ServletException {
isDestroyed = false;

/* package-private */ HystrixMetricsStreamServlet(Observable<HystrixDashboardStream.DashboardData> sampleStream, int pausePollerThreadDelayInMs) {
super(sampleStream, pausePollerThreadDelayInMs);
}

/**
* Handle incoming GETs
*/

@Override
protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
if (isDestroyed) {
response.sendError(503, "Service has been shut down.");
} else {
handleRequest(request, response);
}
protected int getMaxNumberConcurrentConnectionsAllowed() {
return maxConcurrentConnections.get();
}

/**
* Handle servlet being undeployed by gracefully releasing connections so poller threads stop.
*/

@Override
public void destroy() {
/* set marker so the loops can break out */
isDestroyed = true;
super.destroy();
protected int getNumberCurrentConnections() {
return concurrentConnections.get();
}

/**
* - maintain an open connection with the client
* - on initial connection send latest data of each requested event type
* - subsequently send all changes for each requested event type
*
* @param request
* @param response
* @throws javax.servlet.ServletException
* @throws java.io.IOException
*/
private void handleRequest(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
/* ensure we aren't allowing more connections than we want */
int numberConnections = concurrentConnections.incrementAndGet();
HystrixMetricsPoller poller = null;
try {
if (numberConnections > maxConcurrentConnections.get()) {
response.sendError(503, "MaxConcurrentConnections reached: " + maxConcurrentConnections.get());
} else {

int delay = 500;
try {
String d = request.getParameter("delay");
if (d != null) {
delay = Math.max(Integer.parseInt(d), 1);
}
} catch (Exception e) {
// ignore if it's not a number
}

/* initialize response */
response.setHeader("Content-Type", "text/event-stream;charset=UTF-8");
response.setHeader("Cache-Control", "no-cache, no-store, max-age=0, must-revalidate");
response.setHeader("Pragma", "no-cache");

int queueSize = defaultMetricListenerQueueSize.get();

MetricJsonListener jsonListener = new MetricJsonListener(queueSize);
poller = new HystrixMetricsPoller(jsonListener, delay);
// start polling and it will write directly to the output stream
poller.start();
logger.debug("Starting poller");

// we will use a "single-writer" approach where the Servlet thread does all the writing
// by fetching JSON messages from the MetricJsonListener to write them to the output
try {
while (poller.isRunning() && !isDestroyed) {
List<String> jsonMessages = jsonListener.getJsonMetrics();
if (jsonMessages.isEmpty()) {
// https://github.com/Netflix/Hystrix/issues/85 hystrix.stream holds connection open if no metrics
// we send a ping to test the connection so that we'll get an IOException if the client has disconnected
response.getWriter().println("ping: \n");
} else {
for (String json : jsonMessages) {
response.getWriter().println("data: " + json + "\n");
}
}

/* shortcut breaking out of loop if we have been destroyed */
if(isDestroyed) {
break;
}

// after outputting all the messages we will flush the stream
response.flushBuffer();

// explicitly check for client disconnect - PrintWriter does not throw exceptions
if (response.getWriter().checkError()) {
throw new IOException("io error");
}

// now wait the 'delay' time
Thread.sleep(delay);
}
} catch (InterruptedException e) {
poller.shutdown();
logger.debug("InterruptedException. Will stop polling.");
Thread.currentThread().interrupt();
} catch (IOException e) {
poller.shutdown();
// debug instead of error as we expect to get these whenever a client disconnects or network issue occurs
logger.debug("IOException while trying to write (generally caused by client disconnecting). Will stop polling.", e);
} catch (Exception e) {
poller.shutdown();
logger.error("Failed to write Hystrix metrics. Will stop polling.", e);
}
logger.debug("Stopping Turbine stream to connection");
}
} catch (Exception e) {
logger.error("Error initializing servlet for metrics event stream.", e);
} finally {
concurrentConnections.decrementAndGet();
if (poller != null) {
poller.shutdown();
}
}
@Override
protected int incrementAndGetCurrentConcurrentConnections() {
return concurrentConnections.incrementAndGet();
}

/**
* This will be called from another thread so needs to be thread-safe.
* @ThreadSafe
*/
private static class MetricJsonListener implements HystrixMetricsPoller.MetricsAsJsonPollerListener {

/**
* Setting limit to 1000. In a healthy system there isn't any reason to hit this limit so if we do it will throw an exception which causes the poller to stop.
* <p>
* This is a safety check against a runaway poller causing memory leaks.
*/
private LinkedBlockingQueue<String> jsonMetrics;

public MetricJsonListener(int queueSize) {
jsonMetrics = new LinkedBlockingQueue<String>(queueSize);
}

/**
* Store JSON messages in a queue.
*/
@Override
public void handleJsonMetric(String json) {
jsonMetrics.add(json);
}
@Override
protected void decrementCurrentConcurrentConnections() {
concurrentConnections.decrementAndGet();
}

/**
* Get all JSON messages in the queue.
*
* @return
*/
public List<String> getJsonMetrics() {
ArrayList<String> metrics = new ArrayList<String>();
jsonMetrics.drainTo(metrics);
return metrics;
}
@Override
protected String convertToString(HystrixDashboardStream.DashboardData dashboardData) throws IOException {
return SerialHystrixDashboardData.toJsonString(dashboardData);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,12 @@ public HystrixConfigSseServlet() {
}

@Override
int getMaxNumberConcurrentConnectionsAllowed() {
protected int getMaxNumberConcurrentConnectionsAllowed() {
return maxConcurrentConnections.get();
}

@Override
int getNumberCurrentConnections() {
protected int getNumberCurrentConnections() {
return concurrentConnections.get();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ protected HystrixSampleSseServlet(Observable<SampleData> sampleStream, int pause
this.pausePollerThreadDelayInMs = pausePollerThreadDelayInMs;
}

abstract int getMaxNumberConcurrentConnectionsAllowed();
protected abstract int getMaxNumberConcurrentConnectionsAllowed();

abstract int getNumberCurrentConnections();
protected abstract int getNumberCurrentConnections();

protected abstract int incrementAndGetCurrentConcurrentConnections();

Expand Down Expand Up @@ -131,8 +131,6 @@ private void handleRequest(HttpServletRequest request, final HttpServletResponse

final PrintWriter writer = response.getWriter();

//Observable<SampleData> sampledStream = getStream();

//since the sample stream is based on Observable.interval, events will get published on an RxComputation thread
//since writing to the servlet response is blocking, use the Rx IO thread for the write that occurs in the onNext
sampleSubscription = sampleStream
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,12 @@ public HystrixUtilizationSseServlet() {
}

@Override
int getMaxNumberConcurrentConnectionsAllowed() {
protected int getMaxNumberConcurrentConnectionsAllowed() {
return maxConcurrentConnections.get();
}

@Override
int getNumberCurrentConnections() {
protected int getNumberCurrentConnections() {
return concurrentConnections.get();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,31 +15,70 @@
*/
package com.netflix.hystrix.contrib.metrics.eventstream;

import com.netflix.hystrix.metric.consumer.HystrixDashboardStream;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import rx.Observable;
import rx.functions.Func1;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import java.io.IOException;
import java.io.PrintWriter;
import java.util.concurrent.TimeUnit;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class HystrixMetricsStreamServletUnitTest {

@Mock HttpServletRequest mockReq;
@Mock HttpServletResponse mockResp;
@Mock HystrixDashboardStream.DashboardData mockDashboard;
@Mock PrintWriter mockPrintWriter;

HystrixMetricsStreamServlet servlet;

private final Observable<HystrixDashboardStream.DashboardData> streamOfOnNexts =
Observable.interval(100, TimeUnit.MILLISECONDS).map(new Func1<Long, HystrixDashboardStream.DashboardData>() {
@Override
public HystrixDashboardStream.DashboardData call(Long timestamp) {
return mockDashboard;
}
});


@Before
public void init() {
MockitoAnnotations.initMocks(this);
when(mockReq.getMethod()).thenReturn("GET");
}

@After
public void tearDown() {
servlet.destroy();
servlet.shutdown();
}

@Test
public void shutdownServletShouldRejectRequests() throws ServletException, IOException {
servlet = new HystrixMetricsStreamServlet(streamOfOnNexts, 10);
try {
servlet.init();
} catch (ServletException ex) {

final HystrixMetricsStreamServlet servlet = new HystrixMetricsStreamServlet();
servlet.shutdown();
}

final HttpServletResponse response = mock(HttpServletResponse.class);
servlet.doGet(mock(HttpServletRequest.class), response);
servlet.shutdown();

verify(response).sendError(503, "Service has been shut down.");
servlet.service(mockReq, mockResp);

verify(mockResp).sendError(503, "Service has been shut down.");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ public void call(Subscriber<? super HystrixConfiguration> subscriber) {
@Before
public void init() {
MockitoAnnotations.initMocks(this);

}

@After
Expand All @@ -111,7 +110,6 @@ public void shutdownServletShouldRejectRequests() throws ServletException, IOExc
servlet.doGet(mockReq, mockResp);

verify(mockResp).sendError(503, "Service has been shut down.");

}

@Test
Expand Down
Loading

0 comments on commit 151a4a0

Please sign in to comment.