Skip to content

Commit

Permalink
New signature for HystrixObservableCollapser
Browse files Browse the repository at this point in the history
Replacing mapResponseToRequests with functional style selectors and allowing streaming response from Observable.
  • Loading branch information
benjchristensen committed May 6, 2014
1 parent 7cf36e6 commit d1204e1
Show file tree
Hide file tree
Showing 5 changed files with 167 additions and 176 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import rx.Observable;
import rx.Scheduler;
import rx.*;
import rx.Observable.OnSubscribe;
import rx.functions.Func1;
import rx.schedulers.Schedulers;
import rx.subjects.ReplaySubject;

Expand Down Expand Up @@ -142,8 +143,17 @@ public Observable<BatchReturnType> createObservableCommand(Collection<CollapsedR
}

@Override
public void mapResponseToRequests(BatchReturnType batchResponse, Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests) {
self.mapResponseToRequests(batchResponse, requests);
public Observable<Void> mapResponseToRequests(Observable<BatchReturnType> batchResponse, final Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests) {
return batchResponse.single().flatMap(new Func1<BatchReturnType, Observable<Void>>() {

@Override
public Observable<Void> call(BatchReturnType response) {
// this is a blocking call in HystrixCollapser
self.mapResponseToRequests(response, requests);
return Observable.empty();
}

});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,20 @@
*/
package com.netflix.hystrix;

import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.*;
import java.util.concurrent.*;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.*;

import rx.*;
import rx.Observable;
import rx.Scheduler;
import rx.functions.*;
import rx.schedulers.Schedulers;
import rx.subjects.ReplaySubject;

import com.netflix.hystrix.HystrixCollapser.CollapsedRequest;
import com.netflix.hystrix.HystrixCommandProperties.ExecutionIsolationStrategy;
import com.netflix.hystrix.collapser.CollapserTimer;
import com.netflix.hystrix.collapser.HystrixCollapserBridge;
import com.netflix.hystrix.collapser.RealCollapserTimer;
import com.netflix.hystrix.collapser.RequestCollapser;
import com.netflix.hystrix.collapser.RequestCollapserFactory;
import com.netflix.hystrix.collapser.*;
import com.netflix.hystrix.exception.HystrixRuntimeException;
import com.netflix.hystrix.strategy.HystrixPlugins;
import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext;
Expand All @@ -59,7 +53,7 @@
* @param <RequestArgumentType>
* The type of the request argument. If multiple arguments are needed, wrap them in another object or a Tuple.
*/
public abstract class HystrixObservableCollapser<BatchReturnType, ResponseType, RequestArgumentType> implements HystrixExecutable<ResponseType> {
public abstract class HystrixObservableCollapser<K, BatchReturnType, ResponseType, RequestArgumentType> implements HystrixExecutable<ResponseType> {

static final Logger logger = LoggerFactory.getLogger(HystrixObservableCollapser.class);

Expand Down Expand Up @@ -120,7 +114,7 @@ protected HystrixObservableCollapser(Setter setter) {
this.collapserFactory = new RequestCollapserFactory<BatchReturnType, ResponseType, RequestArgumentType>(collapserKey, scope, timer, propertiesBuilder);
this.requestCache = HystrixRequestCache.getInstance(collapserKey, HystrixPlugins.getInstance().getConcurrencyStrategy());

final HystrixObservableCollapser<BatchReturnType, ResponseType, RequestArgumentType> self = this;
final HystrixObservableCollapser<K, BatchReturnType, ResponseType, RequestArgumentType> self = this;

/**
* Used to pass public method invocation to the underlying implementation in a separate package while leaving the methods 'protected' in this class.
Expand All @@ -143,8 +137,40 @@ public Observable<BatchReturnType> createObservableCommand(Collection<CollapsedR
}

@Override
public void mapResponseToRequests(BatchReturnType batchResponse, Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests) {
self.mapResponseToRequests(batchResponse, requests);
public Observable<Void> mapResponseToRequests(Observable<BatchReturnType> batchResponse, Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests) {
Func1<RequestArgumentType, K> requestKeySelector = self.getRequestArgumentKeySelector();
final Func1<BatchReturnType, K> batchResponseKeySelector = self.getBatchReturnTypeKeySelector();
final Func1<BatchReturnType, ResponseType> mapBatchTypeToResponseType = self.getBatchReturnTypeToResponseTypeMapper();

// index the requests by key
final Map<K, CollapsedRequest<ResponseType, RequestArgumentType>> requestsByKey = new HashMap<K, CollapsedRequest<ResponseType, RequestArgumentType>>(requests.size());
for (CollapsedRequest<ResponseType, RequestArgumentType> cr : requests) {
requestsByKey.put(requestKeySelector.call(cr.getArgument()), cr);
}

// observe the responses and join with the requests by key
return batchResponse.flatMap(new Func1<BatchReturnType, Observable<Void>>() {

@Override
public Observable<Void> call(BatchReturnType r) {
K responseKey = batchResponseKeySelector.call(r);
CollapsedRequest<ResponseType, RequestArgumentType> requestForResponse = requestsByKey.get(responseKey);
requestForResponse.setResponse(mapBatchTypeToResponseType.call(r));
// now remove from map so we know what wasn't set at end
requestsByKey.remove(responseKey);
return Observable.empty();
}

}).doOnTerminate(new Action0() {

@Override
public void call() {
for (CollapsedRequest<ResponseType, RequestArgumentType> cr : requestsByKey.values()) {
onMissingResponse(cr);
}
}

});
}

@Override
Expand All @@ -155,6 +181,8 @@ public HystrixCollapserKey getCollapserKey() {
};
}

protected abstract Func1<BatchReturnType, ResponseType> getBatchReturnTypeToResponseTypeMapper();

private HystrixCollapserProperties getProperties() {
return collapserFactory.getProperties();
}
Expand Down Expand Up @@ -211,7 +239,8 @@ public Scope getScope() {
*
* @param requests
* {@code Collection<CollapsedRequest<ResponseType, RequestArgumentType>>} containing {@link CollapsedRequest} objects containing the arguments of each request collapsed in this batch.
* @return {@link HystrixObservableCommand}{@code <BatchReturnType>} which when executed will retrieve results for the batch of arguments as found in the Collection of {@link CollapsedRequest} objects
* @return {@link HystrixObservableCommand}{@code <BatchReturnType>} which when executed will retrieve results for the batch of arguments as found in the Collection of {@link CollapsedRequest}
* objects
*/
protected abstract HystrixObservableCommand<BatchReturnType> createCommand(Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests);

Expand All @@ -234,51 +263,11 @@ protected Collection<Collection<CollapsedRequest<ResponseType, RequestArgumentTy
return Collections.singletonList(requests);
}

/**
* Executed after the {@link HystrixCommand}{@code <BatchReturnType>} command created by {@link #createCommand} finishes processing (unless it fails) for mapping the {@code <BatchReturnType>} to
* the list of {@code CollapsedRequest<ResponseType, RequestArgumentType>} objects.
* <p>
* IMPORTANT IMPLEMENTATION DETAIL => The expected contract (responsibilities) of this method implementation is:
* <p>
* <ul>
* <li>ALL {@link CollapsedRequest} objects must have either a response or exception set on them even if the response is NULL
* otherwise the user thread waiting on the response will think a response was never received and will either block indefinitely or timeout while waiting.</li>
* <ul>
* <li>Setting a response is done via {@link CollapsedRequest#setResponse(Object)}</li>
* <li>Setting an exception is done via {@link CollapsedRequest#setException(Exception)}</li>
* </ul>
* </ul>
* <p>
* Common code when {@code <BatchReturnType>} is {@code List<ResponseType>} is:
* <p>
*
* <pre>
* int count = 0;
* for ({@code CollapsedRequest<ResponseType, RequestArgumentType>} request : requests) {
* &nbsp;&nbsp;&nbsp;&nbsp; request.setResponse(batchResponse.get(count++));
* }
* </pre>
*
* For example if the types were {@code <List<String>, String, String>}:
* <p>
*
* <pre>
* int count = 0;
* for ({@code CollapsedRequest<String, String>} request : requests) {
* &nbsp;&nbsp;&nbsp;&nbsp; request.setResponse(batchResponse.get(count++));
* }
* </pre>
*
* @param batchResponse
* The {@code <BatchReturnType>} returned from the {@link HystrixCommand}{@code <BatchReturnType>} command created by {@link #createCommand}.
* <p>
*
* @param requests
* {@code Collection<CollapsedRequest<ResponseType, RequestArgumentType>>} containing {@link CollapsedRequest} objects containing the arguments of each request collapsed in this batch.
* <p>
* The {@link CollapsedRequest#setResponse(Object)} or {@link CollapsedRequest#setException(Exception)} must be called on each {@link CollapsedRequest} in the Collection.
*/
protected abstract void mapResponseToRequests(BatchReturnType batchResponse, Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests);
protected abstract Func1<BatchReturnType, K> getBatchReturnTypeKeySelector();

protected abstract Func1<RequestArgumentType, K> getRequestArgumentKeySelector();

protected abstract void onMissingResponse(CollapsedRequest<ResponseType, RequestArgumentType> r);

/**
* Used for asynchronous execution with a callback by subscribing to the {@link Observable}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public interface HystrixCollapserBridge<BatchReturnType, ResponseType, RequestAr

public Observable<BatchReturnType> createObservableCommand(Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests);

public void mapResponseToRequests(BatchReturnType batchResponse, Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests);
public Observable<Void> mapResponseToRequests(Observable<BatchReturnType> batchResponse, Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests);

public HystrixCollapserKey getCollapserKey();

Expand Down
Loading

0 comments on commit d1204e1

Please sign in to comment.