diff --git a/hystrix-contrib/hystrix-rx-netty-metrics-stream/build.gradle b/hystrix-contrib/hystrix-rx-netty-metrics-stream/build.gradle
new file mode 100644
index 000000000..70f236e2b
--- /dev/null
+++ b/hystrix-contrib/hystrix-rx-netty-metrics-stream/build.gradle
@@ -0,0 +1,29 @@
+apply plugin: 'java'
+apply plugin: 'eclipse'
+apply plugin: 'idea'
+
+dependencies {
+ compile project(':hystrix-core')
+ compile "com.netflix.rxnetty:rx-netty:0.3.8"
+ compile 'org.codehaus.jackson:jackson-core-asl:1.9.2'
+ compile 'org.codehaus.jackson:jackson-mapper-asl:1.9.2'
+ testCompile 'junit:junit-dep:4.10'
+ testCompile 'org.powermock:powermock-easymock-release-full:1.5.5'
+ testCompile 'org.easymock:easymock:3.2'
+}
+
+eclipse {
+ classpath {
+ // include 'provided' dependencies on the classpath
+ plusConfigurations += configurations.provided
+ downloadSources = true
+ downloadJavadoc = true
+ }
+}
+
+idea {
+ module {
+ // include 'provided' dependencies on the classpath
+ scopes.COMPILE.plus += configurations.provided
+ }
+}
\ No newline at end of file
diff --git a/hystrix-contrib/hystrix-rx-netty-metrics-stream/src/main/java/com/netflix/hystrix/contrib/rxnetty/metricsstream/HystrixMetricsStreamHandler.java b/hystrix-contrib/hystrix-rx-netty-metrics-stream/src/main/java/com/netflix/hystrix/contrib/rxnetty/metricsstream/HystrixMetricsStreamHandler.java
new file mode 100644
index 000000000..d4ee69c7a
--- /dev/null
+++ b/hystrix-contrib/hystrix-rx-netty-metrics-stream/src/main/java/com/netflix/hystrix/contrib/rxnetty/metricsstream/HystrixMetricsStreamHandler.java
@@ -0,0 +1,111 @@
+package com.netflix.hystrix.contrib.rxnetty.metricsstream;
+
+import com.netflix.hystrix.HystrixCommandMetrics;
+import com.netflix.hystrix.HystrixThreadPoolMetrics;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.UnpooledByteBufAllocator;
+import io.reactivex.netty.protocol.http.server.HttpServerRequest;
+import io.reactivex.netty.protocol.http.server.HttpServerResponse;
+import io.reactivex.netty.protocol.http.server.RequestHandler;
+import rx.Observable;
+import rx.Subscription;
+import rx.functions.Action1;
+import rx.schedulers.Schedulers;
+import rx.subjects.PublishSubject;
+import rx.subjects.Subject;
+import rx.subscriptions.MultipleAssignmentSubscription;
+
+import java.nio.charset.Charset;
+import java.util.concurrent.TimeUnit;
+
+import static com.netflix.hystrix.contrib.rxnetty.metricsstream.JsonMappers.*;
+
+/**
+ * Streams Hystrix metrics in Server Sent Event (SSE) format. RxNetty application handlers shall
+ * be wrapped by this handler. It transparently intercepts HTTP requests at a configurable path
+ * (default "/hystrix.stream"), and sends unbounded SSE streams back to the client. All other requests
+ * are transparently forwarded to the application handlers.
+ *
+ * For RxNetty client tapping into SSE stream: remember to use unpooled HTTP connections. If not, the pooled HTTP
+ * connection will not be closed on unsubscribe event and the event stream will continue to flow towards the client
+ * (unless the client is shutdown).
+ *
+ * @author Tomasz Bak
+ */
+public class HystrixMetricsStreamHandler implements RequestHandler {
+
+ public static final String DEFAULT_HYSTRIX_PREFIX = "/hystrix.stream";
+
+ public static final int DEFAULT_INTERVAL = 2000;
+
+ private static final byte[] HEADER = "data: ".getBytes(Charset.defaultCharset());
+ private static final byte[] FOOTER = {10, 10};
+ private static final int EXTRA_SPACE = HEADER.length + FOOTER.length;
+
+ private final String hystrixPrefix;
+ private final long interval;
+ private final RequestHandler appHandler;
+
+ public HystrixMetricsStreamHandler(RequestHandler appHandler) {
+ this(DEFAULT_HYSTRIX_PREFIX, DEFAULT_INTERVAL, appHandler);
+ }
+
+ HystrixMetricsStreamHandler(String hystrixPrefix, long interval, RequestHandler appHandler) {
+ this.hystrixPrefix = hystrixPrefix;
+ this.interval = interval;
+ this.appHandler = appHandler;
+ }
+
+ @Override
+ public Observable handle(HttpServerRequest request, HttpServerResponse response) {
+ if (request.getPath().startsWith(hystrixPrefix)) {
+ return handleHystrixRequest(response);
+ }
+ return appHandler.handle(request, response);
+ }
+
+ private Observable handleHystrixRequest(final HttpServerResponse response) {
+ writeHeaders(response);
+
+ final Subject subject = PublishSubject.create();
+ final MultipleAssignmentSubscription subscription = new MultipleAssignmentSubscription();
+ Subscription actionSubscription = Observable.timer(0, interval, TimeUnit.MILLISECONDS, Schedulers.computation())
+ .subscribe(new Action1() {
+ @Override
+ public void call(Long tick) {
+ if (!response.getChannelHandlerContext().channel().isOpen()) {
+ subscription.unsubscribe();
+ return;
+ }
+ try {
+ for (HystrixCommandMetrics commandMetrics : HystrixCommandMetrics.getInstances()) {
+ writeMetric(toJson(commandMetrics), response);
+ }
+ for (HystrixThreadPoolMetrics threadPoolMetrics : HystrixThreadPoolMetrics.getInstances()) {
+ writeMetric(toJson(threadPoolMetrics), response);
+ }
+ } catch (Exception e) {
+ subject.onError(e);
+ }
+ }
+ });
+ subscription.set(actionSubscription);
+ return subject;
+ }
+
+ private void writeHeaders(HttpServerResponse response) {
+ response.getHeaders().add("Content-Type", "text/event-stream;charset=UTF-8");
+ response.getHeaders().add("Cache-Control", "no-cache, no-store, max-age=0, must-revalidate");
+ response.getHeaders().add("Pragma", "no-cache");
+ }
+
+ @SuppressWarnings("unchecked")
+ private void writeMetric(String json, HttpServerResponse response) {
+ byte[] bytes = json.getBytes(Charset.defaultCharset());
+ ByteBuf byteBuf = UnpooledByteBufAllocator.DEFAULT.buffer(bytes.length + EXTRA_SPACE);
+ byteBuf.writeBytes(HEADER);
+ byteBuf.writeBytes(bytes);
+ byteBuf.writeBytes(FOOTER);
+ response.writeAndFlush((O) byteBuf);
+ }
+}
diff --git a/hystrix-contrib/hystrix-rx-netty-metrics-stream/src/main/java/com/netflix/hystrix/contrib/rxnetty/metricsstream/JsonMappers.java b/hystrix-contrib/hystrix-rx-netty-metrics-stream/src/main/java/com/netflix/hystrix/contrib/rxnetty/metricsstream/JsonMappers.java
new file mode 100644
index 000000000..0799c75fc
--- /dev/null
+++ b/hystrix-contrib/hystrix-rx-netty-metrics-stream/src/main/java/com/netflix/hystrix/contrib/rxnetty/metricsstream/JsonMappers.java
@@ -0,0 +1,169 @@
+package com.netflix.hystrix.contrib.rxnetty.metricsstream;
+
+import com.netflix.hystrix.HystrixCircuitBreaker;
+import com.netflix.hystrix.HystrixCommandKey;
+import com.netflix.hystrix.HystrixCommandMetrics;
+import com.netflix.hystrix.HystrixCommandMetrics.HealthCounts;
+import com.netflix.hystrix.HystrixCommandProperties;
+import com.netflix.hystrix.HystrixThreadPoolKey;
+import com.netflix.hystrix.HystrixThreadPoolMetrics;
+import com.netflix.hystrix.util.HystrixRollingNumberEvent;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonGenerator;
+
+import java.io.IOException;
+import java.io.StringWriter;
+
+/**
+ * This code is taken from hystrix-metrics-event-stream module's HystrixMetricsPoller class.
+ *
+ * @author Tomasz Bak
+ */
+final class JsonMappers {
+
+ private static final JsonFactory jsonFactory = new JsonFactory();
+
+ private JsonMappers() {
+ }
+
+ static String toJson(HystrixCommandMetrics commandMetrics) throws IOException {
+ HystrixCommandKey key = commandMetrics.getCommandKey();
+ HystrixCircuitBreaker circuitBreaker = HystrixCircuitBreaker.Factory.getInstance(key);
+
+ StringWriter jsonString = new StringWriter();
+ JsonGenerator json = jsonFactory.createJsonGenerator(jsonString);
+
+ json.writeStartObject();
+ json.writeStringField("type", "HystrixCommand");
+ json.writeStringField("name", key.name());
+ json.writeStringField("group", commandMetrics.getCommandGroup().name());
+ json.writeNumberField("currentTime", System.currentTimeMillis());
+
+ // circuit breaker
+ if (circuitBreaker == null) {
+ // circuit breaker is disabled and thus never open
+ json.writeBooleanField("isCircuitBreakerOpen", false);
+ } else {
+ json.writeBooleanField("isCircuitBreakerOpen", circuitBreaker.isOpen());
+ }
+ HealthCounts healthCounts = commandMetrics.getHealthCounts();
+ json.writeNumberField("errorPercentage", healthCounts.getErrorPercentage());
+ json.writeNumberField("errorCount", healthCounts.getErrorCount());
+ json.writeNumberField("requestCount", healthCounts.getTotalRequests());
+
+ // rolling counters
+ json.writeNumberField("rollingCountCollapsedRequests", commandMetrics.getRollingCount(HystrixRollingNumberEvent.COLLAPSED));
+ json.writeNumberField("rollingCountExceptionsThrown", commandMetrics.getRollingCount(HystrixRollingNumberEvent.EXCEPTION_THROWN));
+ json.writeNumberField("rollingCountFailure", commandMetrics.getRollingCount(HystrixRollingNumberEvent.FAILURE));
+ json.writeNumberField("rollingCountFallbackFailure", commandMetrics.getRollingCount(HystrixRollingNumberEvent.FALLBACK_FAILURE));
+ json.writeNumberField("rollingCountFallbackRejection", commandMetrics.getRollingCount(HystrixRollingNumberEvent.FALLBACK_REJECTION));
+ json.writeNumberField("rollingCountFallbackSuccess", commandMetrics.getRollingCount(HystrixRollingNumberEvent.FALLBACK_SUCCESS));
+ json.writeNumberField("rollingCountResponsesFromCache", commandMetrics.getRollingCount(HystrixRollingNumberEvent.RESPONSE_FROM_CACHE));
+ json.writeNumberField("rollingCountSemaphoreRejected", commandMetrics.getRollingCount(HystrixRollingNumberEvent.SEMAPHORE_REJECTED));
+ json.writeNumberField("rollingCountShortCircuited", commandMetrics.getRollingCount(HystrixRollingNumberEvent.SHORT_CIRCUITED));
+ json.writeNumberField("rollingCountSuccess", commandMetrics.getRollingCount(HystrixRollingNumberEvent.SUCCESS));
+ json.writeNumberField("rollingCountThreadPoolRejected", commandMetrics.getRollingCount(HystrixRollingNumberEvent.THREAD_POOL_REJECTED));
+ json.writeNumberField("rollingCountTimeout", commandMetrics.getRollingCount(HystrixRollingNumberEvent.TIMEOUT));
+
+ json.writeNumberField("currentConcurrentExecutionCount", commandMetrics.getCurrentConcurrentExecutionCount());
+
+ // latency percentiles
+ json.writeNumberField("latencyExecute_mean", commandMetrics.getExecutionTimeMean());
+ json.writeObjectFieldStart("latencyExecute");
+ json.writeNumberField("0", commandMetrics.getExecutionTimePercentile(0));
+ json.writeNumberField("25", commandMetrics.getExecutionTimePercentile(25));
+ json.writeNumberField("50", commandMetrics.getExecutionTimePercentile(50));
+ json.writeNumberField("75", commandMetrics.getExecutionTimePercentile(75));
+ json.writeNumberField("90", commandMetrics.getExecutionTimePercentile(90));
+ json.writeNumberField("95", commandMetrics.getExecutionTimePercentile(95));
+ json.writeNumberField("99", commandMetrics.getExecutionTimePercentile(99));
+ json.writeNumberField("99.5", commandMetrics.getExecutionTimePercentile(99.5));
+ json.writeNumberField("100", commandMetrics.getExecutionTimePercentile(100));
+ json.writeEndObject();
+ //
+ json.writeNumberField("latencyTotal_mean", commandMetrics.getTotalTimeMean());
+ json.writeObjectFieldStart("latencyTotal");
+ json.writeNumberField("0", commandMetrics.getTotalTimePercentile(0));
+ json.writeNumberField("25", commandMetrics.getTotalTimePercentile(25));
+ json.writeNumberField("50", commandMetrics.getTotalTimePercentile(50));
+ json.writeNumberField("75", commandMetrics.getTotalTimePercentile(75));
+ json.writeNumberField("90", commandMetrics.getTotalTimePercentile(90));
+ json.writeNumberField("95", commandMetrics.getTotalTimePercentile(95));
+ json.writeNumberField("99", commandMetrics.getTotalTimePercentile(99));
+ json.writeNumberField("99.5", commandMetrics.getTotalTimePercentile(99.5));
+ json.writeNumberField("100", commandMetrics.getTotalTimePercentile(100));
+ json.writeEndObject();
+
+ // property values for reporting what is actually seen by the command rather than what was set somewhere
+ HystrixCommandProperties commandProperties = commandMetrics.getProperties();
+
+ json.writeNumberField("propertyValue_circuitBreakerRequestVolumeThreshold", commandProperties.circuitBreakerRequestVolumeThreshold().get());
+ json.writeNumberField("propertyValue_circuitBreakerSleepWindowInMilliseconds", commandProperties.circuitBreakerSleepWindowInMilliseconds().get());
+ json.writeNumberField("propertyValue_circuitBreakerErrorThresholdPercentage", commandProperties.circuitBreakerErrorThresholdPercentage().get());
+ json.writeBooleanField("propertyValue_circuitBreakerForceOpen", commandProperties.circuitBreakerForceOpen().get());
+ json.writeBooleanField("propertyValue_circuitBreakerForceClosed", commandProperties.circuitBreakerForceClosed().get());
+ json.writeBooleanField("propertyValue_circuitBreakerEnabled", commandProperties.circuitBreakerEnabled().get());
+
+ json.writeStringField("propertyValue_executionIsolationStrategy", commandProperties.executionIsolationStrategy().get().name());
+ json.writeNumberField("propertyValue_executionIsolationThreadTimeoutInMilliseconds", commandProperties.executionIsolationThreadTimeoutInMilliseconds().get());
+ json.writeBooleanField("propertyValue_executionIsolationThreadInterruptOnTimeout", commandProperties.executionIsolationThreadInterruptOnTimeout().get());
+ json.writeStringField("propertyValue_executionIsolationThreadPoolKeyOverride", commandProperties.executionIsolationThreadPoolKeyOverride().get());
+ json.writeNumberField("propertyValue_executionIsolationSemaphoreMaxConcurrentRequests", commandProperties.executionIsolationSemaphoreMaxConcurrentRequests().get());
+ json.writeNumberField("propertyValue_fallbackIsolationSemaphoreMaxConcurrentRequests", commandProperties.fallbackIsolationSemaphoreMaxConcurrentRequests().get());
+
+ /*
+ * The following are commented out as these rarely change and are verbose for streaming for something people don't change.
+ * We could perhaps allow a property or request argument to include these.
+ */
+
+ // json.put("propertyValue_metricsRollingPercentileEnabled", commandProperties.metricsRollingPercentileEnabled().get());
+ // json.put("propertyValue_metricsRollingPercentileBucketSize", commandProperties.metricsRollingPercentileBucketSize().get());
+ // json.put("propertyValue_metricsRollingPercentileWindow", commandProperties.metricsRollingPercentileWindowInMilliseconds().get());
+ // json.put("propertyValue_metricsRollingPercentileWindowBuckets", commandProperties.metricsRollingPercentileWindowBuckets().get());
+ // json.put("propertyValue_metricsRollingStatisticalWindowBuckets", commandProperties.metricsRollingStatisticalWindowBuckets().get());
+ json.writeNumberField("propertyValue_metricsRollingStatisticalWindowInMilliseconds", commandProperties.metricsRollingStatisticalWindowInMilliseconds().get());
+
+ json.writeBooleanField("propertyValue_requestCacheEnabled", commandProperties.requestCacheEnabled().get());
+ json.writeBooleanField("propertyValue_requestLogEnabled", commandProperties.requestLogEnabled().get());
+
+ json.writeNumberField("reportingHosts", 1); // this will get summed across all instances in a cluster
+
+ json.writeEndObject();
+ json.close();
+
+ return jsonString.getBuffer().toString();
+ }
+
+ static String toJson(HystrixThreadPoolMetrics threadPoolMetrics) throws IOException {
+ HystrixThreadPoolKey key = threadPoolMetrics.getThreadPoolKey();
+
+ StringWriter jsonString = new StringWriter();
+ JsonGenerator json = jsonFactory.createJsonGenerator(jsonString);
+ json.writeStartObject();
+
+ json.writeStringField("type", "HystrixThreadPool");
+ json.writeStringField("name", key.name());
+ json.writeNumberField("currentTime", System.currentTimeMillis());
+
+ json.writeNumberField("currentActiveCount", threadPoolMetrics.getCurrentActiveCount().intValue());
+ json.writeNumberField("currentCompletedTaskCount", threadPoolMetrics.getCurrentCompletedTaskCount().longValue());
+ json.writeNumberField("currentCorePoolSize", threadPoolMetrics.getCurrentCorePoolSize().intValue());
+ json.writeNumberField("currentLargestPoolSize", threadPoolMetrics.getCurrentLargestPoolSize().intValue());
+ json.writeNumberField("currentMaximumPoolSize", threadPoolMetrics.getCurrentMaximumPoolSize().intValue());
+ json.writeNumberField("currentPoolSize", threadPoolMetrics.getCurrentPoolSize().intValue());
+ json.writeNumberField("currentQueueSize", threadPoolMetrics.getCurrentQueueSize().intValue());
+ json.writeNumberField("currentTaskCount", threadPoolMetrics.getCurrentTaskCount().longValue());
+ json.writeNumberField("rollingCountThreadsExecuted", threadPoolMetrics.getRollingCountThreadsExecuted());
+ json.writeNumberField("rollingMaxActiveThreads", threadPoolMetrics.getRollingMaxActiveThreads());
+
+ json.writeNumberField("propertyValue_queueSizeRejectionThreshold", threadPoolMetrics.getProperties().queueSizeRejectionThreshold().get());
+ json.writeNumberField("propertyValue_metricsRollingStatisticalWindowInMilliseconds", threadPoolMetrics.getProperties().metricsRollingStatisticalWindowInMilliseconds().get());
+
+ json.writeNumberField("reportingHosts", 1); // this will get summed across all instances in a cluster
+
+ json.writeEndObject();
+ json.close();
+
+ return jsonString.getBuffer().toString();
+ }
+}
\ No newline at end of file
diff --git a/hystrix-contrib/hystrix-rx-netty-metrics-stream/src/test/java/com/netflix/hystrix/HystrixCommandMetricsSamples.java b/hystrix-contrib/hystrix-rx-netty-metrics-stream/src/test/java/com/netflix/hystrix/HystrixCommandMetricsSamples.java
new file mode 100644
index 000000000..3493204b2
--- /dev/null
+++ b/hystrix-contrib/hystrix-rx-netty-metrics-stream/src/test/java/com/netflix/hystrix/HystrixCommandMetricsSamples.java
@@ -0,0 +1,41 @@
+package com.netflix.hystrix;
+
+import com.netflix.hystrix.strategy.eventnotifier.HystrixEventNotifierDefault;
+
+/**
+ * Not very elegant, but there is no other way to create this data directly for testing
+ * purposes, as {@link com.netflix.hystrix.HystrixCommandMetrics} has no public constructors,
+ * only package private.
+ *
+ * @author Tomasz Bak
+ */
+public class HystrixCommandMetricsSamples {
+
+ public static final HystrixCommandMetrics SAMPLE_1;
+
+ private static class MyHystrixCommandKey implements HystrixCommandKey {
+ @Override
+ public String name() {
+ return "hystrixKey";
+ }
+ }
+
+ private static class MyHystrixCommandGroupKey implements HystrixCommandGroupKey {
+ @Override
+ public String name() {
+ return "hystrixCommandGroupKey";
+ }
+ }
+
+ private static class MyHystrixCommandProperties extends HystrixCommandProperties {
+ protected MyHystrixCommandProperties(HystrixCommandKey key) {
+ super(key);
+ }
+ }
+
+ static {
+ HystrixCommandKey key = new MyHystrixCommandKey();
+ SAMPLE_1 = new HystrixCommandMetrics(key, new MyHystrixCommandGroupKey(),
+ new MyHystrixCommandProperties(key), HystrixEventNotifierDefault.getInstance());
+ }
+}
diff --git a/hystrix-contrib/hystrix-rx-netty-metrics-stream/src/test/java/com/netflix/hystrix/contrib/rxnetty/metricsstream/HystrixMetricsStreamHandlerTest.java b/hystrix-contrib/hystrix-rx-netty-metrics-stream/src/test/java/com/netflix/hystrix/contrib/rxnetty/metricsstream/HystrixMetricsStreamHandlerTest.java
new file mode 100644
index 000000000..d0941bdab
--- /dev/null
+++ b/hystrix-contrib/hystrix-rx-netty-metrics-stream/src/test/java/com/netflix/hystrix/contrib/rxnetty/metricsstream/HystrixMetricsStreamHandlerTest.java
@@ -0,0 +1,119 @@
+package com.netflix.hystrix.contrib.rxnetty.metricsstream;
+
+import com.netflix.hystrix.HystrixCommandMetrics;
+import com.netflix.hystrix.HystrixCommandMetricsSamples;
+import io.netty.buffer.ByteBuf;
+import io.reactivex.netty.RxNetty;
+import io.reactivex.netty.pipeline.PipelineConfigurators;
+import io.reactivex.netty.protocol.http.client.HttpClient;
+import io.reactivex.netty.protocol.http.client.HttpClientRequest;
+import io.reactivex.netty.protocol.http.client.HttpClientResponse;
+import io.reactivex.netty.protocol.http.server.HttpServer;
+import io.reactivex.netty.protocol.http.server.HttpServerRequest;
+import io.reactivex.netty.protocol.http.server.HttpServerResponse;
+import io.reactivex.netty.protocol.http.server.RequestHandler;
+import io.reactivex.netty.protocol.text.sse.ServerSentEvent;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import rx.Observable;
+import rx.functions.Func1;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+import static com.netflix.hystrix.contrib.rxnetty.metricsstream.HystrixMetricsStreamHandler.*;
+import static org.easymock.EasyMock.*;
+import static org.junit.Assert.*;
+import static org.powermock.api.easymock.PowerMock.*;
+
+/**
+ * @author Tomasz Bak
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(HystrixCommandMetrics.class)
+public class HystrixMetricsStreamHandlerTest {
+
+ private static final ObjectMapper mapper = new ObjectMapper();
+
+ private static Collection SAMPLE_HYSTRIX_COMMAND_METRICS =
+ Collections.singleton(HystrixCommandMetricsSamples.SAMPLE_1);
+
+ private int port;
+ private HttpServer server;
+ private HttpClient client;
+
+ @Before
+ public void setUp() throws Exception {
+ server = createServer();
+
+ client = RxNetty.newHttpClientBuilder("localhost", port)
+ .withNoConnectionPooling()
+ .pipelineConfigurator(PipelineConfigurators.sseClientConfigurator())
+ .build();
+
+ mockStatic(HystrixCommandMetrics.class);
+ expect(HystrixCommandMetrics.getInstances()).andReturn(SAMPLE_HYSTRIX_COMMAND_METRICS).anyTimes();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (server != null) {
+ server.shutdown();
+ }
+ if (client != null) {
+ client.shutdown();
+ }
+ }
+
+ @Test
+ public void testMetricsAreDeliveredAsSseStream() throws Exception {
+ replayAll();
+
+ Observable objectObservable = client.submit(HttpClientRequest.createGet(DEFAULT_HYSTRIX_PREFIX))
+ .flatMap(new Func1, Observable extends ServerSentEvent>>() {
+ @Override
+ public Observable extends ServerSentEvent> call(HttpClientResponse httpClientResponse) {
+ return httpClientResponse.getContent().take(1);
+ }
+ });
+
+ Object first = Observable.amb(objectObservable, Observable.timer(1000, TimeUnit.MILLISECONDS)).toBlockingObservable().first();
+
+ assertTrue("Expected SSE message", first instanceof ServerSentEvent);
+ ServerSentEvent sse = (ServerSentEvent) first;
+ JsonNode jsonNode = mapper.readTree(sse.getEventData());
+ assertEquals("Expected hystrix key name", HystrixCommandMetricsSamples.SAMPLE_1.getCommandKey().name(), jsonNode.get("name").asText());
+ }
+
+ // We try a few times in case we hit into used port.
+ private HttpServer createServer() {
+ Random random = new Random();
+ Exception error = null;
+ for (int i = 0; i < 3 && server == null; i++) {
+ port = 10000 + random.nextInt(50000);
+ try {
+ return RxNetty.newHttpServerBuilder(port, new HystrixMetricsStreamHandler(
+ DEFAULT_HYSTRIX_PREFIX,
+ DEFAULT_INTERVAL,
+ new RequestHandler() { // Application handler
+ @Override
+ public Observable handle(HttpServerRequest request, HttpServerResponse response) {
+ return Observable.empty();
+ }
+ }
+ )).build().start();
+ } catch (Exception e) {
+ error = e;
+ }
+ }
+ throw new RuntimeException("Cannot initialize RxNetty server", error);
+ }
+}
\ No newline at end of file
diff --git a/settings.gradle b/settings.gradle
index 4a0feef28..00604178d 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -6,6 +6,7 @@ include 'hystrix-core', \
'hystrix-contrib:hystrix-request-servlet', \
'hystrix-contrib:hystrix-servo-metrics-publisher', \
'hystrix-contrib:hystrix-metrics-event-stream', \
+'hystrix-contrib:hystrix-rx-netty-metrics-stream', \
'hystrix-contrib:hystrix-codahale-metrics-publisher', \
'hystrix-contrib:hystrix-yammer-metrics-publisher', \
'hystrix-contrib:hystrix-network-auditor-agent', \