From de706a229819d2a7088ac656f5c6c654af3dc304 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Wed, 17 Sep 2014 10:40:35 -0700 Subject: [PATCH] RxJava 0.20 and Remove Deprecated Usage --- hystrix-core/build.gradle | 2 +- .../com/netflix/hystrix/HystrixCollapser.java | 2 +- .../hystrix/HystrixExecutableBase.java | 2 +- .../hystrix/HystrixObservableCollapser.java | 2 +- .../hystrix/HystrixObservableCommand.java | 7 +- .../hystrix/HystrixCircuitBreakerTest.java | 15 +- .../netflix/hystrix/HystrixCollapserTest.java | 52 +++---- .../netflix/hystrix/HystrixCommandTest.java | 134 +++++++++--------- .../hystrix/HystrixObservableCommandTest.java | 62 ++++---- .../hystrix/HystrixRequestCacheTest.java | 8 +- .../hystrix/HystrixRequestLogTest.java | 2 +- ...ollapsedRequestObservableFunctionTest.java | 18 +-- .../hystrix/strategy/HystrixPluginsTest.java | 2 +- .../HystrixConcurrencyStrategyTest.java | 8 +- 14 files changed, 155 insertions(+), 161 deletions(-) diff --git a/hystrix-core/build.gradle b/hystrix-core/build.gradle index ee33a9b34..167150e91 100644 --- a/hystrix-core/build.gradle +++ b/hystrix-core/build.gradle @@ -4,7 +4,7 @@ apply plugin: 'idea' dependencies { compile 'com.netflix.archaius:archaius-core:0.4.1' - compile 'com.netflix.rxjava:rxjava-core:0.18.2' + compile 'com.netflix.rxjava:rxjava-core:0.20.4' compile 'org.slf4j:slf4j-api:1.7.0' testCompile 'junit:junit-dep:4.10' } diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCollapser.java b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCollapser.java index 249e833e9..9aa448947 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCollapser.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCollapser.java @@ -429,7 +429,7 @@ public ResponseType execute() { */ public Future queue() { final Observable o = toObservable(); - return o.toBlockingObservable().toFuture(); + return o.toBlocking().toFuture(); } /** diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixExecutableBase.java b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixExecutableBase.java index e76b0730f..8a26a39e1 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixExecutableBase.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixExecutableBase.java @@ -339,7 +339,7 @@ public Future queue() { * is going to sit waiting on it. */ final ObservableCommand o = toObservable(Schedulers.immediate(), false); - final Future f = o.toBlockingObservable().toFuture(); + final Future f = o.toBlocking().toFuture(); /* special handling of error states that throw immediately */ if (f.isDone()) { diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixObservableCollapser.java b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixObservableCollapser.java index 49da6a4c7..a060453ca 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixObservableCollapser.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixObservableCollapser.java @@ -440,7 +440,7 @@ public ResponseType execute() { */ public Future queue() { final Observable o = toObservable(); - return o.toBlockingObservable().toFuture(); + return o.toBlocking().toFuture(); } /** diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixObservableCommand.java b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixObservableCommand.java index d90159d11..9d3491bc9 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixObservableCommand.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixObservableCommand.java @@ -839,7 +839,7 @@ public int getIntervalTimeInMilliseconds() { /** * If this subscriber receives values it means the parent succeeded/completed */ - return new Subscriber(s) { + Subscriber parent = new Subscriber() { @Override public void onCompleted() { @@ -873,6 +873,11 @@ private boolean isNotTimedOut() { } }; + + // if s is unsubscribed we want to unsubscribe the parent + s.add(parent); + + return parent; } } diff --git a/hystrix-core/src/test/java/com/netflix/hystrix/HystrixCircuitBreakerTest.java b/hystrix-core/src/test/java/com/netflix/hystrix/HystrixCircuitBreakerTest.java index b6a770b6e..6e83c42cc 100644 --- a/hystrix-core/src/test/java/com/netflix/hystrix/HystrixCircuitBreakerTest.java +++ b/hystrix-core/src/test/java/com/netflix/hystrix/HystrixCircuitBreakerTest.java @@ -574,7 +574,7 @@ protected String getFallback() { public class MyHystrixCommandExecutionHook extends HystrixCommandExecutionHook { @Override - public T onComplete(final HystrixCommand command, final T response) { + public T onComplete(final HystrixExecutable command, final T response) { logHC(command, response); @@ -583,16 +583,17 @@ public T onComplete(final HystrixCommand command, final T response) { private int counter = 0; - private void logHC(HystrixCommand command, T response) { + private void logHC(HystrixExecutable command, T response) { - //if ((counter++ % 20) == 0) { - HystrixCommandMetrics metrics = command.getMetrics(); + if(command instanceof HystrixExecutableInfo) { + HystrixExecutableInfo commandInfo = (HystrixExecutableInfo)command; + HystrixCommandMetrics metrics = commandInfo.getMetrics(); System.out.println("cb/error-count/%/total: " - + command.isCircuitBreakerOpen() + " " + + commandInfo.isCircuitBreakerOpen() + " " + metrics.getHealthCounts().getErrorCount() + " " + metrics.getHealthCounts().getErrorPercentage() + " " - + metrics.getHealthCounts().getTotalRequests() + " => " + response + " " + command.getExecutionEvents()); - //} + + metrics.getHealthCounts().getTotalRequests() + " => " + response + " " + commandInfo.getExecutionEvents()); + } } } } diff --git a/hystrix-core/src/test/java/com/netflix/hystrix/HystrixCollapserTest.java b/hystrix-core/src/test/java/com/netflix/hystrix/HystrixCollapserTest.java index ef7e780a1..dc04fb0d5 100644 --- a/hystrix-core/src/test/java/com/netflix/hystrix/HystrixCollapserTest.java +++ b/hystrix-core/src/test/java/com/netflix/hystrix/HystrixCollapserTest.java @@ -62,7 +62,7 @@ public void testTwoRequests() throws Exception { assertEquals(1, counter.get()); - assertEquals(1, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size()); + assertEquals(1, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); } @Test @@ -85,7 +85,7 @@ public void testMultipleBatches() throws Exception { // we should have had it execute twice now assertEquals(2, counter.get()); - assertEquals(2, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size()); + assertEquals(2, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); } @Test @@ -102,7 +102,7 @@ public void testMaxRequestsInBatch() throws Exception { // we should have had it execute twice because the batch size was 2 assertEquals(2, counter.get()); - assertEquals(2, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size()); + assertEquals(2, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); } @Test @@ -131,7 +131,7 @@ public void testRequestsOverTime() throws Exception { System.out.println("number of executions: " + counter.get()); assertEquals(3, counter.get()); - assertEquals(3, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size()); + assertEquals(3, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); } @Test @@ -157,7 +157,7 @@ public void testUnsubscribeOnOneDoesntKillBatch() throws Exception { assertEquals(1, counter.get()); - assertEquals(1, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size()); + assertEquals(1, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); } @Test @@ -176,7 +176,7 @@ public void testShardedRequests() throws Exception { /* we should get 2 batches since it gets sharded */ assertEquals(2, counter.get()); - assertEquals(2, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size()); + assertEquals(2, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); } @Test @@ -200,7 +200,7 @@ public void testRequestScope() throws Exception { // 2 different batches should execute, 1 per request assertEquals(2, counter.get()); - assertEquals(2, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size()); + assertEquals(2, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); } @Test @@ -224,7 +224,7 @@ public void testGlobalScope() throws Exception { // despite having cleared the cache in between we should have a single execution because this is on the global not request cache assertEquals(1, counter.get()); - assertEquals(1, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size()); + assertEquals(1, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); } @Test @@ -248,7 +248,7 @@ public void testErrorHandlingViaFutureException() throws Exception { } assertEquals(0, counter.get()); - assertEquals(0, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size()); + assertEquals(0, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); } @Test @@ -274,7 +274,7 @@ public void testErrorHandlingWhenMapToResponseFails() throws Exception { // the batch failed so no executions assertEquals(0, counter.get()); // but it still executed the command once - assertEquals(1, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size()); + assertEquals(1, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); } @Test @@ -441,9 +441,9 @@ public void testRequestCache1() { // we should still have executed only one command assertEquals(1, counter.get()); - assertEquals(1, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size()); + assertEquals(1, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); - HystrixCommand command = HystrixRequestLog.getCurrentRequest().getExecutedCommands().toArray(new HystrixCommand[1])[0]; + HystrixExecutableInfo command = HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().toArray(new HystrixExecutableInfo[1])[0]; System.out.println("command.getExecutionEvents(): " + command.getExecutionEvents()); assertEquals(2, command.getExecutionEvents().size()); assertTrue(command.getExecutionEvents().contains(HystrixEventType.SUCCESS)); @@ -493,9 +493,9 @@ public void testRequestCache2() { // we should still have executed only one command assertEquals(1, counter.get()); - assertEquals(1, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size()); + assertEquals(1, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); - HystrixCommand command = HystrixRequestLog.getCurrentRequest().getExecutedCommands().toArray(new HystrixCommand[1])[0]; + HystrixExecutableInfo command = HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().toArray(new HystrixExecutableInfo[1])[0]; assertEquals(2, command.getExecutionEvents().size()); assertTrue(command.getExecutionEvents().contains(HystrixEventType.SUCCESS)); assertTrue(command.getExecutionEvents().contains(HystrixEventType.COLLAPSED)); @@ -549,9 +549,9 @@ public void testRequestCache3() { // we should still have executed only one command assertEquals(1, counter.get()); - assertEquals(1, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size()); + assertEquals(1, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); - HystrixCommand command = HystrixRequestLog.getCurrentRequest().getExecutedCommands().toArray(new HystrixCommand[1])[0]; + HystrixExecutableInfo command = HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().toArray(new HystrixExecutableInfo[1])[0]; assertEquals(2, command.getExecutionEvents().size()); assertTrue(command.getExecutionEvents().contains(HystrixEventType.SUCCESS)); assertTrue(command.getExecutionEvents().contains(HystrixEventType.COLLAPSED)); @@ -605,16 +605,16 @@ public void testNoRequestCache3() { // request caching is turned off on this so we expect 2 command executions assertEquals(2, counter.get()); - assertEquals(2, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size()); + assertEquals(2, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); // we expect to see it with SUCCESS and COLLAPSED and both - HystrixCommand commandA = HystrixRequestLog.getCurrentRequest().getExecutedCommands().toArray(new HystrixCommand[2])[0]; + HystrixExecutableInfo commandA = HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().toArray(new HystrixExecutableInfo[2])[0]; assertEquals(2, commandA.getExecutionEvents().size()); assertTrue(commandA.getExecutionEvents().contains(HystrixEventType.SUCCESS)); assertTrue(commandA.getExecutionEvents().contains(HystrixEventType.COLLAPSED)); // we expect to see it with SUCCESS and COLLAPSED and both - HystrixCommand commandB = HystrixRequestLog.getCurrentRequest().getExecutedCommands().toArray(new HystrixCommand[2])[1]; + HystrixExecutableInfo commandB = HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().toArray(new HystrixExecutableInfo[2])[1]; assertEquals(2, commandB.getExecutionEvents().size()); assertTrue(commandB.getExecutionEvents().contains(HystrixEventType.SUCCESS)); assertTrue(commandB.getExecutionEvents().contains(HystrixEventType.COLLAPSED)); @@ -675,9 +675,9 @@ public void testRequestCacheWithException() { // it should still be 1 ... no new executions assertEquals(1, commands.size()); - assertEquals(1, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size()); + assertEquals(1, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); - HystrixCommand command = HystrixRequestLog.getCurrentRequest().getExecutedCommands().toArray(new HystrixCommand[1])[0]; + HystrixExecutableInfo command = HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().toArray(new HystrixExecutableInfo[1])[0]; assertEquals(2, command.getExecutionEvents().size()); assertTrue(command.getExecutionEvents().contains(HystrixEventType.FAILURE)); assertTrue(command.getExecutionEvents().contains(HystrixEventType.COLLAPSED)); @@ -737,7 +737,7 @@ public void testRequestCacheWithTimeout() { // it should still be 1 ... no new executions assertEquals(1, commands.size()); - assertEquals(1, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size()); + assertEquals(1, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); } /** @@ -767,7 +767,7 @@ public void testRequestWithCommandShortCircuited() throws Exception { assertEquals(0, counter.get()); // it will execute once (short-circuited) - assertEquals(1, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size()); + assertEquals(1, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); } /** @@ -789,7 +789,7 @@ public void testVoidResponseTypeFireAndForgetCollapsing1() throws Exception { assertEquals(1, counter.get()); - assertEquals(1, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size()); + assertEquals(1, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); } /** @@ -815,7 +815,7 @@ public void testVoidResponseTypeFireAndForgetCollapsing2() throws Exception { assertEquals(1, counter.get()); - assertEquals(1, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size()); + assertEquals(1, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); } /** @@ -830,7 +830,7 @@ public void testVoidResponseTypeFireAndForgetCollapsing3() throws Exception { assertEquals(1, counter.get()); - assertEquals(1, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size()); + assertEquals(1, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); } private static class TestRequestCollapser extends HystrixCollapser, String, String> { diff --git a/hystrix-core/src/test/java/com/netflix/hystrix/HystrixCommandTest.java b/hystrix-core/src/test/java/com/netflix/hystrix/HystrixCommandTest.java index cc2bb2fac..c439bbd07 100644 --- a/hystrix-core/src/test/java/com/netflix/hystrix/HystrixCommandTest.java +++ b/hystrix-core/src/test/java/com/netflix/hystrix/HystrixCommandTest.java @@ -106,7 +106,7 @@ public void testExecutionSuccess() { assertEquals(0, command.builder.metrics.getHealthCounts().getErrorPercentage()); - assertEquals(1, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size()); + assertEquals(1, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); } catch (Exception e) { e.printStackTrace(); @@ -185,7 +185,7 @@ public void testExecutionKnownFailureWithNoFallback() { assertEquals(100, circuitBreaker.metrics.getHealthCounts().getErrorPercentage()); - assertEquals(1, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size()); + assertEquals(1, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); } /** @@ -224,7 +224,7 @@ public void testExecutionUnknownFailureWithNoFallback() { assertEquals(100, command.builder.metrics.getHealthCounts().getErrorPercentage()); - assertEquals(1, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size()); + assertEquals(1, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); } /** @@ -259,7 +259,7 @@ public void testExecutionFailureWithFallback() { assertEquals(100, command.builder.metrics.getHealthCounts().getErrorPercentage()); - assertEquals(1, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size()); + assertEquals(1, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); } /** @@ -295,7 +295,7 @@ public void testExecutionFailureWithFallbackFailure() { assertEquals(100, command.builder.metrics.getHealthCounts().getErrorPercentage()); - assertEquals(1, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size()); + assertEquals(1, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); } /** @@ -329,7 +329,7 @@ public void testQueueSuccess() { assertEquals(0, command.builder.metrics.getHealthCounts().getErrorPercentage()); - assertEquals(1, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size()); + assertEquals(1, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); } /** @@ -371,7 +371,7 @@ public void testQueueKnownFailureWithNoFallback() { assertEquals(100, circuitBreaker.metrics.getHealthCounts().getErrorPercentage()); - assertEquals(1, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size()); + assertEquals(1, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); } /** @@ -411,7 +411,7 @@ public void testQueueUnknownFailureWithNoFallback() { assertEquals(100, command.builder.metrics.getHealthCounts().getErrorPercentage()); - assertEquals(1, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size()); + assertEquals(1, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); } /** @@ -445,7 +445,7 @@ public void testQueueFailureWithFallback() { assertEquals(100, command.builder.metrics.getHealthCounts().getErrorPercentage()); - assertEquals(1, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size()); + assertEquals(1, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); } /** @@ -484,7 +484,7 @@ public void testQueueFailureWithFallbackFailure() { assertEquals(100, command.builder.metrics.getHealthCounts().getErrorPercentage()); - assertEquals(1, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size()); + assertEquals(1, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); } /** @@ -497,7 +497,7 @@ public void testObserveSuccess() { assertEquals(0, command.builder.metrics.getRollingCount(HystrixRollingNumberEvent.FAILURE)); assertEquals(0, command.builder.metrics.getRollingCount(HystrixRollingNumberEvent.TIMEOUT)); assertEquals(0, command.builder.metrics.getRollingCount(HystrixRollingNumberEvent.SUCCESS)); - assertEquals(true, command.observe().toBlockingObservable().single()); + assertEquals(true, command.observe().toBlocking().single()); assertEquals(0, command.builder.metrics.getRollingCount(HystrixRollingNumberEvent.FAILURE)); assertEquals(0, command.builder.metrics.getRollingCount(HystrixRollingNumberEvent.TIMEOUT)); assertEquals(1, command.builder.metrics.getRollingCount(HystrixRollingNumberEvent.SUCCESS)); @@ -518,7 +518,7 @@ public void testObserveSuccess() { assertEquals(0, command.builder.metrics.getHealthCounts().getErrorPercentage()); - assertEquals(1, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size()); + assertEquals(1, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); } catch (Exception e) { e.printStackTrace(); @@ -749,7 +749,7 @@ public void testCircuitBreakerTripsAfterFailures() { assertEquals(100, circuitBreaker.metrics.getHealthCounts().getErrorPercentage()); - assertEquals(4, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size()); + assertEquals(4, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); } /** @@ -805,7 +805,7 @@ public void testCircuitBreakerTripsAfterFailuresViaQueue() { assertEquals(100, circuitBreaker.metrics.getHealthCounts().getErrorPercentage()); - assertEquals(4, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size()); + assertEquals(4, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); } catch (Exception e) { e.printStackTrace(); fail("We should have received fallbacks."); @@ -876,7 +876,7 @@ public void testCircuitBreakerAcrossMultipleCommandsButSameCircuitBreaker() { assertEquals(100, circuitBreaker.metrics.getHealthCounts().getErrorPercentage()); - assertEquals(4, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size()); + assertEquals(4, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); } /** @@ -953,7 +953,7 @@ public void testCircuitBreakerAcrossMultipleCommandsAndDifferentDependency() { assertEquals(100, circuitBreaker_two.metrics.getHealthCounts().getErrorPercentage()); - assertEquals(4, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size()); + assertEquals(4, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); } /** @@ -984,7 +984,7 @@ public void testExecutionSuccessWithCircuitBreakerDisabled() { assertEquals(0, command.builder.metrics.getHealthCounts().getErrorPercentage()); - assertEquals(1, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size()); + assertEquals(1, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); } /** @@ -1030,7 +1030,7 @@ public void testExecutionTimeoutWithNoFallback() { assertEquals(100, command.builder.metrics.getHealthCounts().getErrorPercentage()); - assertEquals(1, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size()); + assertEquals(1, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); } /** @@ -1064,7 +1064,7 @@ public void testExecutionTimeoutWithFallback() { assertEquals(100, command.builder.metrics.getHealthCounts().getErrorPercentage()); - assertEquals(1, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size()); + assertEquals(1, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); } /** @@ -1104,7 +1104,7 @@ public void testExecutionTimeoutFallbackFailure() { assertEquals(100, command.builder.metrics.getHealthCounts().getErrorPercentage()); - assertEquals(1, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size()); + assertEquals(1, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); } /** @@ -1149,7 +1149,7 @@ public void testCircuitBreakerOnExecutionTimeout() { assertEquals(100, command.builder.metrics.getHealthCounts().getErrorPercentage()); - assertEquals(1, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size()); + assertEquals(1, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); } /** @@ -1199,7 +1199,7 @@ public void testCountersOnExecutionTimeout() { assertEquals(100, command.builder.metrics.getHealthCounts().getErrorPercentage()); - assertEquals(1, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size()); + assertEquals(1, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); } /** @@ -1245,7 +1245,7 @@ public void testQueuedExecutionTimeoutWithNoFallback() { assertEquals(100, command.builder.metrics.getHealthCounts().getErrorPercentage()); - assertEquals(1, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size()); + assertEquals(1, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); } /** @@ -1278,7 +1278,7 @@ public void testQueuedExecutionTimeoutWithFallback() { assertEquals(100, command.builder.metrics.getHealthCounts().getErrorPercentage()); - assertEquals(1, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size()); + assertEquals(1, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); } /** @@ -1320,7 +1320,7 @@ public void testQueuedExecutionTimeoutFallbackFailure() { assertEquals(100, command.builder.metrics.getHealthCounts().getErrorPercentage()); - assertEquals(1, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size()); + assertEquals(1, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); } /** @@ -1333,7 +1333,7 @@ public void testQueuedExecutionTimeoutFallbackFailure() { public void testObservedExecutionTimeoutWithNoFallback() { TestHystrixCommand command = new TestCommandWithTimeout(50, TestCommandWithTimeout.FALLBACK_NOT_IMPLEMENTED); try { - command.observe().toBlockingObservable().single(); + command.observe().toBlocking().single(); fail("we shouldn't get here"); } catch (Exception e) { e.printStackTrace(); @@ -1366,7 +1366,7 @@ public void testObservedExecutionTimeoutWithNoFallback() { assertEquals(100, command.builder.metrics.getHealthCounts().getErrorPercentage()); - assertEquals(1, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size()); + assertEquals(1, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); } /** @@ -1379,7 +1379,7 @@ public void testObservedExecutionTimeoutWithNoFallback() { public void testObservedExecutionTimeoutWithFallback() { TestHystrixCommand command = new TestCommandWithTimeout(50, TestCommandWithTimeout.FALLBACK_SUCCESS); try { - assertEquals(false, command.observe().toBlockingObservable().single()); + assertEquals(false, command.observe().toBlocking().single()); } catch (Exception e) { e.printStackTrace(); fail("We should have received a response from the fallback."); @@ -1399,7 +1399,7 @@ public void testObservedExecutionTimeoutWithFallback() { assertEquals(100, command.builder.metrics.getHealthCounts().getErrorPercentage()); - assertEquals(1, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size()); + assertEquals(1, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); } /** @@ -1412,7 +1412,7 @@ public void testObservedExecutionTimeoutWithFallback() { public void testObservedExecutionTimeoutFallbackFailure() { TestHystrixCommand command = new TestCommandWithTimeout(50, TestCommandWithTimeout.FALLBACK_FAILURE); try { - command.observe().toBlockingObservable().single(); + command.observe().toBlocking().single(); fail("we shouldn't get here"); } catch (Exception e) { if (e instanceof HystrixRuntimeException) { @@ -1441,7 +1441,7 @@ public void testObservedExecutionTimeoutFallbackFailure() { assertEquals(100, command.builder.metrics.getHealthCounts().getErrorPercentage()); - assertEquals(1, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size()); + assertEquals(1, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); } /** @@ -1488,7 +1488,7 @@ public void testShortCircuitFallbackCounter() { assertEquals(100, circuitBreaker.metrics.getHealthCounts().getErrorPercentage()); - assertEquals(2, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size()); + assertEquals(2, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); } /** @@ -1568,7 +1568,7 @@ public void run() { assertEquals(50, circuitBreaker.metrics.getHealthCounts().getErrorPercentage()); - assertEquals(2, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size()); + assertEquals(2, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); } /** @@ -1627,7 +1627,7 @@ public void run() { assertEquals(100, circuitBreaker.metrics.getHealthCounts().getErrorPercentage()); - assertEquals(2, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size()); + assertEquals(2, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); } /** @@ -1687,7 +1687,7 @@ public void run() { assertEquals(100, circuitBreaker.metrics.getHealthCounts().getErrorPercentage()); - assertEquals(2, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size()); + assertEquals(2, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); } /** @@ -1759,7 +1759,7 @@ public void run() { assertEquals(100, circuitBreaker.metrics.getHealthCounts().getErrorPercentage()); - assertEquals(1, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size()); + assertEquals(1, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); } /** @@ -1837,7 +1837,7 @@ public void testTimedOutCommandDoesNotExecute() { assertEquals(100, s2.metrics.getHealthCounts().getErrorPercentage()); - assertEquals(2, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size()); + assertEquals(2, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); } @Test @@ -1897,7 +1897,7 @@ public void testFallbackSemaphore() { assertEquals(0, circuitBreaker.metrics.getRollingCount(HystrixRollingNumberEvent.TIMEOUT)); assertEquals(0, circuitBreaker.metrics.getRollingCount(HystrixRollingNumberEvent.RESPONSE_FROM_CACHE)); - assertEquals(3, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size()); + assertEquals(3, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); } @Test @@ -1965,7 +1965,7 @@ public void run() { assertEquals(0, circuitBreaker.metrics.getRollingCount(HystrixRollingNumberEvent.TIMEOUT)); assertEquals(0, circuitBreaker.metrics.getRollingCount(HystrixRollingNumberEvent.RESPONSE_FROM_CACHE)); - assertEquals(3, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size()); + assertEquals(3, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); } @Test @@ -2042,7 +2042,7 @@ public void run() { assertEquals(0, circuitBreaker.metrics.getRollingCount(HystrixRollingNumberEvent.TIMEOUT)); assertEquals(0, circuitBreaker.metrics.getRollingCount(HystrixRollingNumberEvent.RESPONSE_FROM_CACHE)); - assertEquals(3, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size()); + assertEquals(3, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); } @Test @@ -2105,7 +2105,7 @@ public void run() { System.out.println("**** DONE"); - assertEquals(2, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size()); + assertEquals(2, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); } /** @@ -2317,7 +2317,7 @@ public void testRequestCache1() { assertEquals(0, circuitBreaker.metrics.getHealthCounts().getErrorPercentage()); - assertEquals(2, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size()); + assertEquals(2, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); } /** @@ -2369,7 +2369,7 @@ public void testRequestCache2() { assertEquals(0, circuitBreaker.metrics.getHealthCounts().getErrorPercentage()); - assertEquals(2, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size()); + assertEquals(2, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); } /** @@ -2430,7 +2430,7 @@ public void testRequestCache3() { assertEquals(0, circuitBreaker.metrics.getHealthCounts().getErrorPercentage()); - assertEquals(3, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size()); + assertEquals(3, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); } /** @@ -2497,7 +2497,7 @@ public void testRequestCacheWithSlowExecution() { assertEquals(0, circuitBreaker.metrics.getHealthCounts().getErrorPercentage()); - assertEquals(4, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size()); + assertEquals(4, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); System.out.println("HystrixRequestLog: " + HystrixRequestLog.getCurrentRequest().getExecutedCommandsAsString()); } @@ -2558,7 +2558,7 @@ public void testNoRequestCache3() { assertEquals(0, circuitBreaker.metrics.getHealthCounts().getErrorPercentage()); - assertEquals(3, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size()); + assertEquals(3, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); } /** @@ -2621,7 +2621,7 @@ public void testRequestCacheViaQueueSemaphore1() { assertEquals(0, circuitBreaker.metrics.getHealthCounts().getErrorPercentage()); - assertEquals(3, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size()); + assertEquals(3, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); } /** @@ -2680,7 +2680,7 @@ public void testNoRequestCacheViaQueueSemaphore1() { assertEquals(0, circuitBreaker.metrics.getHealthCounts().getErrorPercentage()); - assertEquals(3, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size()); + assertEquals(3, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); } /** @@ -2735,7 +2735,7 @@ public void testRequestCacheViaExecuteSemaphore1() { assertEquals(0, circuitBreaker.metrics.getHealthCounts().getErrorPercentage()); - assertEquals(3, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size()); + assertEquals(3, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); } /** @@ -2790,7 +2790,7 @@ public void testNoRequestCacheViaExecuteSemaphore1() { assertEquals(0, circuitBreaker.metrics.getHealthCounts().getErrorPercentage()); - assertEquals(3, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size()); + assertEquals(3, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); } @Test @@ -2855,7 +2855,7 @@ public void testNoRequestCacheOnTimeoutThrowsException() throws Exception { assertEquals(100, circuitBreaker.metrics.getHealthCounts().getErrorPercentage()); - assertEquals(4, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size()); + assertEquals(4, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); } @Test @@ -2893,9 +2893,9 @@ public void testRequestCacheOnTimeoutCausesNullPointerException() throws Excepti assertEquals(100, circuitBreaker.metrics.getHealthCounts().getErrorPercentage()); - assertEquals(5, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size()); + assertEquals(5, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); - HystrixCommand[] executeCommands = HystrixRequestLog.getCurrentRequest().getExecutedCommands().toArray(new HystrixCommand[] {}); + HystrixExecutableInfo[] executeCommands = HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().toArray(new HystrixExecutableInfo[] {}); System.out.println(":executeCommands[0].getExecutionEvents()" + executeCommands[0].getExecutionEvents()); assertEquals(2, executeCommands[0].getExecutionEvents().size()); @@ -2976,7 +2976,7 @@ public void testRequestCacheOnTimeoutThrowsException() throws Exception { assertEquals(100, circuitBreaker.metrics.getHealthCounts().getErrorPercentage()); - assertEquals(4, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size()); + assertEquals(4, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); } @Test @@ -3045,7 +3045,7 @@ public void testRequestCacheOnThreadRejectionThrowsException() throws Exception assertEquals(100, circuitBreaker.metrics.getHealthCounts().getErrorPercentage()); - assertEquals(4, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size()); + assertEquals(4, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); } /** @@ -4200,7 +4200,7 @@ public void testExecutionFailureWithFallbackImplementedButDisabled() { assertEquals(100, commandDisabled.builder.metrics.getHealthCounts().getErrorPercentage()); - assertEquals(2, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size()); + assertEquals(2, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); } @Test @@ -5201,7 +5201,7 @@ private static class TestExecutionHook extends HystrixCommandExecutionHook { AtomicInteger startExecute = new AtomicInteger(); @Override - public void onStart(HystrixCommand commandInstance) { + public void onStart(HystrixExecutable commandInstance) { super.onStart(commandInstance); startExecute.incrementAndGet(); } @@ -5209,7 +5209,7 @@ public void onStart(HystrixCommand commandInstance) { Object endExecuteSuccessResponse = null; @Override - public T onComplete(HystrixCommand commandInstance, T response) { + public T onComplete(HystrixExecutable commandInstance, T response) { endExecuteSuccessResponse = response; return super.onComplete(commandInstance, response); } @@ -5218,7 +5218,7 @@ public T onComplete(HystrixCommand commandInstance, T response) { FailureType endExecuteFailureType = null; @Override - public Exception onError(HystrixCommand commandInstance, FailureType failureType, Exception e) { + public Exception onError(HystrixExecutable commandInstance, FailureType failureType, Exception e) { endExecuteFailureException = e; endExecuteFailureType = failureType; return super.onError(commandInstance, failureType, e); @@ -5227,7 +5227,7 @@ public Exception onError(HystrixCommand commandInstance, FailureType fail AtomicInteger startRun = new AtomicInteger(); @Override - public void onRunStart(HystrixCommand commandInstance) { + public void onRunStart(HystrixExecutable commandInstance) { super.onRunStart(commandInstance); startRun.incrementAndGet(); } @@ -5235,7 +5235,7 @@ public void onRunStart(HystrixCommand commandInstance) { Object runSuccessResponse = null; @Override - public T onRunSuccess(HystrixCommand commandInstance, T response) { + public T onRunSuccess(HystrixExecutable commandInstance, T response) { runSuccessResponse = response; return super.onRunSuccess(commandInstance, response); } @@ -5243,7 +5243,7 @@ public T onRunSuccess(HystrixCommand commandInstance, T response) { Exception runFailureException = null; @Override - public Exception onRunError(HystrixCommand commandInstance, Exception e) { + public Exception onRunError(HystrixExecutable commandInstance, Exception e) { runFailureException = e; return super.onRunError(commandInstance, e); } @@ -5251,7 +5251,7 @@ public Exception onRunError(HystrixCommand commandInstance, Exception e) AtomicInteger startFallback = new AtomicInteger(); @Override - public void onFallbackStart(HystrixCommand commandInstance) { + public void onFallbackStart(HystrixExecutable commandInstance) { super.onFallbackStart(commandInstance); startFallback.incrementAndGet(); } @@ -5259,7 +5259,7 @@ public void onFallbackStart(HystrixCommand commandInstance) { Object fallbackSuccessResponse = null; @Override - public T onFallbackSuccess(HystrixCommand commandInstance, T response) { + public T onFallbackSuccess(HystrixExecutable commandInstance, T response) { fallbackSuccessResponse = response; return super.onFallbackSuccess(commandInstance, response); } @@ -5267,7 +5267,7 @@ public T onFallbackSuccess(HystrixCommand commandInstance, T response) { Exception fallbackFailureException = null; @Override - public Exception onFallbackError(HystrixCommand commandInstance, Exception e) { + public Exception onFallbackError(HystrixExecutable commandInstance, Exception e) { fallbackFailureException = e; return super.onFallbackError(commandInstance, e); } @@ -5275,7 +5275,7 @@ public Exception onFallbackError(HystrixCommand commandInstance, Exceptio AtomicInteger threadStart = new AtomicInteger(); @Override - public void onThreadStart(HystrixCommand commandInstance) { + public void onThreadStart(HystrixExecutable commandInstance) { super.onThreadStart(commandInstance); threadStart.incrementAndGet(); } @@ -5283,7 +5283,7 @@ public void onThreadStart(HystrixCommand commandInstance) { AtomicInteger threadComplete = new AtomicInteger(); @Override - public void onThreadComplete(HystrixCommand commandInstance) { + public void onThreadComplete(HystrixExecutable commandInstance) { super.onThreadComplete(commandInstance); threadComplete.incrementAndGet(); } diff --git a/hystrix-core/src/test/java/com/netflix/hystrix/HystrixObservableCommandTest.java b/hystrix-core/src/test/java/com/netflix/hystrix/HystrixObservableCommandTest.java index 37a007418..c74cb5ffc 100644 --- a/hystrix-core/src/test/java/com/netflix/hystrix/HystrixObservableCommandTest.java +++ b/hystrix-core/src/test/java/com/netflix/hystrix/HystrixObservableCommandTest.java @@ -96,7 +96,7 @@ public void call(Subscriber sub) { sub.onError(e); } } - }).subscribeOn(Schedulers.computation()).toBlockingObservable().toFuture()); + }).subscribeOn(Schedulers.computation()).toBlocking().toFuture()); } for (Future future : futures) { try { @@ -571,7 +571,7 @@ public void testObserveSuccess() { assertEquals(0, command.builder.metrics.getRollingCount(HystrixRollingNumberEvent.TIMEOUT)); assertEquals(0, command.builder.metrics.getRollingCount(HystrixRollingNumberEvent.SUCCESS)); - assertEquals(true, command.observe().toBlockingObservable().single()); + assertEquals(true, command.observe().toBlocking().single()); assertEquals(0, command.builder.metrics.getRollingCount(HystrixRollingNumberEvent.FAILURE)); assertEquals(0, command.builder.metrics.getRollingCount(HystrixRollingNumberEvent.TIMEOUT)); assertEquals(1, command.builder.metrics.getRollingCount(HystrixRollingNumberEvent.SUCCESS)); @@ -746,18 +746,6 @@ public void testObserveOnImmediateSchedulerByDefaultForSemaphoreIsolation() thro @Override protected Observable run() { commandThread.set(Thread.currentThread()); - - Func1> f1 = new Func1>() { - - @Override - public List call(Integer t1) { - - return null; - } - - }; - Observable.from(f1); - return Observable.just(true); } }; @@ -1595,7 +1583,7 @@ public void testQueuedExecutionTimeoutFallbackFailure() { public void testObservedExecutionTimeoutWithNoFallback() { TestHystrixCommand command = new TestCommandWithTimeout(50, TestCommandWithTimeout.FALLBACK_NOT_IMPLEMENTED); try { - command.observe().toBlockingObservable().single(); + command.observe().toBlocking().single(); fail("we shouldn't get here"); } catch (Exception e) { e.printStackTrace(); @@ -1644,7 +1632,7 @@ public void testObservedExecutionTimeoutWithNoFallback() { public void testObservedExecutionTimeoutWithFallback() { TestHystrixCommand command = new TestCommandWithTimeout(50, TestCommandWithTimeout.FALLBACK_SUCCESS); try { - assertEquals(false, command.observe().toBlockingObservable().single()); + assertEquals(false, command.observe().toBlocking().single()); } catch (Exception e) { e.printStackTrace(); fail("We should have received a response from the fallback."); @@ -1680,7 +1668,7 @@ public void testObservedExecutionTimeoutWithFallback() { public void testObservedExecutionTimeoutFallbackFailure() { TestHystrixCommand command = new TestCommandWithTimeout(50, TestCommandWithTimeout.FALLBACK_FAILURE); try { - command.observe().toBlockingObservable().single(); + command.observe().toBlocking().single(); fail("we shouldn't get here"); } catch (Exception e) { if (e instanceof HystrixRuntimeException) { @@ -4194,9 +4182,9 @@ public void call(Subscriber t1) { @Override protected Observable getFallback() { if (isResponseTimedOut()) { - return Observable.from("timed-out"); + return Observable.just("timed-out"); } else { - return Observable.from("abc"); + return Observable.just("abc"); } } }; @@ -4241,14 +4229,14 @@ public void call(Subscriber t1) { @Override protected Observable getFallback() { if (isResponseTimedOut()) { - return Observable.from("timed-out"); + return Observable.just("timed-out"); } else { - return Observable.from("abc"); + return Observable.just("abc"); } } }; - String value = command.observe().toBlockingObservable().last(); + String value = command.observe().toBlocking().last(); assertTrue(command.isResponseTimedOut()); assertEquals("expected fallback value", "timed-out", value); @@ -4286,14 +4274,14 @@ public void call(Subscriber t1) { @Override protected Observable getFallback() { if (isResponseTimedOut()) { - return Observable.from("timed-out"); + return Observable.just("timed-out"); } else { - return Observable.from("abc"); + return Observable.just("abc"); } } }; - String value = command.observe().toBlockingObservable().last(); + String value = command.observe().toBlocking().last(); assertTrue(command.isResponseTimedOut()); assertEquals("expected fallback value", "timed-out", value); @@ -4448,7 +4436,7 @@ public Boolean call(Boolean b) { return b; } - }).toBlockingObservable().single()); + }).toBlocking().single()); } catch (Exception e) { e.printStackTrace(); exceptionReceived.set(true); @@ -4547,7 +4535,7 @@ public void call() { } } - }).toBlockingObservable().single()); + }).toBlocking().single()); } catch (Exception e) { e.printStackTrace(); exceptionReceived.set(true); @@ -6350,7 +6338,7 @@ protected Observable run() { @Override protected Observable getFallback() { - return Observable.from(false).subscribeOn(Schedulers.computation()); + return Observable.just(false).subscribeOn(Schedulers.computation()); } } @@ -6396,7 +6384,7 @@ public SuccessfulCacheableCommand(TestCircuitBreaker circuitBreaker, boolean cac protected Observable run() { executed = true; System.out.println("successfully executed"); - return Observable.from(value).subscribeOn(Schedulers.computation()); + return Observable.just(value).subscribeOn(Schedulers.computation()); } public boolean isCommandRunningInThread() { @@ -6432,7 +6420,7 @@ public SuccessfulCacheableCommandViaSemaphore(TestCircuitBreaker circuitBreaker, protected Observable run() { executed = true; System.out.println("successfully executed"); - return Observable.from(value).subscribeOn(Schedulers.computation()); + return Observable.just(value).subscribeOn(Schedulers.computation()); } public boolean isCommandRunningInThread() { @@ -6468,7 +6456,7 @@ public SlowCacheableCommand(TestCircuitBreaker circuitBreaker, String value, int @Override protected Observable run() { executed = true; - return Observable.from(value).delay(duration, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.computation()) + return Observable.just(value).delay(duration, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.computation()) .doOnNext(new Action1() { @Override @@ -6497,7 +6485,7 @@ private TestCommandWithoutCircuitBreaker() { @Override protected Observable run() { System.out.println("successfully executed"); - return Observable.from(true).subscribeOn(Schedulers.computation()); + return Observable.just(true).subscribeOn(Schedulers.computation()); } } @@ -6554,7 +6542,7 @@ public void call(Subscriber sub) { @Override protected Observable getFallback() { if (fallbackBehavior == FALLBACK_SUCCESS) { - return Observable.from(false); + return Observable.just(false); } else if (fallbackBehavior == FALLBACK_FAILURE) { // TODO duplicate with error inside async Observable throw new RuntimeException("failed on fallback"); @@ -6742,7 +6730,7 @@ private TestSemaphoreCommandWithFallback(TestCircuitBreaker circuitBreaker, int super(testPropsBuilder().setCircuitBreaker(circuitBreaker).setMetrics(circuitBreaker.metrics) .setCommandPropertiesDefaults(HystrixCommandPropertiesTest.getUnitTestPropertiesSetter().withExecutionIsolationStrategy(ExecutionIsolationStrategy.SEMAPHORE).withExecutionIsolationSemaphoreMaxConcurrentRequests(executionSemaphoreCount))); this.executionSleep = executionSleep; - this.fallback = Observable.from(fallback); + this.fallback = Observable.just(fallback); } @Override @@ -6799,7 +6787,7 @@ public void call(Subscriber s) { @Override protected Observable getFallback() { - return Observable.from(false).subscribeOn(Schedulers.computation()); + return Observable.just(false).subscribeOn(Schedulers.computation()); } @Override @@ -6891,7 +6879,7 @@ protected Observable run() { } catch (InterruptedException e) { throw new RuntimeException(e); } - return Observable.from(true); + return Observable.just(true); } @Override @@ -6915,7 +6903,7 @@ protected Observable run() { @Override protected Observable getFallback() { - return Observable.from(false).subscribeOn(Schedulers.computation()); + return Observable.just(false).subscribeOn(Schedulers.computation()); } @Override diff --git a/hystrix-core/src/test/java/com/netflix/hystrix/HystrixRequestCacheTest.java b/hystrix-core/src/test/java/com/netflix/hystrix/HystrixRequestCacheTest.java index fa3fc157c..4e5b17585 100644 --- a/hystrix-core/src/test/java/com/netflix/hystrix/HystrixRequestCacheTest.java +++ b/hystrix-core/src/test/java/com/netflix/hystrix/HystrixRequestCacheTest.java @@ -26,10 +26,10 @@ public void testCache() { HystrixRequestCache cache2 = HystrixRequestCache.getInstance(HystrixCommandKey.Factory.asKey("command2"), strategy); cache2.putIfAbsent("valueA", new TestObservable("a3")); - assertEquals("a1", cache1.get("valueA").toBlockingObservable().last()); - assertEquals("b1", cache1.get("valueB").toBlockingObservable().last()); + assertEquals("a1", cache1.get("valueA").toBlocking().last()); + assertEquals("b1", cache1.get("valueB").toBlocking().last()); - assertEquals("a3", cache2.get("valueA").toBlockingObservable().last()); + assertEquals("a3", cache2.get("valueA").toBlocking().last()); assertNull(cache2.get("valueB")); } catch (Exception e) { fail("Exception: " + e.getMessage()); @@ -56,7 +56,7 @@ public void testClearCache() { try { HystrixRequestCache cache1 = HystrixRequestCache.getInstance(HystrixCommandKey.Factory.asKey("command1"), strategy); cache1.putIfAbsent("valueA", new TestObservable("a1")); - assertEquals("a1", cache1.get("valueA").toBlockingObservable().last()); + assertEquals("a1", cache1.get("valueA").toBlocking().last()); cache1.clear("valueA"); assertNull(cache1.get("valueA")); } catch (Exception e) { diff --git a/hystrix-core/src/test/java/com/netflix/hystrix/HystrixRequestLogTest.java b/hystrix-core/src/test/java/com/netflix/hystrix/HystrixRequestLogTest.java index 3763ba770..e744a7418 100644 --- a/hystrix-core/src/test/java/com/netflix/hystrix/HystrixRequestLogTest.java +++ b/hystrix-core/src/test/java/com/netflix/hystrix/HystrixRequestLogTest.java @@ -137,7 +137,7 @@ public void testMaxLimit() { new TestCommand("A", false, true).execute(); } - assertEquals(HystrixRequestLog.MAX_STORAGE, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size()); + assertEquals(HystrixRequestLog.MAX_STORAGE, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); } finally { context.shutdown(); } diff --git a/hystrix-core/src/test/java/com/netflix/hystrix/collapser/CollapsedRequestObservableFunctionTest.java b/hystrix-core/src/test/java/com/netflix/hystrix/collapser/CollapsedRequestObservableFunctionTest.java index daad0ba5a..6710dab3b 100644 --- a/hystrix-core/src/test/java/com/netflix/hystrix/collapser/CollapsedRequestObservableFunctionTest.java +++ b/hystrix-core/src/test/java/com/netflix/hystrix/collapser/CollapsedRequestObservableFunctionTest.java @@ -15,7 +15,7 @@ public class CollapsedRequestObservableFunctionTest { public void testSetResponseSuccess() throws InterruptedException, ExecutionException { CollapsedRequestObservableFunction cr = new CollapsedRequestObservableFunction("hello"); Observable o = Observable.create(cr); - Future v = o.toBlockingObservable().toFuture(); + Future v = o.toBlocking().toFuture(); cr.setResponse("theResponse"); @@ -27,7 +27,7 @@ public void testSetResponseSuccess() throws InterruptedException, ExecutionExcep public void testSetNullResponseSuccess() throws InterruptedException, ExecutionException { CollapsedRequestObservableFunction cr = new CollapsedRequestObservableFunction("hello"); Observable o = Observable.create(cr); - Future v = o.toBlockingObservable().toFuture(); + Future v = o.toBlocking().toFuture(); cr.setResponse(null); @@ -39,7 +39,7 @@ public void testSetNullResponseSuccess() throws InterruptedException, ExecutionE public void testSetException() throws InterruptedException, ExecutionException { CollapsedRequestObservableFunction cr = new CollapsedRequestObservableFunction("hello"); Observable o = Observable.create(cr); - Future v = o.toBlockingObservable().toFuture(); + Future v = o.toBlocking().toFuture(); cr.setException(new RuntimeException("anException")); @@ -56,7 +56,7 @@ public void testSetException() throws InterruptedException, ExecutionException { public void testSetExceptionAfterResponse() throws InterruptedException, ExecutionException { CollapsedRequestObservableFunction cr = new CollapsedRequestObservableFunction("hello"); Observable o = Observable.create(cr); - Future v = o.toBlockingObservable().toFuture(); + Future v = o.toBlocking().toFuture(); cr.setResponse("theResponse"); @@ -74,7 +74,7 @@ public void testSetExceptionAfterResponse() throws InterruptedException, Executi public void testSetResponseAfterException() throws InterruptedException, ExecutionException { CollapsedRequestObservableFunction cr = new CollapsedRequestObservableFunction("hello"); Observable o = Observable.create(cr); - Future v = o.toBlockingObservable().toFuture(); + Future v = o.toBlocking().toFuture(); cr.setException(new RuntimeException("anException")); @@ -97,7 +97,7 @@ public void testSetResponseAfterException() throws InterruptedException, Executi public void testSetResponseDuplicate() throws InterruptedException, ExecutionException { CollapsedRequestObservableFunction cr = new CollapsedRequestObservableFunction("hello"); Observable o = Observable.create(cr); - Future v = o.toBlockingObservable().toFuture(); + Future v = o.toBlocking().toFuture(); cr.setResponse("theResponse"); @@ -115,7 +115,7 @@ public void testSetResponseDuplicate() throws InterruptedException, ExecutionExc public void testSetResponseAfterUnsubscribe() throws InterruptedException, ExecutionException { CollapsedRequestObservableFunction cr = new CollapsedRequestObservableFunction("hello"); Observable o = Observable.create(cr); - Future f = o.toBlockingObservable().toFuture(); + Future f = o.toBlocking().toFuture(); // cancel/unsubscribe f.cancel(true); @@ -134,7 +134,7 @@ public void testSetResponseAfterUnsubscribe() throws InterruptedException, Execu public void testSetExceptionAfterUnsubscribe() throws InterruptedException, ExecutionException { CollapsedRequestObservableFunction cr = new CollapsedRequestObservableFunction("hello"); Observable o = Observable.create(cr); - Future f = o.toBlockingObservable().toFuture(); + Future f = o.toBlocking().toFuture(); // cancel/unsubscribe f.cancel(true); @@ -153,7 +153,7 @@ public void testSetExceptionAfterUnsubscribe() throws InterruptedException, Exec public void testUnsubscribeAfterSetResponse() throws InterruptedException, ExecutionException { CollapsedRequestObservableFunction cr = new CollapsedRequestObservableFunction("hello"); Observable o = Observable.create(cr); - Future v = o.toBlockingObservable().toFuture(); + Future v = o.toBlocking().toFuture(); cr.setResponse("theResponse"); diff --git a/hystrix-core/src/test/java/com/netflix/hystrix/strategy/HystrixPluginsTest.java b/hystrix-core/src/test/java/com/netflix/hystrix/strategy/HystrixPluginsTest.java index a6947d4d2..0acaed4aa 100644 --- a/hystrix-core/src/test/java/com/netflix/hystrix/strategy/HystrixPluginsTest.java +++ b/hystrix-core/src/test/java/com/netflix/hystrix/strategy/HystrixPluginsTest.java @@ -196,7 +196,7 @@ public void call(Throwable throwable) { } }) .materialize() - .toBlockingObservable().single(); + .toBlocking().single(); context.shutdown(); Hystrix.reset(); diff --git a/hystrix-core/src/test/java/com/netflix/hystrix/strategy/concurrency/HystrixConcurrencyStrategyTest.java b/hystrix-core/src/test/java/com/netflix/hystrix/strategy/concurrency/HystrixConcurrencyStrategyTest.java index f00cdb4ba..e0b295a83 100644 --- a/hystrix-core/src/test/java/com/netflix/hystrix/strategy/concurrency/HystrixConcurrencyStrategyTest.java +++ b/hystrix-core/src/test/java/com/netflix/hystrix/strategy/concurrency/HystrixConcurrencyStrategyTest.java @@ -47,14 +47,14 @@ public void testRequestContextPropagatesAcrossObserveOnPool() { @Override public String call(String s) { - System.out.println("Map => Commands: " + HystrixRequestLog.getCurrentRequest().getExecutedCommands()); + System.out.println("Map => Commands: " + HystrixRequestLog.getCurrentRequest().getAllExecutedCommands()); return s; } - }).toBlockingObservable().forEach(new Action1() { + }).toBlocking().forEach(new Action1() { @Override public void call(String s) { - System.out.println("Result [" + s + "] => Commands: " + HystrixRequestLog.getCurrentRequest().getExecutedCommands()); + System.out.println("Result [" + s + "] => Commands: " + HystrixRequestLog.getCurrentRequest().getAllExecutedCommands()); } }); } @@ -84,7 +84,7 @@ public void call(Throwable throwable) { } }) .materialize() - .toBlockingObservable().single(); + .toBlocking().single(); System.out.println("initialized = " + HystrixRequestContext.isCurrentThreadInitialized()); System.out.println("initialized inside onError = " + isInitialized.get());