Skip to content

Commit

Permalink
Merge pull request Netflix#1211 from robertroeser/master
Browse files Browse the repository at this point in the history
ReactiveSocket metrics stream
  • Loading branch information
mattrjacobs committed May 26, 2016
2 parents 4e470d6 + 98f96f7 commit b0ce3b3
Show file tree
Hide file tree
Showing 17 changed files with 1,415 additions and 2 deletions.
4 changes: 2 additions & 2 deletions gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#Wed Dec 02 15:47:21 PST 2015
#Thu May 19 16:56:49 PDT 2016
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-2.10-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-2.10-all.zip
24 changes: 24 additions & 0 deletions hystrix-contrib/hystrix-reactivesocket-event-stream/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
repositories {
mavenCentral()
jcenter()
maven { url 'https://dl.bintray.com/reactivesocket/ReactiveSocket' }
}

sourceCompatibility = JavaVersion.VERSION_1_8
targetCompatibility = JavaVersion.VERSION_1_8

dependencies {
compile project(':hystrix-core')

compile 'io.reactivex:rxjava-reactive-streams:latest.release'

compile 'com.fasterxml.jackson.core:jackson-core:latest.release'
compile 'com.fasterxml.jackson.core:jackson-databind:latest.release'
compile 'com.fasterxml.jackson.core:jackson-annotations:latest.release'
compile 'com.fasterxml.jackson.module:jackson-module-afterburner:latest.release'
compile 'com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:latest.release'
compile 'io.reactivesocket:reactivesocket:latest.release'

testCompile 'junit:junit-dep:4.10'
testCompile 'org.mockito:mockito-all:1.9.5'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.netflix.hystrix.contrib.reactivesocket;

import com.fasterxml.jackson.dataformat.cbor.CBORFactory;
import io.reactivesocket.Payload;
import rx.Observable;
import rx.subjects.BehaviorSubject;

import java.util.function.Supplier;

public abstract class BasePayloadSupplier implements Supplier<Observable<Payload>> {
protected final CBORFactory jsonFactory;

protected final BehaviorSubject<Payload> subject;

protected BasePayloadSupplier() {
this.jsonFactory = new CBORFactory();
this.subject = BehaviorSubject.create();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package com.netflix.hystrix.contrib.reactivesocket;


import com.netflix.hystrix.contrib.reactivesocket.metrics.HystrixCollapserMetricsStream;
import com.netflix.hystrix.contrib.reactivesocket.metrics.HystrixCommandMetricsStream;
import com.netflix.hystrix.contrib.reactivesocket.metrics.HystrixThreadPoolMetricsStream;
import com.netflix.hystrix.contrib.reactivesocket.requests.HystrixRequestEventsStream;
import com.netflix.hystrix.contrib.reactivesocket.sample.HystrixConfigStream;
import com.netflix.hystrix.contrib.reactivesocket.sample.HystrixUtilizationStream;
import io.reactivesocket.Payload;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;

import java.util.Arrays;
import java.util.function.Supplier;

public enum EventStreamEnum implements Supplier<Observable<Payload>> {

CONFIG_STREAM(1) {
@Override
public Observable<Payload> get() {
logger.info("streaming config data");
return HystrixConfigStream.getInstance().get();
}
},
REQUEST_EVENT_STREAM(2) {
@Override
public Observable<Payload> get() {
logger.info("streaming request events");
return HystrixRequestEventsStream.getInstance().get();
}
},
UTILIZATION_EVENT_STREAM(3) {
@Override
public Observable<Payload> get() {
logger.info("streaming utilization events");
return HystrixUtilizationStream.getInstance().get();
}
},
METRICS_STREAM(4) {
@Override
public Observable<Payload> get() {
logger.info("streaming metrics");
return Observable.merge(
HystrixCommandMetricsStream.getInstance().get(),
HystrixThreadPoolMetricsStream.getInstance().get(),
HystrixCollapserMetricsStream.getInstance().get());
}
}

;

private static final Logger logger = LoggerFactory.getLogger(EventStreamEnum.class);

private int typeId;

EventStreamEnum(int typeId) {
this.typeId = typeId;
}

public static EventStreamEnum findByTypeId(int typeId) {
return Arrays
.asList(EventStreamEnum.values())
.stream()
.filter(t -> t.typeId == typeId)
.findAny()
.orElseThrow(() -> new IllegalStateException("no type id found for id => " + typeId));
}

public int getTypeId() {
return typeId;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package com.netflix.hystrix.contrib.reactivesocket;

import io.reactivesocket.Payload;
import io.reactivesocket.RequestHandler;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.RxReactiveStreams;

/**
* An implementation of {@link RequestHandler} that provides a Hystrix Stream. Takes an 32-bit integer in the {@link Payload}
* data of a ReactiveSocket {@link io.reactivesocket.Frame} which corresponds to an id in {@link EventStreamEnum}. If
* the id is found it will begin to stream the events to the subscriber.
*/
public class EventStreamRequestHandler extends RequestHandler {
private static final Logger logger = LoggerFactory.getLogger(EventStreamRequestHandler.class);

@Override
public Publisher<Payload> handleRequestResponse(Payload payload) {
return NO_REQUEST_RESPONSE_HANDLER.apply(payload);
}

@Override
public Publisher<Payload> handleRequestStream(Payload payload) {
return NO_REQUEST_STREAM_HANDLER.apply(payload);
}

@Override
public Publisher<Payload> handleSubscription(Payload payload) {
Observable<Payload> defer = Observable
.defer(() -> {
try {
int typeId = payload
.getData()
.getInt(0);

EventStreamEnum eventStreamEnum = EventStreamEnum.findByTypeId(typeId);
return eventStreamEnum
.get();
} catch (Throwable t) {
logger.error(t.getMessage(), t);
return Observable.error(t);
}
})
.onBackpressureDrop();

return RxReactiveStreams
.toPublisher(defer);
}

@Override
public Publisher<Void> handleFireAndForget(Payload payload) {
return NO_FIRE_AND_FORGET_HANDLER.apply(payload);
}

@Override
public Publisher<Payload> handleChannel(Payload initialPayload, Publisher<Payload> inputs) {
return NO_REQUEST_CHANNEL_HANDLER.apply(inputs);
}

@Override
public Publisher<Void> handleMetadataPush(Payload payload) {
return NO_METADATA_PUSH_HANDLER.apply(payload);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package com.netflix.hystrix.contrib.reactivesocket;

import com.fasterxml.jackson.core.JsonGenerator;
import io.reactivesocket.Frame;
import io.reactivesocket.Payload;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.functions.Func0;
import rx.schedulers.Schedulers;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

public abstract class StreamingSupplier<T> extends BasePayloadSupplier {

protected Logger logger = LoggerFactory.getLogger(StreamingSupplier.class);

protected StreamingSupplier() {

Observable
.interval(500, TimeUnit.MILLISECONDS, Schedulers.computation())
.doOnNext(i ->
getStream()
.filter(this::filter)
.map(this::getPayloadData)
.forEach(b -> {
Payload p = new Payload() {
@Override
public ByteBuffer getData() {
return ByteBuffer.wrap(b);
}

@Override
public ByteBuffer getMetadata() {
return Frame.NULL_BYTEBUFFER;
}
};

subject.onNext(p);
})
)
.retry()
.subscribe();
}

public boolean filter(T t) {
return true;
}

@Override
public Observable<Payload> get() {
return subject;
}

protected abstract Stream<T> getStream();

protected abstract byte[] getPayloadData(T t);

protected void safelyWriteNumberField(JsonGenerator json, String name, Func0<Long> metricGenerator) throws IOException {
try {
json.writeNumberField(name, metricGenerator.call());
} catch (NoSuchFieldError error) {
logger.error("While publishing Hystrix metrics stream, error looking up eventType for : " + name + ". Please check that all Hystrix versions are the same!");
json.writeNumberField(name, 0L);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package com.netflix.hystrix.contrib.reactivesocket.metrics;


import com.fasterxml.jackson.core.JsonGenerator;
import com.netflix.hystrix.HystrixCollapserKey;
import com.netflix.hystrix.HystrixCollapserMetrics;
import com.netflix.hystrix.HystrixEventType;
import com.netflix.hystrix.contrib.reactivesocket.StreamingSupplier;
import org.agrona.LangUtil;
import rx.functions.Func0;

import java.io.ByteArrayOutputStream;
import java.util.stream.Stream;

public class HystrixCollapserMetricsStream extends StreamingSupplier<HystrixCollapserMetrics> {
private static HystrixCollapserMetricsStream INSTANCE = new HystrixCollapserMetricsStream();

private HystrixCollapserMetricsStream() {
super();
}

public static HystrixCollapserMetricsStream getInstance() {
return INSTANCE;
}

@Override
protected Stream getStream() {
return HystrixCollapserMetrics.getInstances().stream();
}

protected byte[] getPayloadData(final HystrixCollapserMetrics collapserMetrics) {
byte[] retVal = null;
try {
HystrixCollapserKey key = collapserMetrics.getCollapserKey();
ByteArrayOutputStream bos = new ByteArrayOutputStream();
JsonGenerator json = jsonFactory.createGenerator(bos);
json.writeStartObject();

json.writeStringField("type", "HystrixCollapser");
json.writeStringField("name", key.name());
json.writeNumberField("currentTime", System.currentTimeMillis());

safelyWriteNumberField(json, "rollingCountRequestsBatched", new Func0<Long>() {
@Override
public Long call() {
return collapserMetrics.getRollingCount(HystrixEventType.Collapser.ADDED_TO_BATCH);
}
});
safelyWriteNumberField(json, "rollingCountBatches", new Func0<Long>() {
@Override
public Long call() {
return collapserMetrics.getRollingCount(HystrixEventType.Collapser.BATCH_EXECUTED);
}
});
safelyWriteNumberField(json, "rollingCountResponsesFromCache", new Func0<Long>() {
@Override
public Long call() {
return collapserMetrics.getRollingCount(HystrixEventType.Collapser.RESPONSE_FROM_CACHE);
}
});

// batch size percentiles
json.writeNumberField("batchSize_mean", collapserMetrics.getBatchSizeMean());
json.writeObjectFieldStart("batchSize");
json.writeNumberField("25", collapserMetrics.getBatchSizePercentile(25));
json.writeNumberField("50", collapserMetrics.getBatchSizePercentile(50));
json.writeNumberField("75", collapserMetrics.getBatchSizePercentile(75));
json.writeNumberField("90", collapserMetrics.getBatchSizePercentile(90));
json.writeNumberField("95", collapserMetrics.getBatchSizePercentile(95));
json.writeNumberField("99", collapserMetrics.getBatchSizePercentile(99));
json.writeNumberField("99.5", collapserMetrics.getBatchSizePercentile(99.5));
json.writeNumberField("100", collapserMetrics.getBatchSizePercentile(100));
json.writeEndObject();

// shard size percentiles (commented-out for now)
//json.writeNumberField("shardSize_mean", collapserMetrics.getShardSizeMean());
//json.writeObjectFieldStart("shardSize");
//json.writeNumberField("25", collapserMetrics.getShardSizePercentile(25));
//json.writeNumberField("50", collapserMetrics.getShardSizePercentile(50));
//json.writeNumberField("75", collapserMetrics.getShardSizePercentile(75));
//json.writeNumberField("90", collapserMetrics.getShardSizePercentile(90));
//json.writeNumberField("95", collapserMetrics.getShardSizePercentile(95));
//json.writeNumberField("99", collapserMetrics.getShardSizePercentile(99));
//json.writeNumberField("99.5", collapserMetrics.getShardSizePercentile(99.5));
//json.writeNumberField("100", collapserMetrics.getShardSizePercentile(100));
//json.writeEndObject();

//json.writeNumberField("propertyValue_metricsRollingStatisticalWindowInMilliseconds", collapserMetrics.getProperties().metricsRollingStatisticalWindowInMilliseconds().get());
json.writeBooleanField("propertyValue_requestCacheEnabled", collapserMetrics.getProperties().requestCacheEnabled().get());
json.writeNumberField("propertyValue_maxRequestsInBatch", collapserMetrics.getProperties().maxRequestsInBatch().get());
json.writeNumberField("propertyValue_timerDelayInMilliseconds", collapserMetrics.getProperties().timerDelayInMilliseconds().get());

json.writeNumberField("reportingHosts", 1); // this will get summed across all instances in a cluster

json.writeEndObject();
json.close();

retVal = bos.toByteArray();
} catch (Exception e) {
LangUtil.rethrowUnchecked(e);
}

return retVal;
}

}
Loading

0 comments on commit b0ce3b3

Please sign in to comment.