forked from Netflix/Hystrix
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Added example demonstrating ObservableCollapser
This was based on the discussion in Netflix#895. I've modified the original example to be compatible with JDK 6, removed the dependency to ICU and refactored the test class to be part of the command class to match the pattern used for other examples.
- Loading branch information
1 parent
3c839fa
commit 76562ee
Showing
2 changed files
with
343 additions
and
0 deletions.
There are no files selected for viewing
262 changes: 262 additions & 0 deletions
262
...src/main/java/com/netflix/hystrix/examples/basic/ObservableCollapserGetWordForNumber.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,262 @@ | ||
package com.netflix.hystrix.examples.basic; | ||
|
||
import static org.hamcrest.CoreMatchers.equalTo; | ||
import static org.hamcrest.CoreMatchers.is; | ||
import static org.junit.Assert.assertThat; | ||
import static org.junit.Assert.assertTrue; | ||
|
||
import java.util.ArrayList; | ||
import java.util.Collection; | ||
import java.util.HashMap; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Map.Entry; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.concurrent.atomic.AtomicInteger; | ||
|
||
import org.junit.After; | ||
import org.junit.Before; | ||
import org.junit.Test; | ||
|
||
import rx.Observable; | ||
import rx.functions.Func0; | ||
import rx.functions.Func1; | ||
import rx.observers.TestSubscriber; | ||
import rx.schedulers.Schedulers; | ||
|
||
import com.netflix.hystrix.HystrixCollapser.CollapsedRequest; | ||
import com.netflix.hystrix.HystrixObservableCollapser; | ||
import com.netflix.hystrix.HystrixObservableCommand; | ||
import com.netflix.hystrix.HystrixRequestLog; | ||
import com.netflix.hystrix.examples.basic.ObservableCommandNumbersToWords.NumberWord; | ||
import com.netflix.hystrix.strategy.concurrency.HystrixContextScheduler; | ||
import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext; | ||
|
||
/** | ||
* Example that uses {@link HystrixObservableCollapser} to batch multiple {@link ObservableCommandNumbersToWords} requests. | ||
* | ||
* @author Patrick Ruhkopf | ||
*/ | ||
public class ObservableCollapserGetWordForNumber extends HystrixObservableCollapser<Integer, NumberWord, String, Integer> | ||
{ | ||
private final Integer number; | ||
|
||
private final static AtomicInteger counter = new AtomicInteger(); | ||
|
||
public static void resetCmdCounter() | ||
{ | ||
counter.set(0); | ||
} | ||
|
||
public static int getCmdCount() | ||
{ | ||
return counter.get(); | ||
} | ||
|
||
public ObservableCollapserGetWordForNumber(final Integer number) | ||
{ | ||
this.number = number; | ||
} | ||
|
||
@Override | ||
public Integer getRequestArgument() | ||
{ | ||
return number; | ||
} | ||
|
||
@SuppressWarnings("boxing") | ||
@Override | ||
protected HystrixObservableCommand<NumberWord> createCommand(final Collection<CollapsedRequest<String, Integer>> requests) | ||
{ | ||
final int count = counter.incrementAndGet(); | ||
System.out.println("Creating batch for " + requests.size() + " requests. Total invocations so far: " + count); | ||
|
||
final List<Integer> numbers = new ArrayList<Integer>(); | ||
for (final CollapsedRequest<String, Integer> request : requests) | ||
{ | ||
numbers.add(request.getArgument()); | ||
} | ||
|
||
return new ObservableCommandNumbersToWords(numbers); | ||
} | ||
|
||
@Override | ||
protected Func1<NumberWord, Integer> getBatchReturnTypeKeySelector() | ||
{ | ||
return new Func1<NumberWord, Integer>() | ||
{ | ||
@Override | ||
public Integer call(final NumberWord nw) | ||
{ | ||
return nw.getNumber(); | ||
} | ||
}; | ||
} | ||
|
||
@Override | ||
protected Func1<Integer, Integer> getRequestArgumentKeySelector() | ||
{ | ||
return new Func1<Integer, Integer>() | ||
{ | ||
@Override | ||
public Integer call(final Integer no) | ||
{ | ||
return no; | ||
} | ||
|
||
}; | ||
} | ||
|
||
@Override | ||
protected Func1<NumberWord, String> getBatchReturnTypeToResponseTypeMapper() | ||
{ | ||
return new Func1<NumberWord, String>() | ||
{ | ||
@Override | ||
public String call(final NumberWord nw) | ||
{ | ||
return nw.getWord(); | ||
} | ||
}; | ||
} | ||
|
||
@Override | ||
protected void onMissingResponse(final CollapsedRequest<String, Integer> request) | ||
{ | ||
request.setException(new Exception("No word")); | ||
} | ||
|
||
public static class ObservableCollapserGetWordForNumberTest | ||
{ | ||
private HystrixRequestContext ctx; | ||
|
||
@Before | ||
public void before() | ||
{ | ||
ctx = HystrixRequestContext.initializeContext(); | ||
ObservableCollapserGetWordForNumber.resetCmdCounter(); | ||
} | ||
|
||
@After | ||
public void after() | ||
{ | ||
System.out.println(HystrixRequestLog.getCurrentRequest().getExecutedCommandsAsString()); | ||
ctx.shutdown(); | ||
} | ||
|
||
/** | ||
* Example where we subscribe without using a specific scheduler. That means we run the actions on the same thread. | ||
*/ | ||
@Test | ||
public void shouldCollapseRequestsSync() | ||
{ | ||
final int noOfRequests = 10; | ||
final Map<Integer, TestSubscriber<String>> subscribersByNumber = new HashMap<Integer, TestSubscriber<String>>( | ||
noOfRequests); | ||
|
||
TestSubscriber<String> subscriber; | ||
for (int number = 0; number < noOfRequests; number++) | ||
{ | ||
subscriber = new TestSubscriber<String>(); | ||
new ObservableCollapserGetWordForNumber(number).toObservable().subscribe(subscriber); | ||
subscribersByNumber.put(number, subscriber); | ||
|
||
// wait a little bit after running half of the requests so that we don't collapse all of them into one batch | ||
// TODO this can probably be improved by using a test scheduler | ||
if (number == noOfRequests / 2) | ||
sleep(1000); | ||
|
||
} | ||
|
||
assertThat(subscribersByNumber.size(), is(noOfRequests)); | ||
for (final Entry<Integer, TestSubscriber<String>> subscriberByNumber : subscribersByNumber.entrySet()) | ||
{ | ||
subscriber = subscriberByNumber.getValue(); | ||
subscriber.awaitTerminalEvent(10, TimeUnit.SECONDS); | ||
|
||
assertThat(subscriber.getOnErrorEvents().toString(), subscriber.getOnErrorEvents().size(), is(0)); | ||
assertThat(subscriber.getOnNextEvents().size(), is(1)); | ||
|
||
final String word = subscriber.getOnNextEvents().get(0); | ||
System.out.println("Translated " + subscriberByNumber.getKey() + " to " + word); | ||
assertThat(word, equalTo(numberToWord(subscriberByNumber.getKey()))); | ||
} | ||
|
||
assertTrue(ObservableCollapserGetWordForNumber.getCmdCount() > 1); | ||
assertTrue(ObservableCollapserGetWordForNumber.getCmdCount() < noOfRequests); | ||
} | ||
|
||
/** | ||
* Example where we subscribe on the computation scheduler. For this we need the {@link HystrixContextScheduler}, that | ||
* passes the {@link HystrixRequestContext} to the thread that runs the action. | ||
*/ | ||
@Test | ||
public void shouldCollapseRequestsAsync() | ||
{ | ||
final HystrixContextScheduler contextAwareScheduler = new HystrixContextScheduler(Schedulers.computation()); | ||
|
||
final int noOfRequests = 10; | ||
final Map<Integer, TestSubscriber<String>> subscribersByNumber = new HashMap<Integer, TestSubscriber<String>>( | ||
noOfRequests); | ||
|
||
TestSubscriber<String> subscriber; | ||
for (int number = 0; number < noOfRequests; number++) | ||
{ | ||
subscriber = new TestSubscriber<String>(); | ||
final int finalNumber = number; | ||
|
||
// defer and subscribe on specific scheduler | ||
Observable.defer(new Func0<Observable<String>>() | ||
{ | ||
@Override | ||
public Observable<String> call() | ||
{ | ||
return new ObservableCollapserGetWordForNumber(finalNumber).toObservable(); | ||
} | ||
}).subscribeOn(contextAwareScheduler).subscribe(subscriber); | ||
|
||
subscribersByNumber.put(number, subscriber); | ||
|
||
// wait a little bit after running half of the requests so that we don't collapse all of them into one batch | ||
// TODO this can probably be improved by using a test scheduler | ||
if (number == noOfRequests / 2) | ||
sleep(1000); | ||
} | ||
|
||
assertThat(subscribersByNumber.size(), is(noOfRequests)); | ||
for (final Entry<Integer, TestSubscriber<String>> subscriberByNumber : subscribersByNumber.entrySet()) | ||
{ | ||
subscriber = subscriberByNumber.getValue(); | ||
subscriber.awaitTerminalEvent(10, TimeUnit.SECONDS); | ||
|
||
assertThat(subscriber.getOnErrorEvents().toString(), subscriber.getOnErrorEvents().size(), is(0)); | ||
assertThat(subscriber.getOnNextEvents().size(), is(1)); | ||
|
||
final String word = subscriber.getOnNextEvents().get(0); | ||
System.out.println("Translated " + subscriberByNumber.getKey() + " to " + word); | ||
assertThat(word, equalTo(numberToWord(subscriberByNumber.getKey()))); | ||
} | ||
|
||
assertTrue(ObservableCollapserGetWordForNumber.getCmdCount() > 1); | ||
assertTrue(ObservableCollapserGetWordForNumber.getCmdCount() < noOfRequests); | ||
} | ||
|
||
private String numberToWord(final int number) | ||
{ | ||
return ObservableCommandNumbersToWords.dict.get(number); | ||
} | ||
|
||
private void sleep(final long ms) | ||
{ | ||
try | ||
{ | ||
Thread.sleep(1000); | ||
} | ||
catch (final InterruptedException e) | ||
{ | ||
throw new IllegalStateException(e); | ||
} | ||
} | ||
|
||
} | ||
} |
81 changes: 81 additions & 0 deletions
81
...les/src/main/java/com/netflix/hystrix/examples/basic/ObservableCommandNumbersToWords.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,81 @@ | ||
package com.netflix.hystrix.examples.basic; | ||
|
||
import java.util.HashMap; | ||
import java.util.List; | ||
import java.util.Map; | ||
|
||
import rx.Observable; | ||
import rx.functions.Func1; | ||
|
||
import com.netflix.hystrix.HystrixCommandGroupKey; | ||
import com.netflix.hystrix.HystrixObservableCommand; | ||
import com.netflix.hystrix.examples.basic.ObservableCommandNumbersToWords.NumberWord; | ||
|
||
/** | ||
* A simple Hystrix Observable command that translates a number (<code>Integer</code>) into an English text. | ||
*/ | ||
class ObservableCommandNumbersToWords extends HystrixObservableCommand<NumberWord> | ||
{ | ||
private final List<Integer> numbers; | ||
|
||
// in the real world you'd probably want to replace this very simple code by using ICU or similar | ||
static Map<Integer, String> dict = new HashMap<Integer, String>(11); | ||
static | ||
{ | ||
dict.put(0, "zero"); | ||
dict.put(1, "one"); | ||
dict.put(2, "two"); | ||
dict.put(3, "three"); | ||
dict.put(4, "four"); | ||
dict.put(5, "five"); | ||
dict.put(6, "six"); | ||
dict.put(7, "seven"); | ||
dict.put(8, "eight"); | ||
dict.put(9, "nine"); | ||
dict.put(10, "ten"); | ||
} | ||
|
||
public ObservableCommandNumbersToWords(final List<Integer> numbers) | ||
{ | ||
super(HystrixCommandGroupKey.Factory.asKey(ObservableCommandNumbersToWords.class.getName())); | ||
this.numbers = numbers; | ||
} | ||
|
||
@Override | ||
protected Observable<NumberWord> construct() | ||
{ | ||
return Observable.from(numbers).map(new Func1<Integer, NumberWord>() | ||
{ | ||
@Override | ||
public NumberWord call(final Integer number) | ||
{ | ||
return new NumberWord(number, dict.get(number)); | ||
} | ||
|
||
}); | ||
} | ||
|
||
static class NumberWord | ||
{ | ||
private final Integer number; | ||
private final String word; | ||
|
||
public NumberWord(final Integer number, final String word) | ||
{ | ||
super(); | ||
this.number = number; | ||
this.word = word; | ||
} | ||
|
||
public Integer getNumber() | ||
{ | ||
return number; | ||
} | ||
|
||
public String getWord() | ||
{ | ||
return word; | ||
} | ||
} | ||
|
||
} |