Skip to content

Commit

Permalink
Merge pull request Netflix#990 from mattrjacobs/timeout-concurrency-test
Browse files Browse the repository at this point in the history
Made commands run concurrently in HystrixCommandTimeoutConcurrencyTesting
  • Loading branch information
mattrjacobs committed Nov 20, 2015
2 parents 3a66c1f + c5d0768 commit 2403585
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -926,6 +926,8 @@ protected boolean shouldOutputOnNextEvents() {
return false;
}

private static final HystrixTimeoutException TIMEOUT_EXCEPTION_INSTANCE = new HystrixTimeoutException();

private static class HystrixObservableTimeoutOperator<R> implements Operator<R, R> {

final AbstractCommand<R> originalCommand;
Expand All @@ -948,7 +950,7 @@ public Subscriber<? super R> call(final Subscriber<? super R> child) {

@Override
public void run() {
child.onError(new HystrixTimeoutException());
child.onError(TIMEOUT_EXCEPTION_INSTANCE);
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,45 @@
import org.junit.Test;

import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext;
import rx.Observable;

import java.util.ArrayList;
import java.util.List;

public class HystrixCommandTimeoutConcurrencyTesting {

private final static int NUM_CONCURRENT_COMMANDS = 30;

@Test
public void testTimeoutRace() {
for (int i = 0; i < 2000; i++) {
String a = null;
String b = null;
public void testTimeoutRace() throws InterruptedException {
final int NUM_TRIALS = 1000;

for (int i = 0; i < NUM_TRIALS; i++) {
List<Observable<String>> observables = new ArrayList<Observable<String>>();
HystrixRequestContext context = null;

try {
HystrixRequestContext.initializeContext();
a = new TestCommand().execute();
b = new TestCommand().execute();
if (a == null || b == null) {
System.err.println("Received NULL!");
throw new RuntimeException("Received NULL");
context = HystrixRequestContext.initializeContext();
for (int j = 0; j < NUM_CONCURRENT_COMMANDS; j++) {
observables.add(new TestCommand().observe());
}

Observable<String> overall = Observable.merge(observables);

List<String> results = overall.toList().toBlocking().first(); //wait for all commands to complete

for (String s : results) {
if (s == null) {
System.err.println("Received NULL!");
throw new RuntimeException("Received NULL");
}
}

for (HystrixInvokableInfo<?> hi : HystrixRequestLog.getCurrentRequest().getAllExecutedCommands()) {
if (!hi.isResponseTimedOut()) {
System.err.println("Timeout not found in executed command");
throw new RuntimeException("Timeout not found in executed command");
}
if (hi.isResponseTimedOut() && hi.getExecutionEvents().size() == 1) {
System.err.println("Missing fallback status!");
throw new RuntimeException("Missing fallback status on timeout.");
Expand All @@ -46,9 +67,15 @@ public void testTimeoutRace() {
e.printStackTrace();
throw new RuntimeException(e);
} finally {
System.out.println(a + " " + b + " ==> " + HystrixRequestLog.getCurrentRequest().getExecutedCommandsAsString());
HystrixRequestContext.getContextForCurrentThread().shutdown();
System.out.println(HystrixRequestLog.getCurrentRequest().getExecutedCommandsAsString());
if (context != null) {
context.shutdown();
}
}

System.out.println("*************** TRIAL " + i + " ******************");
System.out.println();
Thread.sleep(50);
}

Hystrix.reset();
Expand All @@ -60,13 +87,20 @@ protected TestCommand() {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("testTimeoutConcurrency"))
.andCommandKey(HystrixCommandKey.Factory.asKey("testTimeoutConcurrencyCommand"))
.andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
.withExecutionTimeoutInMilliseconds(1)));
.withExecutionTimeoutInMilliseconds(3)
.withCircuitBreakerEnabled(false)
.withFallbackIsolationSemaphoreMaxConcurrentRequests(NUM_CONCURRENT_COMMANDS))
.andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter()
.withCoreSize(NUM_CONCURRENT_COMMANDS)
.withMaxQueueSize(NUM_CONCURRENT_COMMANDS)
.withQueueSizeRejectionThreshold(NUM_CONCURRENT_COMMANDS)));
}

@Override
protected String run() throws Exception {
// throw new RuntimeException("test");
// Thread.sleep(5);
//System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " sleeping");
Thread.sleep(100);
//System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " awake and returning");
return "hello";
}

Expand Down

0 comments on commit 2403585

Please sign in to comment.