Skip to content

Commit

Permalink
Merge pull request Netflix#1209 from mattrjacobs/thread-accounting
Browse files Browse the repository at this point in the history
Fixed thread-state cleanup to happen on unsubscribe or terminate
  • Loading branch information
mattrjacobs committed May 19, 2016
2 parents e547495 + eded2af commit 7d3afbf
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 202 deletions.
219 changes: 102 additions & 117 deletions hystrix-core/src/main/java/com/netflix/hystrix/AbstractCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ protected static enum TimedOutStatus {

protected final AtomicBoolean commandStarted = new AtomicBoolean();
protected volatile boolean executionStarted = false;
protected volatile boolean threadExecutionStarted = false;
protected volatile boolean isExecutionComplete = false;

/*
Expand Down Expand Up @@ -377,7 +378,7 @@ public Observable<R> toObservable() {
HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
if (fromCache != null) {
isResponseFromCache = true;
return handleRequestCacheHitAndEmitValues(fromCache);
return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
}
}

Expand All @@ -392,7 +393,7 @@ public Observable<R> toObservable() {
public void call() {
if (commandCleanupExecuted.compareAndSet(false, true)) {
isExecutionComplete = true;
handleCommandEnd();
handleCommandEnd(_cmd);
}
}
};
Expand All @@ -405,7 +406,7 @@ public void call() {
eventNotifier.markEvent(HystrixEventType.CANCELLED, commandKey);
executionResultAtTimeOfCancellation = executionResult
.addEvent((int) (System.currentTimeMillis() - commandStartTimestamp), HystrixEventType.CANCELLED);
handleCommandEnd();
handleCommandEnd(_cmd);
}
}
};
Expand All @@ -417,9 +418,48 @@ public Observable<R> call() {
}
};

final Func1<R, R> wrapWithAllOnNextHooks = new Func1<R, R>() {
@Override
public R call(R r) {
R afterFirstApplication = r;

try {
afterFirstApplication = executionHook.onComplete(_cmd, r);
} catch (Throwable hookEx) {
logger.warn("Error calling HystrixCommandExecutionHook.onComplete", hookEx);
}

try {
return executionHook.onEmit(_cmd, afterFirstApplication);
} catch (Throwable hookEx) {
logger.warn("Error calling HystrixCommandExecutionHook.onEmit", hookEx);
return afterFirstApplication;
}
}
};

final Action0 fireOnCompletedHook = new Action0() {
@Override
public void call() {
try {
executionHook.onSuccess(_cmd);
} catch (Throwable hookEx) {
logger.warn("Error calling HystrixCommandExecutionHook.onSuccess", hookEx);
}
}
};

final Action1<Throwable> fireOnErrorHook = new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {

}
};

Observable<R> hystrixObservable =
Observable.defer(applyHystrixSemantics).
lift(new CommandHookApplication(this));
Observable.defer(applyHystrixSemantics)
.map(wrapWithAllOnNextHooks);


Observable<R> afterCache;

Expand All @@ -432,7 +472,7 @@ public Observable<R> call() {
// another thread beat us so we'll use the cached value instead
toCache.unsubscribe();
isResponseFromCache = true;
return handleRequestCacheHitAndEmitValues(fromCache);
return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
} else {
// we just created an ObservableCommand so we cast and return it
afterCache = toCache.toObservable();
Expand All @@ -442,8 +482,10 @@ public Observable<R> call() {
}

return afterCache
.doOnTerminate(terminateCommandCleanup) // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line))
.doOnUnsubscribe(unsubscribeCommandCleanup); // perform cleanup once
.doOnTerminate(terminateCommandCleanup) // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line))
.doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once
.doOnCompleted(fireOnCompletedHook)
.doOnError(fireOnErrorHook);
}

private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
Expand Down Expand Up @@ -551,15 +593,6 @@ public void call(Notification<? super R> rNotification) {
}
};

final Action0 handleThreadEndOnNonTimeout = new Action0() {
@Override
public void call() {
if (!isCommandTimedOut.get().equals(TimedOutStatus.TIMED_OUT)) {
handleThreadEnd();
}
}
};

Observable<R> execution;
if (properties.executionTimeoutEnabled().get()) {
execution = executeCommandWithSpecifiedIsolation(_cmd)
Expand All @@ -571,9 +604,7 @@ public void call() {
return execution.doOnNext(markEmits)
.doOnCompleted(markCompleted)
.onErrorResumeNext(handleFallback)
.doOnEach(setRequestContext)
.doOnTerminate(handleThreadEndOnNonTimeout)
.lift(new DeprecatedOnCompleteWithValueHookApplication(_cmd));
.doOnEach(setRequestContext);
}

private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
Expand All @@ -592,6 +623,7 @@ public Observable<R> call() {
return Observable.error(new RuntimeException("timed out before executing run()"));
} else {
// not timed out so execute
threadExecutionStarted = true;
HystrixCounters.incrementGlobalConcurrentThreads();
threadPool.markThreadExecution();
// store the command that is being run
Expand Down Expand Up @@ -788,21 +820,38 @@ private Observable<R> getUserExecutionObservable(final AbstractCommand<R> _cmd)
// so we catch it here and turn it into Observable.error
userObservable = Observable.error(ex);
}
return userObservable.lift(new ExecutionHookApplication(_cmd))

final AtomicBoolean threadStateCleanedUp = new AtomicBoolean(false);

return userObservable
.lift(new ExecutionHookApplication(_cmd))
.lift(new DeprecatedOnRunHookApplication(_cmd))
.doOnTerminate(new Action0() {
@Override
public void call() {
//If the command timed out, then the calling thread has already walked away so we need
//to handle these markers. Otherwise, the calling thread will perform these for us.
if (isCommandTimedOut.get().equals(TimedOutStatus.TIMED_OUT)) {
handleThreadEnd();

if (threadExecutionStarted && isCommandTimedOut.get().equals(TimedOutStatus.TIMED_OUT)) {
if (threadStateCleanedUp.compareAndSet(false, true)) {
handleThreadEnd(_cmd);
}
}
}
})
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
if (threadExecutionStarted && isCommandTimedOut.get().equals(TimedOutStatus.TIMED_OUT)) {
if (threadStateCleanedUp.compareAndSet(false, true)) {
handleThreadEnd(_cmd);
}
}
}
});
}

private Observable<R> handleRequestCacheHitAndEmitValues(final HystrixCommandResponseFromCache<R> fromCache) {
private Observable<R> handleRequestCacheHitAndEmitValues(final HystrixCommandResponseFromCache<R> fromCache, final AbstractCommand<R> _cmd) {
try {
executionHook.onCacheHit(this);
} catch (Throwable hookEx) {
Expand All @@ -815,7 +864,7 @@ private Observable<R> handleRequestCacheHitAndEmitValues(final HystrixCommandRes
@Override
public void call() {
if (!cleanupCompleted.get()) {
cleanUpAfterResponseFromCache();
cleanUpAfterResponseFromCache(_cmd);
isExecutionComplete = true;
cleanupCompleted.set(true);
}
Expand All @@ -824,14 +873,14 @@ public void call() {
@Override
public void call() {
if (!cleanupCompleted.get()) {
cleanUpAfterResponseFromCache();
cleanUpAfterResponseFromCache(_cmd);
cleanupCompleted.set(true);
}
}
});
}

private void cleanUpAfterResponseFromCache() {
private void cleanUpAfterResponseFromCache(AbstractCommand<R> _cmd) {
Reference<TimerListener> tl = timeoutTimer.get();
if (tl != null) {
tl.clear();
Expand All @@ -846,21 +895,34 @@ private void cleanUpAfterResponseFromCache() {
.markUserThreadCompletion(latency);
metrics.markCommandDone(cacheOnlyForMetrics, commandKey, threadPoolKey, executionStarted);
eventNotifier.markEvent(HystrixEventType.RESPONSE_FROM_CACHE, commandKey);

//in case of timeout, the work chained onto the Hystrix thread has the responsibility of this cleanup
if (threadExecutionStarted && !isCommandTimedOut.get().equals(TimedOutStatus.TIMED_OUT)) {
handleThreadEnd(_cmd);
}
}

private void handleCommandEnd() {
private void handleCommandEnd(AbstractCommand<R> _cmd) {
Reference<TimerListener> tl = timeoutTimer.get();
if (tl != null) {
tl.clear();
}

long userThreadLatency = System.currentTimeMillis() - commandStartTimestamp;
executionResult = executionResult.markUserThreadCompletion((int) userThreadLatency);
ExecutionResult cancelled = executionResultAtTimeOfCancellation;
if (cancelled == null) {
if (executionResultAtTimeOfCancellation == null) {
metrics.markCommandDone(executionResult, commandKey, threadPoolKey, executionStarted);
} else {
metrics.markCommandDone(cancelled, commandKey, threadPoolKey, executionStarted);
metrics.markCommandDone(executionResultAtTimeOfCancellation, commandKey, threadPoolKey, executionStarted);
}

if (endCurrentThreadExecutingCommand != null) {
endCurrentThreadExecutingCommand.call();
}

//in case of timeout, the work chained onto the Hystrix thread has the responsibility of this cleanup
if (threadExecutionStarted && !isCommandTimedOut.get().equals(TimedOutStatus.TIMED_OUT)) {
handleThreadEnd(_cmd);
}
}

Expand All @@ -871,8 +933,7 @@ private Observable<R> handleSemaphoreRejectionViaFallback() {
logger.debug("HystrixCommand Execution Rejection by Semaphore."); // debug only since we're throwing the exception and someone higher will do something with it
// retrieve a fallback or throw an exception if no fallback available
return getFallbackOrThrowException(this, HystrixEventType.SEMAPHORE_REJECTED, FailureType.REJECTED_SEMAPHORE_EXECUTION,
"could not acquire a semaphore for execution", semaphoreRejectionException)
.lift(new DeprecatedOnCompleteWithValueHookApplication(this));
"could not acquire a semaphore for execution", semaphoreRejectionException);
}

private Observable<R> handleShortCircuitViaFallback() {
Expand All @@ -883,8 +944,7 @@ private Observable<R> handleShortCircuitViaFallback() {
executionResult = executionResult.setExecutionException(shortCircuitException);
try {
return getFallbackOrThrowException(this, HystrixEventType.SHORT_CIRCUITED, FailureType.SHORTCIRCUIT,
"short-circuited", shortCircuitException)
.lift(new DeprecatedOnCompleteWithValueHookApplication(this));
"short-circuited", shortCircuitException);
} catch (Exception e) {
return Observable.error(e);
}
Expand Down Expand Up @@ -998,18 +1058,13 @@ private boolean isRecoverableError(Throwable t) {
return false;
}

protected void handleThreadEnd() {
if (endCurrentThreadExecutingCommand != null) {
endCurrentThreadExecutingCommand.call();
}
if (executionResult.isExecutedInThread()) {
HystrixCounters.decrementGlobalConcurrentThreads();
threadPool.markThreadCompletion();
try {
executionHook.onThreadComplete(this);
} catch (Throwable hookEx) {
logger.warn("Error calling HystrixCommandExecutionHook.onThreadComplete", hookEx);
}
protected void handleThreadEnd(AbstractCommand<R> _cmd) {
HystrixCounters.decrementGlobalConcurrentThreads();
threadPool.markThreadCompletion();
try {
executionHook.onThreadComplete(_cmd);
} catch (Throwable hookEx) {
logger.warn("Error calling HystrixCommandExecutionHook.onThreadComplete", hookEx);
}
}

Expand Down Expand Up @@ -1260,41 +1315,6 @@ public HystrixCommandProperties getProperties() {
/* ******************************************************************************** */
/* ******************************************************************************** */

private class CommandHookApplication implements Operator<R, R> {
private final HystrixInvokable<R> cmd;

CommandHookApplication(HystrixInvokable<R> cmd) {
this.cmd = cmd;
}

@Override
public Subscriber<? super R> call(final Subscriber<? super R> subscriber) {
return new Subscriber<R>(subscriber) {
@Override
public void onCompleted() {
try {
executionHook.onSuccess(cmd);
} catch (Throwable hookEx) {
logger.warn("Error calling HystrixCommandExecutionHook.onSuccess", hookEx);
}
subscriber.onCompleted();
}

@Override
public void onError(Throwable e) {
//can't add the calls to executionHook.onError here, since this requires a FailureType param as well
subscriber.onError(e);
}

@Override
public void onNext(R r) {
R wrappedValue = wrapWithOnEmitHook(r);
subscriber.onNext(wrappedValue);
}
};
}
}

private class ExecutionHookApplication implements Operator<R, R> {
private final HystrixInvokable<R> cmd;

Expand Down Expand Up @@ -1365,41 +1385,6 @@ public void onNext(R r) {
}
}

@Deprecated //separated out to make it cleanly removable
private class DeprecatedOnCompleteWithValueHookApplication implements Operator<R, R> {
private final HystrixInvokable<R> cmd;

DeprecatedOnCompleteWithValueHookApplication(HystrixInvokable<R> cmd) {
this.cmd = cmd;
}

@Override
public Subscriber<? super R> call(final Subscriber<? super R> subscriber) {
return new Subscriber<R>(subscriber) {
@Override
public void onCompleted() {
subscriber.onCompleted();
}

@Override
public void onError(Throwable e) {
subscriber.onError(e);
}

@Override
public void onNext(R r) {
try {
R wrappedValue = executionHook.onComplete(cmd, r);
subscriber.onNext(wrappedValue);
} catch (Throwable hookEx) {
logger.warn("Error calling HystrixCommandExecutionHook.onComplete", hookEx);
subscriber.onNext(r);
}
}
};
}
}

@Deprecated //separated out to make it cleanly removable
private class DeprecatedOnRunHookApplication implements Operator<R, R> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ public <T> void onThreadStart(HystrixInvokable<T> commandInstance) {

/**
* Invoked at completion of thread execution when {@link HystrixCommand} is executed using {@link ExecutionIsolationStrategy#THREAD}.
* This will get invoked if the Hystrix thread successfully executes, regardless of whether the calling thread
* encountered a timeout.
* This will get invoked whenever the Hystrix thread is done executing, regardless of whether the thread finished
* naturally, or was unsubscribed externally
*
* @param commandInstance The executing HystrixCommand instance.
*
Expand Down
Loading

0 comments on commit 7d3afbf

Please sign in to comment.