Skip to content

Commit

Permalink
Made HystrixConfigurationStream support sharing
Browse files Browse the repository at this point in the history
  • Loading branch information
Matt Jacobs committed Jun 21, 2016
1 parent 910ee48 commit 8f304c8
Show file tree
Hide file tree
Showing 4 changed files with 415 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.netflix.hystrix.config.HystrixConfiguration;
import com.netflix.hystrix.config.HystrixConfigurationStream;
import com.netflix.hystrix.config.HystrixThreadPoolConfiguration;
import com.netflix.hystrix.metric.sample.HystrixUtilizationStream;
import rx.Observable;
import rx.functions.Func1;

Expand Down Expand Up @@ -171,11 +172,27 @@ public static String convertToString(HystrixConfiguration config) throws IOExcep
return jsonString.getBuffer().toString();
}

/**
* @deprecated Not for public use. This prevents stream-sharing. Please use {@link HystrixConfigurationStream#observe()}
* @param delay interval between data emissions
* @return sampled utilization as Java object, taken on a timer
*/
@Deprecated //deprecated in 1.5.4
public Observable<HystrixConfiguration> observe(int delay) {
return streamGenerator.call(delay);
}

/**
* @deprecated Not for public use. This prevents stream-sharing. Please use {@link #observeJson()}
* @param delay interval between data emissions
* @return sampled utilization as JSON string, taken on a timer
*/
@Deprecated //deprecated in 1.5.4
public Observable<String> observeJson(int delay) {
return streamGenerator.call(delay).map(convertToJson);
}

public Observable<String> observeJson() {
return HystrixConfigurationStream.getInstance().observe().map(convertToJson);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,6 @@
import java.util.function.Supplier;

class EventStream implements Supplier<Observable<Payload>> {

private final static int CONFIGURATION_DATA_INTERVAL_IN_MS = 500;

private final Observable<Payload> source;
private final AtomicBoolean isSourceCurrentlySubscribed = new AtomicBoolean(false);

Expand All @@ -56,7 +53,7 @@ public static EventStream getInstance(EventStreamEnum eventStreamEnum) {

switch (eventStreamEnum) {
case CONFIG_STREAM:
source = new HystrixConfigurationStream(CONFIGURATION_DATA_INTERVAL_IN_MS)
source = HystrixConfigurationStream.getInstance()
.observe()
.map(SerialHystrixConfiguration::toBytes)
.map(SerialHystrixMetric::toPayload);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,51 +26,85 @@
import com.netflix.hystrix.HystrixThreadPoolMetrics;
import com.netflix.hystrix.HystrixThreadPoolProperties;
import rx.Observable;
import rx.functions.Func0;
import rx.functions.Action0;
import rx.functions.Func1;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* This class samples current Hystrix configuration and exposes that as a stream
*/
public class HystrixConfigurationStream {

private final int intervalInMilliseconds;
private final Observable<Long> timer;

private final Observable<HystrixConfiguration> allConfigurationStream;
private final AtomicBoolean isSourceCurrentlySubscribed = new AtomicBoolean(false);

/**
* @deprecated Not for public use. Please use {@link #getInstance()}. This facilitates better stream-sharing
* @param intervalInMilliseconds milliseconds between data emissions
*/
@Deprecated //deprecated in 1.5.4.
public HystrixConfigurationStream(final int intervalInMilliseconds) {
this.intervalInMilliseconds = intervalInMilliseconds;
this.timer = Observable.defer(new Func0<Observable<Long>>() {
@Override
public Observable<Long> call() {
return Observable.interval(intervalInMilliseconds, TimeUnit.MILLISECONDS);
}
});
this.allConfigurationStream = Observable.interval(intervalInMilliseconds, TimeUnit.MILLISECONDS)
.map(getAllConfig)
.doOnSubscribe(new Action0() {
@Override
public void call() {
isSourceCurrentlySubscribed.set(true);
}
})
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
isSourceCurrentlySubscribed.set(false);
}
})
.share()
.onBackpressureDrop();
}

private static final HystrixConfigurationStream INSTANCE = new HystrixConfigurationStream(500);

public static HystrixConfigurationStream getInstance() {
return INSTANCE;
}

static HystrixConfigurationStream getNonSingletonInstanceOnlyUsedInUnitTests(int delayInMs) {
return new HystrixConfigurationStream(delayInMs);
}

/**
* Return a ref-counted stream that will only do work when at least one subscriber is present
*/
public Observable<HystrixConfiguration> observe() {
return timer.map(getAllConfig);
return allConfigurationStream;
}

public Observable<Map<HystrixCommandKey, HystrixCommandConfiguration>> observeCommandConfiguration() {
return timer.map(getAllCommandConfig);
return allConfigurationStream.map(getOnlyCommandConfig);
}

public Observable<Map<HystrixThreadPoolKey, HystrixThreadPoolConfiguration>> observeThreadPoolConfiguration() {
return timer.map(getAllThreadPoolConfig);
return allConfigurationStream.map(getOnlyThreadPoolConfig);
}

public Observable<Map<HystrixCollapserKey, HystrixCollapserConfiguration>> observeCollapserConfiguration() {
return timer.map(getAllCollapserConfig);
return allConfigurationStream.map(getOnlyCollapserConfig);
}

public int getIntervalInMilliseconds() {
return this.intervalInMilliseconds;
}

public boolean isSourceCurrentlySubscribed() {
return isSourceCurrentlySubscribed.get();
}

private static HystrixCommandConfiguration sampleCommandConfiguration(HystrixCommandKey commandKey, HystrixThreadPoolKey threadPoolKey,
HystrixCommandGroupKey groupKey, HystrixCommandProperties commandProperties) {
return HystrixCommandConfiguration.sample(commandKey, threadPoolKey, groupKey, commandProperties);
Expand Down Expand Up @@ -136,4 +170,28 @@ public HystrixConfiguration call(Long timestamp) {
);
}
};

private static final Func1<HystrixConfiguration, Map<HystrixCommandKey, HystrixCommandConfiguration>> getOnlyCommandConfig =
new Func1<HystrixConfiguration, Map<HystrixCommandKey, HystrixCommandConfiguration>>() {
@Override
public Map<HystrixCommandKey, HystrixCommandConfiguration> call(HystrixConfiguration hystrixConfiguration) {
return hystrixConfiguration.getCommandConfig();
}
};

private static final Func1<HystrixConfiguration, Map<HystrixThreadPoolKey, HystrixThreadPoolConfiguration>> getOnlyThreadPoolConfig =
new Func1<HystrixConfiguration, Map<HystrixThreadPoolKey, HystrixThreadPoolConfiguration>>() {
@Override
public Map<HystrixThreadPoolKey, HystrixThreadPoolConfiguration> call(HystrixConfiguration hystrixConfiguration) {
return hystrixConfiguration.getThreadPoolConfig();
}
};

private static final Func1<HystrixConfiguration, Map<HystrixCollapserKey, HystrixCollapserConfiguration>> getOnlyCollapserConfig =
new Func1<HystrixConfiguration, Map<HystrixCollapserKey, HystrixCollapserConfiguration>>() {
@Override
public Map<HystrixCollapserKey, HystrixCollapserConfiguration> call(HystrixConfiguration hystrixConfiguration) {
return hystrixConfiguration.getCollapserConfig();
}
};
}
Loading

0 comments on commit 8f304c8

Please sign in to comment.