Skip to content

Commit

Permalink
Prevent duplicate arguments from getting into a single collapser RequestBatch.
Browse files Browse the repository at this point in the history

If a duplicate argument is submitted to the same batch, one of two things now happens:
If request caching is on: the 2nd through nth instances of the argument receive the same response as the first.
If request caching is off: the 2nd through nth instances of the argument receive an error.
  • Loading branch information
Matt Jacobs committed Jul 13, 2016
1 parent 8ad9b88 commit 657f698
Show file tree
Hide file tree
Showing 7 changed files with 442 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,6 @@ public Observable<ResponseType> toObservable() {
* {@link #mapResponseToRequests} to transform the {@code <BatchReturnType>} into {@code <ResponseType>}
*/
public Observable<ResponseType> toObservable(Scheduler observeOn) {

return Observable.defer(new Func0<Observable<ResponseType>>() {
@Override
public Observable<ResponseType> call() {
Expand All @@ -399,20 +398,12 @@ public Observable<ResponseType> call() {
Observable<ResponseType> response = requestCollapser.submitRequest(getRequestArgument());

if (isRequestCacheEnabled && cacheKey != null) {
/*
* A race can occur here with multiple threads queuing but only one will be cached.
* This means we can have some duplication of requests in a thread-race but we're okay
* with having some inefficiency in duplicate requests in the same batch
* and then subsequent requests will retrieve a previously cached Observable.
*
* If this is an issue we can make a lazy-future that gets set in the cache
* then only the winning 'put' will be invoked to actually call 'submitRequest'
*/
HystrixCachedObservable<ResponseType> toCache = HystrixCachedObservable.from(response);
HystrixCachedObservable<ResponseType> fromCache = requestCache.putIfAbsent(cacheKey, toCache);
if (fromCache == null) {
return toCache.toObservable();
} else {
toCache.unsubscribe();
return fromCache.toObservable();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,19 +49,19 @@
private final ReplaySubject<T> subject = ReplaySubject.create();
private final Observable<T> subjectWithAccounting;

private volatile boolean subscribedTo = false;
private volatile int outstandingSubscriptions = 0;

public CollapsedRequestSubject(final R arg, final RequestBatch<?, T, R> containingBatch) {
if (arg == RequestCollapser.NULL_SENTINEL) {
this.argument = null;
} else {
this.argument = arg;
}
this.subjectWithAccounting = subject
.doOnSubscribe(new Action0() {
@Override
public void call() {
outstandingSubscriptions++;
if (!subscribedTo) {
subscribedTo = true;
//containingBatch.add(arg, this);
}
}
})
.doOnUnsubscribe(new Action0() {
Expand All @@ -73,7 +73,6 @@ public void call() {
}
}
});
this.argument = arg;
}

public CollapsedRequestSubject(final R arg) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
package com.netflix.hystrix.collapser;

import java.util.Collection;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.slf4j.Logger;
Expand Down Expand Up @@ -46,13 +46,14 @@ public class RequestBatch<BatchReturnType, ResponseType, RequestArgumentType> {
private final int maxBatchSize;
private final AtomicBoolean batchStarted = new AtomicBoolean();

private final ConcurrentLinkedQueue<CollapsedRequest<ResponseType, RequestArgumentType>> batchArgumentQueue =
new ConcurrentLinkedQueue<CollapsedRequest<ResponseType, RequestArgumentType>>();
private final AtomicInteger count = new AtomicInteger(0);
private final ConcurrentMap<RequestArgumentType, CollapsedRequest<ResponseType, RequestArgumentType>> argumentMap =
new ConcurrentHashMap<RequestArgumentType, CollapsedRequest<ResponseType, RequestArgumentType>>();
private final HystrixCollapserProperties properties;

private ReentrantReadWriteLock batchLock = new ReentrantReadWriteLock();

/**
 * Creates a batch with no arguments in it yet.
 *
 * @param properties collapser properties; retained so the batch can read {@code requestCacheEnabled()}
 *                   when an argument that is already present is offered again
 * @param commandCollapser bridge used by the batch to shard its requests and to obtain the collapser key
 *                         for error messages
 * @param maxBatchSize size limit compared against the number of arguments held; {@code offer} returns
 *                     {@code null} once that limit is reached
 */
public RequestBatch(HystrixCollapserProperties properties, HystrixCollapserBridge<BatchReturnType, ResponseType, RequestArgumentType> commandCollapser, int maxBatchSize) {
this.properties = properties;
this.commandCollapser = commandCollapser;
this.maxBatchSize = maxBatchSize;
}
Expand All @@ -76,14 +77,35 @@ public Observable<ResponseType> offer(RequestArgumentType arg) {
return null;
}

if (count.get() >= maxBatchSize) {
if (argumentMap.size() >= maxBatchSize) {
return null;
} else {
CollapsedRequestSubject<ResponseType, RequestArgumentType> collapsedRequest =
new CollapsedRequestSubject<ResponseType, RequestArgumentType>(arg, this);
batchArgumentQueue.add(collapsedRequest);
count.incrementAndGet();
return collapsedRequest.toObservable();
final CollapsedRequestSubject<ResponseType, RequestArgumentType> existing = (CollapsedRequestSubject<ResponseType, RequestArgumentType>) argumentMap.putIfAbsent(arg, collapsedRequest);
/**
* If the argument already exists in the batch, then there are 2 options:
* A) If request caching is ON (the default): only keep 1 argument in the batch and let all responses
* be hooked up to that argument
* B) If request caching is OFF: return an error to all duplicate argument requests
*
* This maintains the invariant that each batch has no duplicate arguments. This prevents the impossible
* logic (in a user-provided mapResponseToRequests for HystrixCollapser and the internals of HystrixObservableCollapser)
* of trying to figure out which argument of a set of duplicates should get attached to a response.
*
* See https://github.com/Netflix/Hystrix/pull/1176 for further discussion.
*/
if (existing != null) {
boolean requestCachingEnabled = properties.requestCacheEnabled().get();
if (requestCachingEnabled) {
return existing.toObservable();
} else {
return Observable.error(new IllegalArgumentException("Duplicate argument in collapser batch : [" + arg + "] This is not supported. Please turn request-caching on for HystrixCollapser:" + commandCollapser.getCollapserKey().name() + " or prevent duplicates from making it into the batch!"));
}
} else {
return collapsedRequest.toObservable();
}

}
} finally {
batchLock.readLock().unlock();
Expand All @@ -95,10 +117,8 @@ public Observable<ResponseType> offer(RequestArgumentType arg) {

/**
* Best-effort attempt to remove an argument from a batch. This may get invoked when a cancellation occurs somewhere downstream.
* This method finds the first occurrence of an argument in the batch, and removes that occurrence.
* This method finds the argument in the batch, and removes it.
*
* This is currently O(n). If an O(1) approach is needed, then we need to refactor internals to use a Map instead of Queue.
* My first pass at this is fairly naive, on the suspicion that unsubscription will be rare enough to not cause a perf problem.
* @param arg argument to remove from batch
*/
/* package-private */ void remove(RequestArgumentType arg) {
Expand All @@ -114,13 +134,7 @@ public Observable<ResponseType> offer(RequestArgumentType arg) {
return;
}

for (CollapsedRequest<ResponseType, RequestArgumentType> collapsedRequest: batchArgumentQueue) {
if (arg.equals(collapsedRequest.getArgument())) {
batchArgumentQueue.remove(collapsedRequest);
count.decrementAndGet();
return; //just remove a single instance
}
}
argumentMap.remove(arg);
} finally {
batchLock.readLock().unlock();
}
Expand Down Expand Up @@ -150,7 +164,7 @@ public void executeBatchIfNotAlreadyStarted() {

try {
// shard batches
Collection<Collection<CollapsedRequest<ResponseType, RequestArgumentType>>> shards = commandCollapser.shardRequests(batchArgumentQueue);
Collection<Collection<CollapsedRequest<ResponseType, RequestArgumentType>>> shards = commandCollapser.shardRequests(argumentMap.values());
// for each shard execute its requests
for (final Collection<CollapsedRequest<ResponseType, RequestArgumentType>> shardRequests : shards) {
try {
Expand All @@ -173,7 +187,7 @@ public void call(Throwable e) {
}
logger.debug("Exception mapping responses to requests.", e);
// if a failure occurs we want to pass that exception to all of the Futures that we've returned
for (CollapsedRequest<ResponseType, RequestArgumentType> request : batchArgumentQueue) {
for (CollapsedRequest<ResponseType, RequestArgumentType> request : argumentMap.values()) {
try {
((CollapsedRequestSubject<ResponseType, RequestArgumentType>) request).setExceptionIfResponseNotReceived(ee);
} catch (IllegalStateException e2) {
Expand Down Expand Up @@ -221,7 +235,7 @@ public void call() {
} catch (Exception e) {
logger.error("Exception while sharding requests.", e);
// same error handling as we do around the shards, but this is a wider net in case the shardRequest method fails
for (CollapsedRequest<ResponseType, RequestArgumentType> request : batchArgumentQueue) {
for (CollapsedRequest<ResponseType, RequestArgumentType> request : argumentMap.values()) {
try {
request.setException(e);
} catch (IllegalStateException e2) {
Expand All @@ -241,16 +255,16 @@ public void shutdown() {
batchLock.writeLock().lock();
try {
// if we win the 'start' and once we have the lock we can now shut it down otherwise another thread will finish executing this batch
if (count.get() > 0) {
logger.warn("Requests still exist in queue but will not be executed due to RequestCollapser shutdown: " + count.get(), new IllegalStateException());
if (argumentMap.size() > 0) {
logger.warn("Requests still exist in queue but will not be executed due to RequestCollapser shutdown: " + argumentMap.size(), new IllegalStateException());
/*
* In the event that there is a concurrency bug or thread scheduling prevents the timer from ticking we need to handle this so the Future.get() calls do not block.
*
* I haven't been able to reproduce this use case on-demand but when stressing a machine saw this occur briefly right after the JVM paused (logs stopped scrolling).
*
* This safety-net just prevents the CollapsedRequestFutureImpl.get() from waiting on the CountDownLatch until its max timeout.
*/
for (CollapsedRequest<ResponseType, RequestArgumentType> request : batchArgumentQueue) {
for (CollapsedRequest<ResponseType, RequestArgumentType> request : argumentMap.values()) {
try {
((CollapsedRequestSubject<ResponseType, RequestArgumentType>) request).setExceptionIfResponseNotReceived(new IllegalStateException("Requests not executed before shutdown."));
} catch (Exception e) {
Expand All @@ -270,6 +284,6 @@ public void shutdown() {
}

public int getSize() {
return count.get();
return argumentMap.size();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
*/
public class RequestCollapser<BatchReturnType, ResponseType, RequestArgumentType> {
static final Logger logger = LoggerFactory.getLogger(RequestCollapser.class);
static final Object NULL_SENTINEL = new Object();

private final HystrixCollapserBridge<BatchReturnType, ResponseType, RequestArgumentType> commandCollapser;
// batch can be null once shutdown
Expand Down Expand Up @@ -89,10 +90,15 @@ public Observable<ResponseType> submitRequest(final RequestArgumentType arg) {
return Observable.error(new IllegalStateException("Submitting requests after collapser is shutdown"));
}

Observable<ResponseType> f = b.offer(arg);
final Observable<ResponseType> response;
if (arg != null) {
response = b.offer(arg);
} else {
response = b.offer( (RequestArgumentType) NULL_SENTINEL);
}
// it will always get an Observable unless we hit the max batch size
if (f != null) {
return f;
if (response != null) {
return response;
} else {
// this batch can't accept requests so create a new one and set it if another thread doesn't beat us
createNewBatchAndExecutePreviousIfNeeded(b);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public class HystrixPropertiesFactory {
/**
 * Clears {@code commandProperties}, {@code threadPoolProperties} and {@code collapserProperties}.
 *
 * NOTE(review): these are presumably the factory's per-key caches of properties instances; their
 * declarations are not visible in this hunk, so confirm against the full file.
 */
public static void reset() {
commandProperties.clear();
threadPoolProperties.clear();
collapserProperties.clear();
}

// String is CommandKey.name() (we can't use CommandKey directly as we can't guarantee it implements hashcode/equals correctly)
Expand Down
Loading

0 comments on commit 657f698

Please sign in to comment.