Skip to content

Commit

Permalink
Adding hystrix-metrics-event-stream-jaxrs module
Browse files Browse the repository at this point in the history
  • Loading branch information
Justin Jose committed Mar 14, 2017
1 parent 8f16eb9 commit e9d8276
Show file tree
Hide file tree
Showing 16 changed files with 1,106 additions and 0 deletions.
62 changes: 62 additions & 0 deletions hystrix-contrib/hystrix-metrics-event-stream-jaxrs/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
# hystrix-metrics-event-stream-jaxrs

This module is a JAX-RS implementation of [hystrix-metrics-event-stream](https://github.com/Netflix/Hystrix/tree/master/hystrix-contrib/hystrix-metrics-event-stream) module without any Servlet API dependency and exposes metrics in a [text/event-stream](https://developer.mozilla.org/en-US/docs/Server-sent_events/Using_server-sent_events) formatted stream that continues as long as a client holds the connection.


# Binaries

Binaries and dependency information for Maven, Ivy, Gradle and others can be found at [http://search.maven.org](http://search.maven.org/#search%7Cga%7C1%7Ca%3A%22hystrix-metrics-event-stream-jaxrs%22).

Example for Maven ([lookup latest version](http://search.maven.org/#search%7Cga%7C1%7Ca%3A%22hystrix-metrics-event-stream-jaxrs%22)):

```xml
<dependency>
<groupId>com.netflix.hystrix</groupId>
<artifactId>hystrix-metrics-event-stream-jaxrs</artifactId>
<version>1.6.0</version>
</dependency>
```
and for Ivy:

```xml
<dependency org="com.netflix.hystrix" name="hystrix-metrics-event-stream-jaxrs" rev="1.6.0" />
```

# Installation

1) Include hystrix-metrics-event-stream-jaxrs*.jar in your classpath (such as /WEB-INF/lib).
2) Register `HystrixStreamFeature` in your `javax.ws.rs.core.Application` as shown below.

```java

public class HystrixStreamApplication extends Application{

@Override
public Set<Class<?>> getClasses() {
Set<Class<?>> clazzes = new HashSet<Class<?>>();
clazzes.add(HystrixStreamFeature.class);
return clazzes;
}
}
```

3) Following end-points are available
* /hystrix.stream - Stream Hystrix Metrics
* /hystrix/utilization.stream - Stream Hystrix Utilization
* /hystrix/config.stream - Stream Hystrix configuration
* /hystrix/request.stream - Stream Hystrix SSE events



# Test

To test your installation you can use curl like this:

```
$ curl http://hostname:port/appname/hystrix.stream
data: {"rollingCountFailure":0,"propertyValue_executionIsolationThreadInterruptOnTimeout":true,"rollingCountTimeout":0,"rollingCountExceptionsThrown":0,"rollingCountFallbackSuccess":0,"errorCount":0,"type":"HystrixCommand","propertyValue_circuitBreakerEnabled":true,"reportingHosts":1,"latencyTotal":{"0":0,"95":0,"99.5":0,"90":0,"25":0,"99":0,"75":0,"100":0,"50":0},"currentConcurrentExecutionCount":0,"rollingCountSemaphoreRejected":0,"rollingCountFallbackRejection":0,"rollingCountShortCircuited":0,"rollingCountResponsesFromCache":0,"propertyValue_circuitBreakerForceClosed":false,"name":"IdentityCookieAuthSwitchProfile","propertyValue_executionIsolationThreadPoolKeyOverride":"null","rollingCountSuccess":0,"propertyValue_requestLogEnabled":true,"requestCount":0,"rollingCountCollapsedRequests":0,"errorPercentage":0,"propertyValue_circuitBreakerSleepWindowInMilliseconds":5000,"latencyTotal_mean":0,"propertyValue_circuitBreakerForceOpen":false,"propertyValue_circuitBreakerRequestVolumeThreshold":20,"propertyValue_circuitBreakerErrorThresholdPercentage":50,"propertyValue_executionIsolationStrategy":"THREAD","rollingCountFallbackFailure":0,"isCircuitBreakerOpen":false,"propertyValue_executionIsolationSemaphoreMaxConcurrentRequests":20,"propertyValue_executionIsolationThreadTimeoutInMilliseconds":1000,"propertyValue_metricsRollingStatisticalWindowInMilliseconds":10000,"propertyValue_fallbackIsolationSemaphoreMaxConcurrentRequests":10,"latencyExecute":{"0":0,"95":0,"99.5":0,"90":0,"25":0,"99":0,"75":0,"100":0,"50":0},"group":"IDENTITY","latencyExecute_mean":0,"propertyValue_requestCacheEnabled":true,"rollingCountThreadPoolRejected":0}
data: {"rollingCountFailure":0,"propertyValue_executionIsolationThreadInterruptOnTimeout":true,"rollingCountTimeout":0,"rollingCountExceptionsThrown":0,"rollingCountFallbackSuccess":0,"errorCount":0,"type":"HystrixCommand","propertyValue_circuitBreakerEnabled":true,"reportingHosts":3,"latencyTotal":{"0":1,"95":1,"99.5":1,"90":1,"25":1,"99":1,"75":1,"100":1,"50":1},"currentConcurrentExecutionCount":0,"rollingCountSemaphoreRejected":0,"rollingCountFallbackRejection":0,"rollingCountShortCircuited":0,"rollingCountResponsesFromCache":0,"propertyValue_circuitBreakerForceClosed":false,"name":"CryptexDecrypt","propertyValue_executionIsolationThreadPoolKeyOverride":"null","rollingCountSuccess":1,"propertyValue_requestLogEnabled":true,"requestCount":1,"rollingCountCollapsedRequests":0,"errorPercentage":0,"propertyValue_circuitBreakerSleepWindowInMilliseconds":15000,"latencyTotal_mean":1,"propertyValue_circuitBreakerForceOpen":false,"propertyValue_circuitBreakerRequestVolumeThreshold":60,"propertyValue_circuitBreakerErrorThresholdPercentage":150,"propertyValue_executionIsolationStrategy":"THREAD","rollingCountFallbackFailure":0,"isCircuitBreakerOpen":false,"propertyValue_executionIsolationSemaphoreMaxConcurrentRequests":60,"propertyValue_executionIsolationThreadTimeoutInMilliseconds":3000,"propertyValue_metricsRollingStatisticalWindowInMilliseconds":30000,"propertyValue_fallbackIsolationSemaphoreMaxConcurrentRequests":30,"latencyExecute":{"0":0,"95":0,"99.5":0,"90":0,"25":0,"99":0,"75":0,"100":0,"50":0},"group":"CRYPTEX","latencyExecute_mean":0,"propertyValue_requestCacheEnabled":true,"rollingCountThreadPoolRejected":0}
```

Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
dependencies {
compileApi project(':hystrix-core')
compile project(':hystrix-serialization')
provided 'javax.ws.rs:javax.ws.rs-api:2.0.1'
testCompile 'junit:junit-dep:4.10'
testCompile 'org.glassfish.jersey.test-framework.providers:jersey-test-framework-provider-grizzly2:2.25.1'
testCompile 'org.glassfish.jersey.media:jersey-media-sse:2.25.1'

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.hystrix.contrib.metrics;

import java.util.concurrent.atomic.AtomicInteger;

import rx.Observable;

/**
* @author justinjose28
*
*/
public final class HystrixStream {
private final Observable<String> sampleStream;
private final int pausePollerThreadDelayInMs;
private final AtomicInteger concurrentConnections;

public HystrixStream(Observable<String> sampleStream, int pausePollerThreadDelayInMs, AtomicInteger concurrentConnections) {
this.sampleStream = sampleStream;
this.pausePollerThreadDelayInMs = pausePollerThreadDelayInMs;
this.concurrentConnections = concurrentConnections;
}

public Observable<String> getSampleStream() {
return sampleStream;
}

public int getPausePollerThreadDelayInMs() {
return pausePollerThreadDelayInMs;
}

public AtomicInteger getConcurrentConnections() {
return concurrentConnections;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.hystrix.contrib.metrics;

import javax.ws.rs.core.Feature;
import javax.ws.rs.core.FeatureContext;

import com.netflix.hystrix.contrib.metrics.controller.HystrixConfigSseController;
import com.netflix.hystrix.contrib.metrics.controller.HystrixMetricsStreamController;
import com.netflix.hystrix.contrib.metrics.controller.HystrixRequestEventsSseController;
import com.netflix.hystrix.contrib.metrics.controller.HystrixUtilizationSseController;

/**
* @author justinjose28
*
*/
public class HystrixStreamFeature implements Feature {

@Override
public boolean configure(FeatureContext context) {
context.register(new HystrixMetricsStreamController());
context.register(new HystrixUtilizationSseController());
context.register(new HystrixRequestEventsSseController());
context.register(new HystrixConfigSseController());
context.register(HystrixStreamingOutputProvider.class);
return true;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.hystrix.contrib.metrics;

import java.io.IOException;
import java.io.OutputStream;
import java.lang.annotation.Annotation;
import java.lang.reflect.Type;
import java.util.concurrent.atomic.AtomicBoolean;

import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.ext.MessageBodyWriter;
import javax.ws.rs.ext.Provider;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import rx.Subscriber;
import rx.Subscription;
import rx.schedulers.Schedulers;

/**
* {@link MessageBodyWriter} implementation which handles serialization of HystrixStream
*
*
* @author justinjose28
*
*/

@Provider
public class HystrixStreamingOutputProvider implements MessageBodyWriter<HystrixStream> {

private static final Logger LOGGER = LoggerFactory.getLogger(HystrixStreamingOutputProvider.class);

@Override
public boolean isWriteable(Class<?> t, Type gt, Annotation[] as, MediaType mediaType) {
return HystrixStream.class.isAssignableFrom(t);
}

@Override
public long getSize(HystrixStream o, Class<?> type, Type genericType, Annotation[] annotations, MediaType mediaType) {
return -1;
}

@Override
public void writeTo(HystrixStream o, Class<?> t, Type gt, Annotation[] as, MediaType mediaType, MultivaluedMap<String, Object> httpHeaders, final OutputStream entity) throws IOException {
Subscription sampleSubscription = null;
final AtomicBoolean moreDataWillBeSent = new AtomicBoolean(true);
try {

sampleSubscription = o.getSampleStream().observeOn(Schedulers.io()).subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
LOGGER.error("HystrixSampleSseServlet: ({}) received unexpected OnCompleted from sample stream", getClass().getSimpleName());
moreDataWillBeSent.set(false);
}

@Override
public void onError(Throwable e) {
moreDataWillBeSent.set(false);
}

@Override
public void onNext(String sampleDataAsString) {
if (sampleDataAsString != null) {
try {
entity.write(("data: " + sampleDataAsString + "\n\n").getBytes());
entity.flush();
} catch (IOException ioe) {
moreDataWillBeSent.set(false);
}
}
}
});

while (moreDataWillBeSent.get()) {
try {
Thread.sleep(o.getPausePollerThreadDelayInMs());
} catch (InterruptedException e) {
moreDataWillBeSent.set(false);
}
}
} finally {
o.getConcurrentConnections().decrementAndGet();
if (sampleSubscription != null && !sampleSubscription.isUnsubscribed()) {
sampleSubscription.unsubscribe();
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.hystrix.contrib.metrics.controller;

import java.util.concurrent.atomic.AtomicInteger;

import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.ResponseBuilder;
import javax.ws.rs.core.Response.Status;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import rx.Observable;

import com.netflix.hystrix.contrib.metrics.HystrixStream;
import com.netflix.hystrix.contrib.metrics.HystrixStreamingOutputProvider;

/**
* @author justinjose28
*
*/
public abstract class AbstractHystrixStreamController {
protected final Observable<String> sampleStream;

static final Logger logger = LoggerFactory.getLogger(AbstractHystrixStreamController.class);

// wake up occasionally and check that poller is still alive. this value controls how often
protected static final int DEFAULT_PAUSE_POLLER_THREAD_DELAY_IN_MS = 500;

private final int pausePollerThreadDelayInMs;

private static final AtomicInteger concurrentConnections = new AtomicInteger(0);

protected AbstractHystrixStreamController(Observable<String> sampleStream) {
this(sampleStream, DEFAULT_PAUSE_POLLER_THREAD_DELAY_IN_MS);
}

protected AbstractHystrixStreamController(Observable<String> sampleStream, int pausePollerThreadDelayInMs) {
this.sampleStream = sampleStream;
this.pausePollerThreadDelayInMs = pausePollerThreadDelayInMs;
}

protected abstract int getMaxNumberConcurrentConnectionsAllowed();

protected final AtomicInteger getCurrentConnections() {
return concurrentConnections;
}

/**
* Maintain an open connection with the client. On initial connection send latest data of each requested event type and subsequently send all changes for each requested event type.
*
* @return JAX-RS Response - Serialization will be handled by {@link HystrixStreamingOutputProvider}
*/
protected Response handleRequest() {
ResponseBuilder builder = null;
/* ensure we aren't allowing more connections than we want */
int numberConnections = getCurrentConnections().get();
int maxNumberConnectionsAllowed = getMaxNumberConcurrentConnectionsAllowed(); // may change at runtime, so look this up for each request
if (numberConnections >= maxNumberConnectionsAllowed) {
builder = Response.status(Status.SERVICE_UNAVAILABLE).entity("MaxConcurrentConnections reached: " + maxNumberConnectionsAllowed);
} else {
/* initialize response */
builder = Response.status(Status.OK);
builder.header(HttpHeaders.CONTENT_TYPE, "text/event-stream;charset=UTF-8");
builder.header(HttpHeaders.CACHE_CONTROL, "no-cache, no-store, max-age=0, must-revalidate");
builder.header("Pragma", "no-cache");
getCurrentConnections().incrementAndGet();
builder.entity(new HystrixStream(sampleStream, pausePollerThreadDelayInMs, getCurrentConnections()));
}
return builder.build();

}

}
Loading

0 comments on commit e9d8276

Please sign in to comment.