Skip to content

Commit

Permalink
Added HystrixEventType.ThreadPool enum
Browse files Browse the repository at this point in the history
  • Loading branch information
Matt Jacobs committed Jan 12, 2016
1 parent d90dc53 commit 1712c74
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 110 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,30 @@ public static HystrixEventType from(HystrixRollingNumberEvent event) {
EXCEPTION_PRODUCING_EVENT_TYPES.add(FALLBACK_REJECTION);
}

public enum ThreadPool {
EXECUTED, REJECTED;

public static ThreadPool from(HystrixRollingNumberEvent event) {
switch (event) {
case THREAD_EXECUTION: return EXECUTED;
case THREAD_POOL_REJECTED: return REJECTED;
default:
throw new RuntimeException("Not an event that can be converted to HystrixEventType.ThreadPool : " + event);
}
}

public static ThreadPool from(HystrixEventType eventType) {
switch (eventType) {
case SUCCESS: return EXECUTED;
case FAILURE: return EXECUTED;
case TIMEOUT: return EXECUTED;
case BAD_REQUEST: return EXECUTED;
case THREAD_POOL_REJECTED: return REJECTED;
default: return null;
}
}
}

public enum Collapser {
BATCH_EXECUTED, ADDED_TO_BATCH, RESPONSE_FROM_CACHE;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ public class HystrixThreadPoolMetrics extends HystrixMetrics {
@SuppressWarnings("unused")
private static final Logger logger = LoggerFactory.getLogger(HystrixThreadPoolMetrics.class);

private static final HystrixEventType[] ALL_EVENT_TYPES = HystrixEventType.values();
private static final HystrixEventType[] ALL_COMMAND_EVENT_TYPES = HystrixEventType.values();
private static final HystrixEventType.ThreadPool[] ALL_THREADPOOL_EVENT_TYPES = HystrixEventType.ThreadPool.values();
private static final int NUMBER_THREADPOOL_EVENT_TYPES = ALL_THREADPOOL_EVENT_TYPES.length;

// String is HystrixThreadPoolKey.name() (we can't use HystrixThreadPoolKey directly as we can't guarantee it implements hashcode/equals correctly)
private static final ConcurrentHashMap<String, HystrixThreadPoolMetrics> metrics = new ConcurrentHashMap<String, HystrixThreadPoolMetrics>();
Expand Down Expand Up @@ -100,22 +102,11 @@ public static Collection<HystrixThreadPoolMetrics> getInstances() {
@Override
public long[] call(long[] initialCountArray, HystrixCommandCompletion execution) {
ExecutionResult.EventCounts eventCounts = execution.getEventCounts();
for (HystrixEventType eventType: ALL_EVENT_TYPES) {
for (HystrixEventType eventType: ALL_COMMAND_EVENT_TYPES) {
long eventCount = eventCounts.getCount(eventType);
//the only executions that make it to this method are ones that executed in the given threadpool
//so we just count THREAD_POOL_REJECTED as rejected, and all other execution (not fallback) results as accepted
switch (eventType) {
case THREAD_POOL_REJECTED:
initialCountArray[1] += eventCount;
break;
//these all fall through on purpose (they have the same behavior)
case SUCCESS:
case FAILURE:
case TIMEOUT:
case BAD_REQUEST:
// SEMAPHORE_REJECTED can't happen
// SHORT_CIRCUITED implies the failure happened before the attempt to put work on the threadpool
initialCountArray[0] += eventCount;
HystrixEventType.ThreadPool threadPoolEventType = HystrixEventType.ThreadPool.from(eventType);
if (threadPoolEventType != null) {
initialCountArray[threadPoolEventType.ordinal()] += eventCount;
}
}
return initialCountArray;
Expand All @@ -125,7 +116,7 @@ public long[] call(long[] initialCountArray, HystrixCommandCompletion execution)
public static final Func2<long[], long[], long[]> counterAggregator = new Func2<long[], long[], long[]>() {
@Override
public long[] call(long[] cumulativeEvents, long[] bucketEventCounts) {
for (int i = 0; i < 2; i++) {
for (int i = 0; i < NUMBER_THREADPOOL_EVENT_TYPES; i++) {
cumulativeEvents[i] += bucketEventCounts[i];
}
return cumulativeEvents;
Expand Down Expand Up @@ -272,7 +263,7 @@ public void markThreadExecution() {
* @return rolling count of threads executed
*/
public long getRollingCountThreadsExecuted() {
return rollingCounterStream.getLatestExecutedCount();
return rollingCounterStream.getLatestCount(HystrixEventType.ThreadPool.EXECUTED);
}

/**
Expand All @@ -281,7 +272,7 @@ public long getRollingCountThreadsExecuted() {
* @return cumulative count of threads executed
*/
public long getCumulativeCountThreadsExecuted() {
return cumulativeCounterStream.getLatestExecutedCount();
return cumulativeCounterStream.getLatestCount(HystrixEventType.ThreadPool.EXECUTED);
}

/**
Expand All @@ -292,7 +283,7 @@ public long getCumulativeCountThreadsExecuted() {
* @return rolling count of threads rejected
*/
public long getRollingCountThreadsRejected() {
return rollingCounterStream.getLatestRejectedCount();
return rollingCounterStream.getLatestCount(HystrixEventType.ThreadPool.REJECTED);
}

/**
Expand All @@ -301,27 +292,17 @@ public long getRollingCountThreadsRejected() {
* @return cumulative count of threads rejected
*/
public long getCumulativeCountThreadsRejected() {
return cumulativeCounterStream.getLatestRejectedCount();
return cumulativeCounterStream.getLatestCount(HystrixEventType.ThreadPool.REJECTED);
}

@Override
public long getCumulativeCount(HystrixRollingNumberEvent event) {
//only args that are valid are THREAD_EXECUTION and THREAD_POOL_REJECTION. delegate them appropriately and throw an exception for all others
switch (event) {
case THREAD_EXECUTION: return getCumulativeCountThreadsExecuted();
case THREAD_POOL_REJECTED: return getCumulativeCountThreadsRejected();
default: throw new RuntimeException("HystrixThreadPoolMetrics can not be queried for : " + event.name());
}
return cumulativeCounterStream.getLatestCount(HystrixEventType.ThreadPool.from(event));
}

@Override
public long getRollingCount(HystrixRollingNumberEvent event) {
//only args that are valid are THREAD_EXECUTION and THREAD_POOL_REJECTION. delegate them appropriately and throw an exception for all others
switch (event) {
case THREAD_EXECUTION: return getRollingCountThreadsExecuted();
case THREAD_POOL_REJECTED: return getRollingCountThreadsRejected();
default: throw new RuntimeException("HystrixThreadPoolMetrics can not be queried for : " + event.name());
}
return rollingCounterStream.getLatestCount(HystrixEventType.ThreadPool.from(event));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.netflix.hystrix.metric;

import com.netflix.hystrix.HystrixEventType;
import com.netflix.hystrix.HystrixThreadPoolKey;
import com.netflix.hystrix.HystrixThreadPoolProperties;
import rx.functions.Func2;
Expand All @@ -33,12 +34,14 @@
* These values are stable - there's no peeking into a bucket until it is emitted
*
* These values get produced and cached in this class.
* You may query to find the latest rolling count of 2 events (executed/rejected) via {@link #getLatestExecutedCount()} and {@link #getLatestRejectedCount()}.
* You may query to find the latest rolling count of 2 events (executed/rejected) via {@link #getLatestCount(com.netflix.hystrix.HystrixEventType.ThreadPool)}.
*/
public class CumulativeThreadPoolEventCounterStream extends BucketedCumulativeCounterStream<HystrixCommandCompletion, long[], long[]> {

private static final ConcurrentMap<String, CumulativeThreadPoolEventCounterStream> streams = new ConcurrentHashMap<String, CumulativeThreadPoolEventCounterStream>();

private static final int ALL_EVENT_TYPES_SIZE = HystrixEventType.ThreadPool.values().length;

public static CumulativeThreadPoolEventCounterStream getInstance(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties properties,
Func2<long[], HystrixCommandCompletion, long[]> reduceCommandCompletion,
Func2<long[], long[], long[]> reduceBucket) {
Expand Down Expand Up @@ -84,23 +87,15 @@ private CumulativeThreadPoolEventCounterStream(HystrixThreadPoolKey threadPoolKe

@Override
public long[] getEmptyBucketSummary() {
return new long[2];
return new long[ALL_EVENT_TYPES_SIZE];
}

@Override
public long[] getEmptyOutputValue() {
return new long[2];
}

public long getLatestExecutedCount() {
return getLatestCount(0);
}

public long getLatestRejectedCount() {
return getLatestCount(1);
return new long[ALL_EVENT_TYPES_SIZE];
}

private long getLatestCount(final int index) {
return getLatest()[index];
public long getLatestCount(HystrixEventType.ThreadPool eventType) {
return getLatest()[eventType.ordinal()];
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import com.netflix.hystrix.HystrixCommandKey;
import com.netflix.hystrix.HystrixCommandProperties;
import com.netflix.hystrix.HystrixEventType;
import com.netflix.hystrix.HystrixThreadPoolKey;
import com.netflix.hystrix.HystrixThreadPoolProperties;
import rx.functions.Func2;
Expand All @@ -35,12 +36,14 @@
* These values are stable - there's no peeking into a bucket until it is emitted
*
* These values get produced and cached in this class.
* You may query to find the latest rolling count of 2 events (executed/rejected) via {@link #getLatestExecutedCount()} and {@link #getLatestRejectedCount()}.
* You may query to find the latest rolling count of 2 events (executed/rejected) via {@link #getLatestCount(com.netflix.hystrix.HystrixEventType.ThreadPool)}.
*/
public class RollingThreadPoolEventCounterStream extends BucketedRollingCounterStream<HystrixCommandCompletion, long[], long[]> {

private static final ConcurrentMap<String, RollingThreadPoolEventCounterStream> streams = new ConcurrentHashMap<String, RollingThreadPoolEventCounterStream>();

private static final int ALL_EVENT_TYPES_SIZE = HystrixEventType.ThreadPool.values().length;

public static RollingThreadPoolEventCounterStream getInstance(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties properties,
Func2<long[], HystrixCommandCompletion, long[]> reduceCommandCompletion,
Func2<long[], long[], long[]> reduceBucket) {
Expand Down Expand Up @@ -85,23 +88,15 @@ private RollingThreadPoolEventCounterStream(HystrixThreadPoolKey threadPoolKey,

@Override
public long[] getEmptyBucketSummary() {
return new long[2];
return new long[ALL_EVENT_TYPES_SIZE];
}

@Override
public long[] getEmptyOutputValue() {
return new long[2];
}

public long getLatestExecutedCount() {
return getLatestCount(0);
}

public long getLatestRejectedCount() {
return getLatestCount(1);
return new long[ALL_EVENT_TYPES_SIZE];
}

private long getLatestCount(final int index) {
return getLatest()[index];
public long getLatestCount(HystrixEventType.ThreadPool eventType) {
return getLatest()[eventType.ordinal()];
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ public void testEmptyStreamProducesZeros() {
fail("Interrupted ex");
}
assertEquals(2, stream.getLatest().length);
assertEquals(0, stream.getLatestExecutedCount());
assertEquals(0, stream.getLatestRejectedCount());
assertEquals(0, stream.getLatestCount(HystrixEventType.ThreadPool.EXECUTED));
assertEquals(0, stream.getLatestCount(HystrixEventType.ThreadPool.REJECTED));
}

@Test
Expand All @@ -98,8 +98,8 @@ public void testSingleSuccess() {
fail("Interrupted ex");
}
assertEquals(2, stream.getLatest().length);
assertEquals(1, stream.getLatestExecutedCount());
assertEquals(0, stream.getLatestRejectedCount());
assertEquals(1, stream.getLatestCount(HystrixEventType.ThreadPool.EXECUTED));
assertEquals(0, stream.getLatestCount(HystrixEventType.ThreadPool.REJECTED));
}

@Test
Expand All @@ -122,8 +122,8 @@ public void testSingleFailure() {
fail("Interrupted ex");
}
assertEquals(2, stream.getLatest().length);
assertEquals(1, stream.getLatestExecutedCount());
assertEquals(0, stream.getLatestRejectedCount());
assertEquals(1, stream.getLatestCount(HystrixEventType.ThreadPool.EXECUTED));
assertEquals(0, stream.getLatestCount(HystrixEventType.ThreadPool.REJECTED));
}

@Test
Expand All @@ -146,8 +146,8 @@ public void testSingleTimeout() {
fail("Interrupted ex");
}
assertEquals(2, stream.getLatest().length);
assertEquals(1, stream.getLatestExecutedCount());
assertEquals(0, stream.getLatestRejectedCount());
assertEquals(1, stream.getLatestCount(HystrixEventType.ThreadPool.EXECUTED));
assertEquals(0, stream.getLatestCount(HystrixEventType.ThreadPool.REJECTED));
}

@Test
Expand All @@ -170,8 +170,8 @@ public void testSingleBadRequest() {
fail("Interrupted ex");
}
assertEquals(2, stream.getLatest().length);
assertEquals(1, stream.getLatestExecutedCount());
assertEquals(0, stream.getLatestRejectedCount());
assertEquals(1, stream.getLatestCount(HystrixEventType.ThreadPool.EXECUTED));
assertEquals(0, stream.getLatestCount(HystrixEventType.ThreadPool.REJECTED));
}

@Test
Expand Down Expand Up @@ -202,8 +202,8 @@ public void testRequestFromCache() {

//RESPONSE_FROM_CACHE should not show up at all in thread pool counters - just the success
assertEquals(2, stream.getLatest().length);
assertEquals(1, stream.getLatestExecutedCount());
assertEquals(0, stream.getLatestRejectedCount());
assertEquals(1, stream.getLatestCount(HystrixEventType.ThreadPool.EXECUTED));
assertEquals(0, stream.getLatestCount(HystrixEventType.ThreadPool.REJECTED));
}

@Test
Expand Down Expand Up @@ -251,8 +251,8 @@ public void testShortCircuited() {

//only the FAILUREs should show up in thread pool counters
assertEquals(2, stream.getLatest().length);
assertEquals(3, stream.getLatestExecutedCount());
assertEquals(0, stream.getLatestRejectedCount());
assertEquals(3, stream.getLatestCount(HystrixEventType.ThreadPool.EXECUTED));
assertEquals(0, stream.getLatestCount(HystrixEventType.ThreadPool.REJECTED));
}

@Test
Expand Down Expand Up @@ -308,8 +308,8 @@ public void run() {

//none of these got executed on a thread-pool, so thread pool metrics should be 0
assertEquals(2, stream.getLatest().length);
assertEquals(0, stream.getLatestExecutedCount());
assertEquals(0, stream.getLatestRejectedCount());
assertEquals(0, stream.getLatestCount(HystrixEventType.ThreadPool.EXECUTED));
assertEquals(0, stream.getLatestCount(HystrixEventType.ThreadPool.REJECTED));
}

@Test
Expand Down Expand Up @@ -360,8 +360,8 @@ public void testThreadPoolRejected() {

//all 12 commands got submitted to thread pool, 10 accepted, 2 rejected is expected
assertEquals(2, stream.getLatest().length);
assertEquals(10, stream.getLatestExecutedCount());
assertEquals(2, stream.getLatestRejectedCount());
assertEquals(10, stream.getLatestCount(HystrixEventType.ThreadPool.EXECUTED));
assertEquals(2, stream.getLatestCount(HystrixEventType.ThreadPool.REJECTED));
}

@Test
Expand All @@ -384,8 +384,8 @@ public void testFallbackFailure() {
fail("Interrupted ex");
}
assertEquals(2, stream.getLatest().length);
assertEquals(1, stream.getLatestExecutedCount());
assertEquals(0, stream.getLatestRejectedCount());
assertEquals(1, stream.getLatestCount(HystrixEventType.ThreadPool.EXECUTED));
assertEquals(0, stream.getLatestCount(HystrixEventType.ThreadPool.REJECTED));
}

@Test
Expand All @@ -408,8 +408,8 @@ public void testFallbackMissing() {
fail("Interrupted ex");
}
assertEquals(2, stream.getLatest().length);
assertEquals(1, stream.getLatestExecutedCount());
assertEquals(0, stream.getLatestRejectedCount());
assertEquals(1, stream.getLatestCount(HystrixEventType.ThreadPool.EXECUTED));
assertEquals(0, stream.getLatestCount(HystrixEventType.ThreadPool.REJECTED));
}

@Test
Expand Down Expand Up @@ -454,8 +454,8 @@ public void testFallbackRejection() {

//all 7 commands executed on-thread, so should be executed according to thread-pool metrics
assertEquals(2, stream.getLatest().length);
assertEquals(7, stream.getLatestExecutedCount());
assertEquals(0, stream.getLatestRejectedCount());
assertEquals(7, stream.getLatestCount(HystrixEventType.ThreadPool.EXECUTED));
assertEquals(0, stream.getLatestCount(HystrixEventType.ThreadPool.REJECTED));
}

//in a rolling window, take(30) would age out all counters. in the cumulative count, we expect them to remain non-zero forever
Expand Down Expand Up @@ -483,7 +483,7 @@ public void testMultipleEventsOverTimeGetStoredAndDoNotAgeOut() {

//all commands should have aged out
assertEquals(2, stream.getLatest().length);
assertEquals(2, stream.getLatestExecutedCount());
assertEquals(0, stream.getLatestRejectedCount());
assertEquals(2, stream.getLatestCount(HystrixEventType.ThreadPool.EXECUTED));
assertEquals(0, stream.getLatestCount(HystrixEventType.ThreadPool.REJECTED));
}
}
Loading

0 comments on commit 1712c74

Please sign in to comment.