Skip to content

Commit

Permalink
Merge pull request Netflix#238 from benjchristensen/semaphore-vs-thre…
Browse files Browse the repository at this point in the history
…ad-isolation

Fix for Semaphore vs Thread Isolation Bug
  • Loading branch information
benjchristensen committed Apr 1, 2014
2 parents ce69ad5 + 91c3802 commit 041c2f9
Show file tree
Hide file tree
Showing 4 changed files with 389 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -593,7 +593,7 @@ protected TryableSemaphore getFallbackSemaphore() {
TryableSemaphore _s = fallbackSemaphorePerCircuit.get(commandKey.name());
if (_s == null) {
// we didn't find one cache so setup
fallbackSemaphorePerCircuit.putIfAbsent(commandKey.name(), new TryableSemaphore(properties.fallbackIsolationSemaphoreMaxConcurrentRequests()));
fallbackSemaphorePerCircuit.putIfAbsent(commandKey.name(), new TryableSemaphoreActual(properties.fallbackIsolationSemaphoreMaxConcurrentRequests()));
// assign whatever got set (this or another thread)
return fallbackSemaphorePerCircuit.get(commandKey.name());
} else {
Expand All @@ -612,18 +612,23 @@ protected TryableSemaphore getFallbackSemaphore() {
* @return TryableSemaphore
*/
protected TryableSemaphore getExecutionSemaphore() {
if (executionSemaphoreOverride == null) {
TryableSemaphore _s = executionSemaphorePerCircuit.get(commandKey.name());
if (_s == null) {
// we didn't find one cache so setup
executionSemaphorePerCircuit.putIfAbsent(commandKey.name(), new TryableSemaphore(properties.executionIsolationSemaphoreMaxConcurrentRequests()));
// assign whatever got set (this or another thread)
return executionSemaphorePerCircuit.get(commandKey.name());
if (properties.executionIsolationStrategy().get().equals(ExecutionIsolationStrategy.SEMAPHORE)) {
if (executionSemaphoreOverride == null) {
TryableSemaphore _s = executionSemaphorePerCircuit.get(commandKey.name());
if (_s == null) {
// we didn't find one cache so setup
executionSemaphorePerCircuit.putIfAbsent(commandKey.name(), new TryableSemaphoreActual(properties.executionIsolationSemaphoreMaxConcurrentRequests()));
// assign whatever got set (this or another thread)
return executionSemaphorePerCircuit.get(commandKey.name());
} else {
return _s;
}
} else {
return _s;
return executionSemaphoreOverride;
}
} else {
return executionSemaphoreOverride;
// return NoOp implementation since we're not using SEMAPHORE isolation
return TryableSemaphoreNoOp.DEFAULT;
}
}

Expand Down Expand Up @@ -859,14 +864,60 @@ protected RuntimeException decomposeException(Exception e) {
* Using AtomicInteger increment/decrement instead of java.util.concurrent.Semaphore since we don't need blocking and need a custom implementation to get the dynamic permit count and since
* AtomicInteger achieves the same behavior and performance without the more complex implementation of the actual Semaphore class using AbstractQueueSynchronizer.
*/
/* package */static class TryableSemaphore {
/* package */static class TryableSemaphoreActual implements TryableSemaphore {
protected final HystrixProperty<Integer> numberOfPermits;
private final AtomicInteger count = new AtomicInteger(0);

public TryableSemaphore(HystrixProperty<Integer> numberOfPermits) {
public TryableSemaphoreActual(HystrixProperty<Integer> numberOfPermits) {
this.numberOfPermits = numberOfPermits;
}

@Override
public boolean tryAcquire() {
int currentCount = count.incrementAndGet();
if (currentCount > numberOfPermits.get()) {
count.decrementAndGet();
return false;
} else {
return true;
}
}

@Override
public void release() {
count.decrementAndGet();
}

@Override
public int getNumberOfPermitsUsed() {
return count.get();
}

}

/* package */static class TryableSemaphoreNoOp implements TryableSemaphore {

public static final TryableSemaphore DEFAULT = new TryableSemaphoreNoOp();

@Override
public boolean tryAcquire() {
return true;
}

@Override
public void release() {

}

@Override
public int getNumberOfPermitsUsed() {
return 0;
}

}

/* package */static interface TryableSemaphore {

/**
* Use like this:
* <p>
Expand All @@ -883,15 +934,7 @@ public TryableSemaphore(HystrixProperty<Integer> numberOfPermits) {
*
* @return boolean
*/
public boolean tryAcquire() {
int currentCount = count.incrementAndGet();
if (currentCount > numberOfPermits.get()) {
count.decrementAndGet();
return false;
} else {
return true;
}
}
public abstract boolean tryAcquire();

/**
* ONLY call release if tryAcquire returned true.
Expand All @@ -907,13 +950,9 @@ public boolean tryAcquire() {
* }
* </pre>
*/
public void release() {
count.decrementAndGet();
}
public abstract void release();

public int getNumberOfPermitsUsed() {
return count.get();
}
public abstract int getNumberOfPermitsUsed();

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,11 +239,7 @@ protected Observable<R> getFallback() {
/**
* A lazy {@link Observable} that will execute the command when subscribed to.
* <p>
* <b>Callback Scheduling</b>
* <p>
* <ul>
* <li>When using {@link ExecutionIsolationStrategy#THREAD} this defaults to using {@link Schedulers#threadPoolForComputation()} for callbacks.</li>
* <li>When using {@link ExecutionIsolationStrategy#SEMAPHORE} this defaults to using {@link Schedulers#immediate()} for callbacks.</li>
* <b>This defaults to using {@link Schedulers#immediate()} for callbacks.</li>
* </ul>
* <p>
* See https://github.com/Netflix/RxJava/wiki for more information.
Expand All @@ -263,7 +259,7 @@ protected Observable<R> getFallback() {
* if invoked more than once
*/
public Observable<R> toObservable() {
return toObservable(Schedulers.computation());
return toObservable(Schedulers.immediate());
}

protected ObservableCommand<R> toObservable(final Scheduler observeOn, boolean performAsyncTimeout) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
package com.netflix.hystrix;

import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.List;
Expand Down Expand Up @@ -31,6 +37,7 @@
import com.netflix.hystrix.HystrixCircuitBreakerTest.TestCircuitBreaker;
import com.netflix.hystrix.HystrixCommandProperties.ExecutionIsolationStrategy;
import com.netflix.hystrix.HystrixExecutableBase.TryableSemaphore;
import com.netflix.hystrix.HystrixExecutableBase.TryableSemaphoreActual;
import com.netflix.hystrix.exception.HystrixBadRequestException;
import com.netflix.hystrix.exception.HystrixRuntimeException;
import com.netflix.hystrix.exception.HystrixRuntimeException.FailureType;
Expand Down Expand Up @@ -1908,7 +1915,7 @@ public void testExecutionSemaphoreWithQueue() {
final AtomicBoolean exceptionReceived = new AtomicBoolean();

final TryableSemaphore semaphore =
new TryableSemaphore(HystrixProperty.Factory.asProperty(1));
new TryableSemaphoreActual(HystrixProperty.Factory.asProperty(1));

Runnable r = new HystrixContextRunnable(HystrixPlugins.getInstance().getConcurrencyStrategy(), new Runnable() {

Expand Down Expand Up @@ -1980,7 +1987,7 @@ public void testExecutionSemaphoreWithExecution() {
final AtomicBoolean exceptionReceived = new AtomicBoolean();

final TryableSemaphore semaphore =
new TryableSemaphore(HystrixProperty.Factory.asProperty(1));
new TryableSemaphoreActual(HystrixProperty.Factory.asProperty(1));

Runnable r = new HystrixContextRunnable(HystrixPlugins.getInstance().getConcurrencyStrategy(), new Runnable() {

Expand Down Expand Up @@ -2109,8 +2116,8 @@ public void testSemaphorePermitsInUse() {
final TestCircuitBreaker circuitBreaker = new TestCircuitBreaker();

// this semaphore will be shared across multiple command instances
final TryableSemaphore sharedSemaphore =
new TryableSemaphore(HystrixProperty.Factory.asProperty(3));
final TryableSemaphoreActual sharedSemaphore =
new TryableSemaphoreActual(HystrixProperty.Factory.asProperty(3));

// used to wait until all commands have started
final CountDownLatch startLatch = new CountDownLatch(sharedSemaphore.numberOfPermits.get() + 1);
Expand Down Expand Up @@ -2138,8 +2145,8 @@ public void run() {
}

// creates thread using isolated semaphore
final TryableSemaphore isolatedSemaphore =
new TryableSemaphore(HystrixProperty.Factory.asProperty(1));
final TryableSemaphoreActual isolatedSemaphore =
new TryableSemaphoreActual(HystrixProperty.Factory.asProperty(1));

final CountDownLatch isolatedLatch = new CountDownLatch(1);

Expand Down Expand Up @@ -4112,8 +4119,8 @@ public void testExecutionHookSuccessfulCommandWithSemaphoreIsolation() {
@Test
public void testExecutionHookFailureWithSemaphoreIsolation() {
/* test with execute() */
final TryableSemaphore semaphore =
new TryableSemaphore(HystrixProperty.Factory.asProperty(0));
final TryableSemaphoreActual semaphore =
new TryableSemaphoreActual(HystrixProperty.Factory.asProperty(0));

TestSemaphoreCommand command = new TestSemaphoreCommand(new TestCircuitBreaker(), semaphore, 200);
try {
Expand Down
Loading

0 comments on commit 041c2f9

Please sign in to comment.