Skip to content

Commit

Permalink
Fixed max-concurrency streams
Browse files Browse the repository at this point in the history
  • Loading branch information
Matt Jacobs committed Jan 12, 2016
1 parent 0ca5067 commit f76263d
Show file tree
Hide file tree
Showing 27 changed files with 1,049 additions and 858 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.netflix.hystrix.metric.CumulativeCommandEventCounterStream;
import com.netflix.hystrix.metric.RollingCommandEventCounterStream;
import com.netflix.hystrix.metric.RollingCommandLatencyDistributionStream;
import com.netflix.hystrix.metric.RollingCommandMaxConcurrencyStream;
import com.netflix.hystrix.metric.RollingCommandUserLatencyDistributionStream;
import com.netflix.hystrix.strategy.metrics.HystrixMetricsPublisherCommand;
import com.netflix.hystrix.util.HystrixRollingNumberEvent;
Expand Down Expand Up @@ -113,6 +114,7 @@ public void initialize() {
CumulativeCommandEventCounterStream.getInstance(key, properties).startCachingStreamValuesIfUnstarted();
RollingCommandLatencyDistributionStream.getInstance(key, properties).startCachingStreamValuesIfUnstarted();
RollingCommandUserLatencyDistributionStream.getInstance(key, properties).startCachingStreamValuesIfUnstarted();
RollingCommandMaxConcurrencyStream.getInstance(key, properties).startCachingStreamValuesIfUnstarted();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.netflix.hystrix.HystrixThreadPoolMetrics;
import com.netflix.hystrix.HystrixThreadPoolProperties;
import com.netflix.hystrix.metric.CumulativeThreadPoolEventCounterStream;
import com.netflix.hystrix.metric.RollingThreadPoolConcurrencyStream;
import com.netflix.hystrix.metric.RollingThreadPoolEventCounterStream;
import com.netflix.hystrix.strategy.metrics.HystrixMetricsPublisherThreadPool;
import com.netflix.hystrix.util.HystrixRollingNumberEvent;
Expand Down Expand Up @@ -98,6 +99,7 @@ public void initialize() {
DefaultMonitorRegistry.getInstance().register(commandMetricsMonitor);
RollingThreadPoolEventCounterStream.getInstance(key, properties).startCachingStreamValuesIfUnstarted();
CumulativeThreadPoolEventCounterStream.getInstance(key, properties).startCachingStreamValuesIfUnstarted();
RollingThreadPoolConcurrencyStream.getInstance(key, properties).startCachingStreamValuesIfUnstarted();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,7 @@ private Observable<R> getRunObservableDecoratedForMetricsAndErrorHandling() {
@Override
public void call(Subscriber<? super R> s) {
metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD);

if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {
// the command timed out in the wrapping thread so we will return immediately
// and not increment any of the counters below or other such logic
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import com.netflix.hystrix.metric.HealthCountsStream;
import com.netflix.hystrix.metric.HystrixCommandCompletion;
import com.netflix.hystrix.metric.HystrixThreadEventStream;
import com.netflix.hystrix.metric.RollingCommandConcurrencyStream;
import com.netflix.hystrix.metric.RollingCommandMaxConcurrencyStream;
import com.netflix.hystrix.metric.RollingCommandEventCounterStream;
import com.netflix.hystrix.metric.RollingCommandLatencyDistributionStream;
import com.netflix.hystrix.metric.RollingCommandUserLatencyDistributionStream;
Expand All @@ -34,6 +34,7 @@
import com.netflix.hystrix.strategy.HystrixPlugins;
import com.netflix.hystrix.strategy.eventnotifier.HystrixEventNotifier;
import com.netflix.hystrix.util.HystrixRollingNumberEvent;
import rx.functions.Func0;
import rx.functions.Func2;

/**
Expand Down Expand Up @@ -169,8 +170,6 @@ public static Collection<HystrixCommandMetrics> getInstances() {
metrics.clear();
}



private final HystrixCommandProperties properties;
private final HystrixCommandKey key;
private final HystrixCommandGroupKey group;
Expand All @@ -182,7 +181,7 @@ public static Collection<HystrixCommandMetrics> getInstances() {
private final CumulativeCommandEventCounterStream cumulativeCommandEventCounterStream;
private final RollingCommandLatencyDistributionStream rollingCommandLatencyDistributionStream;
private final RollingCommandUserLatencyDistributionStream rollingCommandUserLatencyDistributionStream;
private final RollingCommandConcurrencyStream rollingCommandConcurrencyStream;
private final RollingCommandMaxConcurrencyStream rollingCommandMaxConcurrencyStream;

/* package */HystrixCommandMetrics(final HystrixCommandKey key, HystrixCommandGroupKey commandGroup, HystrixThreadPoolKey threadPoolKey, HystrixCommandProperties properties, HystrixEventNotifier eventNotifier) {
super(null);
Expand All @@ -191,13 +190,20 @@ public static Collection<HystrixCommandMetrics> getInstances() {
this.threadPoolKey = threadPoolKey;
this.properties = properties;

Func0<Integer> concurrentExecutionThunk = new Func0<Integer>() {
@Override
public Integer call() {
return HystrixCommandMetrics.getInstance(key).concurrentExecutionCount.get();
}
};

healthCountsStream = HealthCountsStream.getInstance(key, properties);
rollingCommandEventCounterStream = RollingCommandEventCounterStream.getInstance(key, properties);
cumulativeCommandEventCounterStream = CumulativeCommandEventCounterStream.getInstance(key, properties);

rollingCommandLatencyDistributionStream = RollingCommandLatencyDistributionStream.getInstance(key, properties);
rollingCommandUserLatencyDistributionStream = RollingCommandUserLatencyDistributionStream.getInstance(key, properties);
rollingCommandConcurrencyStream = RollingCommandConcurrencyStream.getInstance(key, properties);
rollingCommandMaxConcurrencyStream = RollingCommandMaxConcurrencyStream.getInstance(key, properties);
}

/* package */ synchronized void resetStream() {
Expand Down Expand Up @@ -321,8 +327,7 @@ public int getTotalTimeMean() {
}

public long getRollingMaxConcurrentExecutions() {
return 0L;
//return rollingCommandConcurrencyStream.getLatestRollingMax();
return rollingCommandMaxConcurrencyStream.getLatestRollingMax();
}

/**
Expand All @@ -335,7 +340,8 @@ public int getCurrentConcurrentExecutionCount() {
}

/* package-private */ void markCommandStart(HystrixCommandKey commandKey, HystrixThreadPoolKey threadPoolKey, HystrixCommandProperties.ExecutionIsolationStrategy isolationStrategy) {
concurrentExecutionCount.incrementAndGet();
int currentCount = concurrentExecutionCount.incrementAndGet();
HystrixThreadEventStream.getInstance().commandExecutionStarted(commandKey, threadPoolKey, isolationStrategy, currentCount);
}

/* package-private */ void markCommandDone(ExecutionResult executionResult, HystrixCommandKey commandKey, HystrixThreadPoolKey threadPoolKey) {
Expand Down Expand Up @@ -377,7 +383,7 @@ private void unsubscribeAll() {
cumulativeCommandEventCounterStream.unsubscribe();
rollingCommandLatencyDistributionStream.unsubscribe();
rollingCommandUserLatencyDistributionStream.unsubscribe();
rollingCommandConcurrencyStream.unsubscribe();
rollingCommandMaxConcurrencyStream.unsubscribe();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,14 +210,17 @@ private void touchConfig() {

@Override
public void markThreadExecution() {
metrics.markThreadExecution();
}

@Override
public void markThreadCompletion() {
metrics.markThreadCompletion();
}

@Override
public void markThreadRejection() {
metrics.markThreadRejection();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;

import com.netflix.hystrix.metric.CumulativeThreadPoolEventCounterStream;
import com.netflix.hystrix.metric.HystrixCommandCompletion;
Expand All @@ -29,6 +30,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import rx.functions.Func0;
import rx.functions.Func2;

/**
Expand Down Expand Up @@ -136,6 +138,8 @@ public long[] call(long[] cumulativeEvents, long[] bucketEventCounts) {
private final ThreadPoolExecutor threadPool;
private final HystrixThreadPoolProperties properties;

private final AtomicInteger concurrentExecutionCount = new AtomicInteger();

private final RollingThreadPoolEventCounterStream rollingCounterStream;
private final CumulativeThreadPoolEventCounterStream cumulativeCounterStream;
private final RollingThreadPoolConcurrencyStream rollingThreadPoolConcurrencyStream;
Expand Down Expand Up @@ -254,6 +258,7 @@ public Number getCurrentQueueSize() {
* Invoked each time a thread is executed.
*/
public void markThreadExecution() {
concurrentExecutionCount.incrementAndGet();
}

/**
Expand Down Expand Up @@ -310,6 +315,7 @@ public long getRollingCount(HystrixRollingNumberEvent event) {
* Invoked each time a thread completes.
*/
public void markThreadCompletion() {
concurrentExecutionCount.decrementAndGet();
}

/**
Expand All @@ -320,13 +326,22 @@ public void markThreadCompletion() {
* @return rolling max active threads
*/
public long getRollingMaxActiveThreads() {
return 0L;
//return rollingThreadPoolConcurrencyStream.getRollingMax();
return rollingThreadPoolConcurrencyStream.getLatestRollingMax();
}

/**
* Invoked each time a command is rejected from the thread-pool
*/
public void markThreadRejection() {
concurrentExecutionCount.decrementAndGet();
}

public static Func0<Integer> getCurrentConcurrencyThunk(final HystrixThreadPoolKey threadPoolKey) {
return new Func0<Integer>() {
@Override
public Integer call() {
return HystrixThreadPoolMetrics.getInstance(threadPoolKey).concurrentExecutionCount.get();
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public static void reset() {
private CumulativeCommandEventCounterStream(HystrixCommandKey commandKey, int numCounterBuckets, int counterBucketSizeInMs,
Func2<long[], HystrixCommandCompletion, long[]> reduceCommandCompletion,
Func2<long[], long[], long[]> reduceBucket) {
super(HystrixCommandEventStream.getInstance(commandKey), numCounterBuckets, counterBucketSizeInMs, reduceCommandCompletion, reduceBucket);
super(HystrixCommandCompletionStream.getInstance(commandKey), numCounterBuckets, counterBucketSizeInMs, reduceCommandCompletion, reduceBucket);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public static void reset() {
private CumulativeThreadPoolEventCounterStream(HystrixThreadPoolKey threadPoolKey, int numCounterBuckets, int counterBucketSizeInMs,
Func2<long[], HystrixCommandCompletion, long[]> reduceCommandCompletion,
Func2<long[], long[], long[]> reduceBucket) {
super(HystrixThreadPoolEventStream.getInstance(threadPoolKey), numCounterBuckets, counterBucketSizeInMs, reduceCommandCompletion, reduceBucket);
super(HystrixThreadPoolCompletionStream.getInstance(threadPoolKey), numCounterBuckets, counterBucketSizeInMs, reduceCommandCompletion, reduceBucket);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public static void removeByKey(HystrixCommandKey key) {

private HealthCountsStream(final HystrixCommandKey commandKey, final int numBuckets, final int bucketSizeInMs,
Func2<long[], HystrixCommandCompletion, long[]> reduceCommandCompletion) {
super(HystrixCommandEventStream.getInstance(commandKey), numBuckets, bucketSizeInMs, reduceCommandCompletion, healthCheckAccumulator);
super(HystrixCommandCompletionStream.getInstance(commandKey), numBuckets, bucketSizeInMs, reduceCommandCompletion, healthCheckAccumulator);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,27 +25,27 @@
import java.util.concurrent.ConcurrentMap;

/**
* Per-Command stream of {@link HystrixCommandEvent}s. This gets written to by {@link HystrixThreadEventStream}s.
* Per-Command stream of {@link HystrixCommandCompletion}s. This gets written to by {@link HystrixThreadEventStream}s.
* That object will emit on an RxComputation thread, so all work done by a consumer of this {@link #observe()} happens
* asynchronously.
*/
public class HystrixCommandEventStream implements HystrixEventStream<HystrixCommandCompletion> {
public class HystrixCommandCompletionStream implements HystrixEventStream<HystrixCommandCompletion> {
private final HystrixCommandKey commandKey;

private final Subject<HystrixCommandCompletion, HystrixCommandCompletion> writeOnlySubject;
private final Observable<HystrixCommandCompletion> readOnlyStream;

private static final ConcurrentMap<String, HystrixCommandEventStream> streams = new ConcurrentHashMap<String, HystrixCommandEventStream>();
private static final ConcurrentMap<String, HystrixCommandCompletionStream> streams = new ConcurrentHashMap<String, HystrixCommandCompletionStream>();

public static HystrixCommandEventStream getInstance(HystrixCommandKey commandKey) {
HystrixCommandEventStream initialStream = streams.get(commandKey.name());
public static HystrixCommandCompletionStream getInstance(HystrixCommandKey commandKey) {
HystrixCommandCompletionStream initialStream = streams.get(commandKey.name());
if (initialStream != null) {
return initialStream;
} else {
synchronized (HystrixCommandEventStream.class) {
HystrixCommandEventStream existingStream = streams.get(commandKey.name());
synchronized (HystrixCommandCompletionStream.class) {
HystrixCommandCompletionStream existingStream = streams.get(commandKey.name());
if (existingStream == null) {
HystrixCommandEventStream newStream = new HystrixCommandEventStream(commandKey);
HystrixCommandCompletionStream newStream = new HystrixCommandCompletionStream(commandKey);
streams.putIfAbsent(commandKey.name(), newStream);
return newStream;
} else {
Expand All @@ -55,7 +55,7 @@ public static HystrixCommandEventStream getInstance(HystrixCommandKey commandKey
}
}

HystrixCommandEventStream(final HystrixCommandKey commandKey) {
HystrixCommandCompletionStream(final HystrixCommandKey commandKey) {
this.commandKey = commandKey;

this.writeOnlySubject = new SerializedSubject<HystrixCommandCompletion, HystrixCommandCompletion>(PublishSubject.<HystrixCommandCompletion>create());
Expand All @@ -78,6 +78,6 @@ public Observable<HystrixCommandCompletion> observe() {

@Override
public String toString() {
return "HystrixCommandEventStream(" + commandKey.name() + ")";
return "HystrixCommandCompletionStream(" + commandKey.name() + ")";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,17 @@

/**
* Data class that get fed to event stream when a command starts executing.
* Was used in an experiment to get stream-based concurrency working, but not used as of 1.5.0-RC1
*/
/* package-private */class HystrixCommandExecutionStarted extends HystrixCommandEvent {
public class HystrixCommandExecutionStarted extends HystrixCommandEvent {
private final HystrixCommandProperties.ExecutionIsolationStrategy isolationStrategy;
private final int currentConcurrency;

public HystrixCommandExecutionStarted(HystrixCommandKey commandKey, HystrixThreadPoolKey threadPoolKey, HystrixCommandProperties.ExecutionIsolationStrategy isolationStrategy) {
public HystrixCommandExecutionStarted(HystrixCommandKey commandKey, HystrixThreadPoolKey threadPoolKey,
HystrixCommandProperties.ExecutionIsolationStrategy isolationStrategy,
int currentConcurrency) {
super(commandKey, threadPoolKey);
this.isolationStrategy = isolationStrategy;
this.currentConcurrency = currentConcurrency;
}

@Override
Expand Down Expand Up @@ -56,4 +59,8 @@ public boolean didCommandExecute() {
return false;
}

public int getCurrentConcurrency() {
return currentConcurrency;
}

}
Loading

0 comments on commit f76263d

Please sign in to comment.