Skip to content

Commit

Permalink
Merge pull request Netflix#261 from benjchristensen/observable-collapser
Browse files Browse the repository at this point in the history
New signature for HystrixObservableCollapser
  • Loading branch information
benjchristensen committed May 6, 2014
2 parents 7cf36e6 + 6bdb273 commit 5212caa
Show file tree
Hide file tree
Showing 5 changed files with 190 additions and 168 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import rx.Observable;
import rx.Scheduler;
import rx.*;
import rx.Observable.OnSubscribe;
import rx.functions.Func1;
import rx.schedulers.Schedulers;
import rx.subjects.ReplaySubject;

Expand Down Expand Up @@ -142,8 +143,17 @@ public Observable<BatchReturnType> createObservableCommand(Collection<CollapsedR
}

@Override
public void mapResponseToRequests(BatchReturnType batchResponse, Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests) {
self.mapResponseToRequests(batchResponse, requests);
public Observable<Void> mapResponseToRequests(Observable<BatchReturnType> batchResponse, final Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests) {
return batchResponse.single().flatMap(new Func1<BatchReturnType, Observable<Void>>() {

@Override
public Observable<Void> call(BatchReturnType response) {
// this is a blocking call in HystrixCollapser
self.mapResponseToRequests(response, requests);
return Observable.empty();
}

});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,20 @@
*/
package com.netflix.hystrix;

import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.*;
import java.util.concurrent.*;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.*;

import rx.*;
import rx.Observable;
import rx.Scheduler;
import rx.functions.*;
import rx.schedulers.Schedulers;
import rx.subjects.ReplaySubject;

import com.netflix.hystrix.HystrixCollapser.CollapsedRequest;
import com.netflix.hystrix.HystrixCommandProperties.ExecutionIsolationStrategy;
import com.netflix.hystrix.collapser.CollapserTimer;
import com.netflix.hystrix.collapser.HystrixCollapserBridge;
import com.netflix.hystrix.collapser.RealCollapserTimer;
import com.netflix.hystrix.collapser.RequestCollapser;
import com.netflix.hystrix.collapser.RequestCollapserFactory;
import com.netflix.hystrix.collapser.*;
import com.netflix.hystrix.exception.HystrixRuntimeException;
import com.netflix.hystrix.strategy.HystrixPlugins;
import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext;
Expand All @@ -52,14 +46,16 @@
* It must be stateless or else it will be non-deterministic because most instances are discarded while some are retained and become the
* "collapsers" for all the ones that are discarded.
*
* @param <K>
* The key used to match BatchReturnType and RequestArgumentType
* @param <BatchReturnType>
* The type returned from the {@link HystrixCommand} that will be invoked on batch executions.
* @param <ResponseType>
* The type returned from this command.
* @param <RequestArgumentType>
* The type of the request argument. If multiple arguments are needed, wrap them in another object or a Tuple.
*/
public abstract class HystrixObservableCollapser<BatchReturnType, ResponseType, RequestArgumentType> implements HystrixExecutable<ResponseType> {
public abstract class HystrixObservableCollapser<K, BatchReturnType, ResponseType, RequestArgumentType> implements HystrixExecutable<ResponseType> {

static final Logger logger = LoggerFactory.getLogger(HystrixObservableCollapser.class);

Expand Down Expand Up @@ -120,7 +116,7 @@ protected HystrixObservableCollapser(Setter setter) {
this.collapserFactory = new RequestCollapserFactory<BatchReturnType, ResponseType, RequestArgumentType>(collapserKey, scope, timer, propertiesBuilder);
this.requestCache = HystrixRequestCache.getInstance(collapserKey, HystrixPlugins.getInstance().getConcurrencyStrategy());

final HystrixObservableCollapser<BatchReturnType, ResponseType, RequestArgumentType> self = this;
final HystrixObservableCollapser<K, BatchReturnType, ResponseType, RequestArgumentType> self = this;

/**
* Used to pass public method invocation to the underlying implementation in a separate package while leaving the methods 'protected' in this class.
Expand All @@ -143,8 +139,40 @@ public Observable<BatchReturnType> createObservableCommand(Collection<CollapsedR
}

@Override
public void mapResponseToRequests(BatchReturnType batchResponse, Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests) {
self.mapResponseToRequests(batchResponse, requests);
public Observable<Void> mapResponseToRequests(Observable<BatchReturnType> batchResponse, Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests) {
Func1<RequestArgumentType, K> requestKeySelector = self.getRequestArgumentKeySelector();
final Func1<BatchReturnType, K> batchResponseKeySelector = self.getBatchReturnTypeKeySelector();
final Func1<BatchReturnType, ResponseType> mapBatchTypeToResponseType = self.getBatchReturnTypeToResponseTypeMapper();

// index the requests by key
final Map<K, CollapsedRequest<ResponseType, RequestArgumentType>> requestsByKey = new HashMap<K, CollapsedRequest<ResponseType, RequestArgumentType>>(requests.size());
for (CollapsedRequest<ResponseType, RequestArgumentType> cr : requests) {
requestsByKey.put(requestKeySelector.call(cr.getArgument()), cr);
}

// observe the responses and join with the requests by key
return batchResponse.flatMap(new Func1<BatchReturnType, Observable<Void>>() {

@Override
public Observable<Void> call(BatchReturnType r) {
K responseKey = batchResponseKeySelector.call(r);
CollapsedRequest<ResponseType, RequestArgumentType> requestForResponse = requestsByKey.get(responseKey);
requestForResponse.setResponse(mapBatchTypeToResponseType.call(r));
// now remove from map so we know what wasn't set at end
requestsByKey.remove(responseKey);
return Observable.empty();
}

}).doOnTerminate(new Action0() {

@Override
public void call() {
for (CollapsedRequest<ResponseType, RequestArgumentType> cr : requestsByKey.values()) {
onMissingResponse(cr);
}
}

});
}

@Override
Expand Down Expand Up @@ -211,7 +239,8 @@ public Scope getScope() {
*
* @param requests
* {@code Collection<CollapsedRequest<ResponseType, RequestArgumentType>>} containing {@link CollapsedRequest} objects containing the arguments of each request collapsed in this batch.
* @return {@link HystrixObservableCommand}{@code <BatchReturnType>} which when executed will retrieve results for the batch of arguments as found in the Collection of {@link CollapsedRequest} objects
* @return {@link HystrixObservableCommand}{@code <BatchReturnType>} which when executed will retrieve results for the batch of arguments as found in the Collection of {@link CollapsedRequest}
* objects
*/
protected abstract HystrixObservableCommand<BatchReturnType> createCommand(Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests);

Expand All @@ -235,50 +264,41 @@ protected Collection<Collection<CollapsedRequest<ResponseType, RequestArgumentTy
}

/**
* Executed after the {@link HystrixCommand}{@code <BatchReturnType>} command created by {@link #createCommand} finishes processing (unless it fails) for mapping the {@code <BatchReturnType>} to
* the list of {@code CollapsedRequest<ResponseType, RequestArgumentType>} objects.
* Function that returns the key used for matching returned objects against request argument types.
* <p>
* IMPORTANT IMPLEMENTATION DETAIL => The expected contract (responsibilities) of this method implementation is:
* <p>
* <ul>
* <li>ALL {@link CollapsedRequest} objects must have either a response or exception set on them even if the response is NULL
* otherwise the user thread waiting on the response will think a response was never received and will either block indefinitely or timeout while waiting.</li>
* <ul>
* <li>Setting a response is done via {@link CollapsedRequest#setResponse(Object)}</li>
* <li>Setting an exception is done via {@link CollapsedRequest#setException(Exception)}</li>
* </ul>
* </ul>
* <p>
* Common code when {@code <BatchReturnType>} is {@code List<ResponseType>} is:
* <p>
*
* <pre>
* int count = 0;
* for ({@code CollapsedRequest<ResponseType, RequestArgumentType>} request : requests) {
* &nbsp;&nbsp;&nbsp;&nbsp; request.setResponse(batchResponse.get(count++));
* }
* </pre>
* The key returned from this function should match up with the key returned from {@link #getRequestArgumentKeySelector()};
*
* For example if the types were {@code <List<String>, String, String>}:
* @return key selector function
*/
protected abstract Func1<BatchReturnType, K> getBatchReturnTypeKeySelector();

/**
* Function that returns the key used for matching request arguments against returned objects.
* <p>
* The key returned from this function should match up with the key returned from {@link #getBatchReturnTypeKeySelector()};
*
* <pre>
* int count = 0;
* for ({@code CollapsedRequest<String, String>} request : requests) {
* &nbsp;&nbsp;&nbsp;&nbsp; request.setResponse(batchResponse.get(count++));
* }
* </pre>
* @return key selector function
*/
protected abstract Func1<RequestArgumentType, K> getRequestArgumentKeySelector();

/**
* Invoked if a {@link CollapsedRequest} in the batch does not have a response set on it.
* <p>
* This allows setting an exception (via {@link CollapsedRequest#setException(Exception)}) or a fallback response (via {@link CollapsedRequest#setResponse(Object)}).
*
* @param batchResponse
* The {@code <BatchReturnType>} returned from the {@link HystrixCommand}{@code <BatchReturnType>} command created by {@link #createCommand}.
* <p>
* @param CollapsedRequest
* that needs a response or exception set on it.
*/
protected abstract void onMissingResponse(CollapsedRequest<ResponseType, RequestArgumentType> r);

/**
* Function for mapping from BatchReturnType to ResponseType.
* <p>
* Often these two types are exactly the same so it's just a pass-thru.
*
* @param requests
* {@code Collection<CollapsedRequest<ResponseType, RequestArgumentType>>} containing {@link CollapsedRequest} objects containing the arguments of each request collapsed in this batch.
* <p>
* The {@link CollapsedRequest#setResponse(Object)} or {@link CollapsedRequest#setException(Exception)} must be called on each {@link CollapsedRequest} in the Collection.
* @return function for mapping from BatchReturnType to ResponseType
*/
protected abstract void mapResponseToRequests(BatchReturnType batchResponse, Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests);
protected abstract Func1<BatchReturnType, ResponseType> getBatchReturnTypeToResponseTypeMapper();

/**
* Used for asynchronous execution with a callback by subscribing to the {@link Observable}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public interface HystrixCollapserBridge<BatchReturnType, ResponseType, RequestAr

public Observable<BatchReturnType> createObservableCommand(Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests);

public void mapResponseToRequests(BatchReturnType batchResponse, Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests);
public Observable<Void> mapResponseToRequests(Observable<BatchReturnType> batchResponse, Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests);

public HystrixCollapserKey getCollapserKey();

Expand Down
Loading

0 comments on commit 5212caa

Please sign in to comment.