Skip to content

Commit

Permalink
Merge pull request Netflix#1044 from mattrjacobs/jmh-command-execution
Browse files Browse the repository at this point in the history
Better measurement of command execution
  • Loading branch information
mattrjacobs committed Jan 12, 2016
2 parents 4816163 + d1a18eb commit 944cf45
Showing 1 changed file with 104 additions and 28 deletions.
Original file line number Diff line number Diff line change
@@ -18,9 +18,11 @@
import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandProperties;
import com.netflix.hystrix.HystrixObservableCommand;
import com.netflix.hystrix.HystrixThreadPool;
import com.netflix.hystrix.HystrixThreadPoolKey;
import com.netflix.hystrix.HystrixThreadPoolProperties;
import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Level;
@@ -33,6 +35,7 @@
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.infra.Blackhole;
import rx.Observable;
import rx.functions.Func0;
import rx.schedulers.Schedulers;

import java.util.List;
@@ -42,6 +45,10 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
* Note that the hystrixExecute test must be run on a forked JVM. Otherwise, the initial properties that get
* set for the command apply to all runs. This would leave the command as THREAD-isolated in all test, for example.
*/
public class CommandExecutionPerfTest {

static HystrixCommandProperties.Setter threadIsolatedCommandDefaults = HystrixCommandProperties.Setter()
@@ -61,25 +68,43 @@ public class CommandExecutionPerfTest {
static HystrixThreadPoolProperties.Setter threadPoolDefaults = HystrixThreadPoolProperties.Setter()
.withCoreSize(100);

static HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey("PERF");

private static HystrixCommandProperties.Setter getCommandSetter(HystrixCommandProperties.ExecutionIsolationStrategy isolationStrategy) {
switch (isolationStrategy) {
case THREAD: return threadIsolatedCommandDefaults;
default: return semaphoreIsolatedCommandDefaults;
}
}

@State(Scope.Thread)
public static class BlackholeState {
//amount of "work" to give to CPU
@Param({"1", "100", "10000"})
public int blackholeConsumption;
}

@State(Scope.Thread)
public static class CommandState {
HystrixCommand<Integer> command;
HystrixRequestContext requestContext;

@Param({"true", "false"})
public boolean setUpRequestContext;

@Param({"THREAD", "SEMAPHORE"})
public HystrixCommandProperties.ExecutionIsolationStrategy isolationStrategy;

//amount of "work" to give to CPU
@Param({"1", "100", "10000"})
public int blackholeConsumption;

@Setup(Level.Invocation)
public void setUp() {
if (setUpRequestContext) {
requestContext = HystrixRequestContext.initializeContext();
}

command = new HystrixCommand<Integer>(
HystrixCommand.Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("PERF"))
.andCommandPropertiesDefaults(getCommandSetter(isolationStrategy))
@@ -97,6 +122,56 @@ protected Integer getFallback() {
}
};
}

@TearDown(Level.Invocation)
public void tearDown() {
if (setUpRequestContext) {
requestContext.shutdown();
}
}
}

@State(Scope.Thread)
public static class ObservableCommandState {
HystrixObservableCommand<Integer> command;
HystrixRequestContext requestContext;

@Param({"true", "false"})
public boolean setUpRequestContext;

//amount of "work" to give to CPU
@Param({"1", "100", "10000"})
public int blackholeConsumption;

@Setup(Level.Invocation)
public void setUp() {
if (setUpRequestContext) {
requestContext = HystrixRequestContext.initializeContext();
}

command = new HystrixObservableCommand<Integer>(
HystrixObservableCommand.Setter.withGroupKey(groupKey)
.andCommandPropertiesDefaults(getCommandSetter(HystrixCommandProperties.ExecutionIsolationStrategy.SEMAPHORE))
) {
@Override
protected Observable<Integer> construct() {
return Observable.defer(new Func0<Observable<Integer>>() {
@Override
public Observable<Integer> call() {
Blackhole.consumeCPU(blackholeConsumption);
return Observable.just(1);
}
});
}
};
}

@TearDown(Level.Invocation)
public void tearDown() {
if (setUpRequestContext) {
requestContext.shutdown();
}
}
}

@State(Scope.Benchmark)
@@ -131,22 +206,23 @@ public void tearDown() {
}
}


@Benchmark
@BenchmarkMode({Mode.Throughput})
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public Integer baselineExecute() {
public Integer baselineExecute(BlackholeState bhState) {
Blackhole.consumeCPU(bhState.blackholeConsumption);
return 1;
}

@Benchmark
@BenchmarkMode({Mode.Throughput})
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public Integer baselineQueue(ExecutorState state) throws InterruptedException, ExecutionException{
public Integer baselineQueue(ExecutorState state, final BlackholeState bhState) throws InterruptedException, ExecutionException {
try {
return state.executorService.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
Blackhole.consumeCPU(bhState.blackholeConsumption);
return 1;
}
}).get();
@@ -158,8 +234,14 @@ public Integer call() throws Exception {
@Benchmark
@BenchmarkMode({Mode.Throughput})
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public Integer baselineSyncObserve() throws InterruptedException {
Observable<Integer> syncObservable = Observable.just(1);
public Integer baselineSyncObserve(final BlackholeState bhState) throws InterruptedException {
Observable<Integer> syncObservable = Observable.defer(new Func0<Observable<Integer>>() {
@Override
public Observable<Integer> call() {
Blackhole.consumeCPU(bhState.blackholeConsumption);
return Observable.just(1);
}
});

try {
return syncObservable.toBlocking().first();
@@ -171,8 +253,14 @@ public Integer baselineSyncObserve() throws InterruptedException {
@Benchmark
@BenchmarkMode({Mode.Throughput})
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public Integer baselineAsyncComputationObserve() throws InterruptedException {
Observable<Integer> asyncObservable = Observable.just(1).subscribeOn(Schedulers.computation());
public Integer baselineAsyncComputationObserve(final BlackholeState bhState) throws InterruptedException {
Observable<Integer> asyncObservable = Observable.defer(new Func0<Observable<Integer>>() {
@Override
public Observable<Integer> call() {
Blackhole.consumeCPU(bhState.blackholeConsumption);
return Observable.just(1);
}
}).subscribeOn(Schedulers.computation());

try {
return asyncObservable.toBlocking().first();
@@ -184,8 +272,14 @@ public Integer baselineAsyncComputationObserve() throws InterruptedException {
@Benchmark
@BenchmarkMode({Mode.Throughput})
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public Integer baselineAsyncCustomThreadPoolObserve(ThreadPoolState state) {
Observable<Integer> asyncObservable = Observable.just(1).subscribeOn(state.hystrixThreadPool.getScheduler());
public Integer baselineAsyncCustomThreadPoolObserve(ThreadPoolState state, final BlackholeState bhState) {
Observable<Integer> asyncObservable = Observable.defer(new Func0<Observable<Integer>>() {
@Override
public Observable<Integer> call() {
Blackhole.consumeCPU(bhState.blackholeConsumption);
return Observable.just(1);
}
}).subscribeOn(state.hystrixThreadPool.getScheduler());
try {
return asyncObservable.toBlocking().first();
} catch (Throwable t) {
@@ -203,25 +297,7 @@ public Integer hystrixExecute(CommandState state) {
@Benchmark
@BenchmarkMode({Mode.Throughput})
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public Integer hystrixQueue(CommandState state) {
try {
return state.command.queue().get();
} catch (Throwable t) {
return 2;
}
}

@Benchmark
@BenchmarkMode({Mode.Throughput})
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public Integer hystrixObserve(CommandState state) {
public Integer hystrixObserve(ObservableCommandState state) {
return state.command.observe().toBlocking().first();
}

@Benchmark
@BenchmarkMode({Mode.Throughput})
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public Integer hystrixToObservable(CommandState state) {
return state.command.toObservable().toBlocking().first();
}
}

0 comments on commit 944cf45

Please sign in to comment.