Skip to content

Commit

Permalink
CAMEL-14182: Make Hystrix EIP general as Circuit Breaker EIP and allo…
Browse files Browse the repository at this point in the history
…w to plugin other implementations (#3342)

CAMEL-14182: Make Hystrix EIP general as Circuit Breaker EIP and allow to plugin other implementations.
  • Loading branch information
davsclaus authored Nov 15, 2019
1 parent 871e5ed commit 9481876
Show file tree
Hide file tree
Showing 42 changed files with 352 additions and 279 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,20 @@

import org.apache.camel.Processor;
import org.apache.camel.impl.engine.TypedProcessorFactory;
import org.apache.camel.model.HystrixDefinition;
import org.apache.camel.model.CircuitBreakerDefinition;
import org.apache.camel.spi.RouteContext;

/**
* To integrate camel-hystrix with the Camel routes using the Hystrix EIP.
*/
public class HystrixProcessorFactory extends TypedProcessorFactory<HystrixDefinition> {
public class HystrixProcessorFactory extends TypedProcessorFactory<CircuitBreakerDefinition> {

public HystrixProcessorFactory() {
super(HystrixDefinition.class);
super(CircuitBreakerDefinition.class);
}

@Override
public Processor doCreateProcessor(RouteContext routeContext, HystrixDefinition definition) throws Exception {
public Processor doCreateProcessor(RouteContext routeContext, CircuitBreakerDefinition definition) throws Exception {
return new HystrixReifier(definition).createProcessor(routeContext);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
import org.apache.camel.CamelContext;
import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.Processor;
import org.apache.camel.model.CircuitBreakerDefinition;
import org.apache.camel.model.HystrixConfigurationDefinition;
import org.apache.camel.model.HystrixDefinition;
import org.apache.camel.model.Model;
import org.apache.camel.reifier.ProcessorReifier;
import org.apache.camel.spi.BeanIntrospection;
Expand All @@ -41,9 +41,9 @@
import static org.apache.camel.support.CamelContextHelper.lookup;
import static org.apache.camel.support.CamelContextHelper.mandatoryLookup;

public class HystrixReifier extends ProcessorReifier<HystrixDefinition> {
public class HystrixReifier extends ProcessorReifier<CircuitBreakerDefinition> {

public HystrixReifier(HystrixDefinition definition) {
public HystrixReifier(CircuitBreakerDefinition definition) {
super(definition);
}

Expand Down Expand Up @@ -223,8 +223,8 @@ HystrixConfigurationDefinition buildHystrixConfiguration(CamelContext camelConte

// Extract properties from referenced configuration, the one configured
// on camel context takes the precedence over those in the registry
if (definition.getHystrixConfigurationRef() != null) {
final String ref = definition.getHystrixConfigurationRef();
if (definition.getConfigurationRef() != null) {
final String ref = definition.getConfigurationRef();

loadProperties(camelContext, properties, Suppliers.firstNotNull(
() -> camelContext.getExtension(Model.class).getHystrixConfiguration(ref),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ protected RouteBuilder createRouteBuilder() throws Exception {
public void configure() throws Exception {
from("direct:start")
.to("log:start")
.hystrix()
.circuitBreaker()
.throwException(new HystrixBadRequestException("Should not fallback"))
.onFallback()
.to("mock:fallback")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,26 +99,26 @@ class HystrixExceptionRoute extends RouteBuilder {
@Override
public void configure() throws Exception {
from("direct:start")
.hystrix()
.hystrixConfiguration()
.executionTimeoutInMilliseconds(100)
.circuitBreakerRequestVolumeThreshold(REQUEST_VOLUME_THRESHOLD)
.metricsRollingStatisticalWindowInMilliseconds(1000)
.circuitBreakerSleepWindowInMilliseconds(2000)
.end()
.log("Hystrix processing start: ${threadName}")
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
if (throwException) {
LOG.info("Will throw exception");
throw new IOException("Route has failed");
} else {
LOG.info("Will NOT throw exception");
.circuitBreaker()
.hystrixConfiguration()
.executionTimeoutInMilliseconds(100)
.circuitBreakerRequestVolumeThreshold(REQUEST_VOLUME_THRESHOLD)
.metricsRollingStatisticalWindowInMilliseconds(1000)
.circuitBreakerSleepWindowInMilliseconds(2000)
.end()
.log("Hystrix processing start: ${threadName}")
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
if (throwException) {
LOG.info("Will throw exception");
throw new IOException("Route has failed");
} else {
LOG.info("Will NOT throw exception");
}
}
}
})
.log("Hystrix processing end: ${threadName}")
})
.log("Hystrix processing end: ${threadName}")
.end()
.log(HYSTRIX_RESPONSE_SHORT_CIRCUITED + " = ${exchangeProperty." + HYSTRIX_RESPONSE_SHORT_CIRCUITED + "}")
.to("mock:result");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@

import org.apache.camel.CamelContext;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.model.CircuitBreakerDefinition;
import org.apache.camel.model.HystrixConfigurationDefinition;
import org.apache.camel.model.HystrixDefinition;
import org.apache.camel.model.Model;
import org.apache.camel.support.SimpleRegistry;
import org.junit.Assert;
Expand All @@ -45,8 +45,8 @@ public void testRegistryConfiguration() throws Exception {
registry.bind("ref-hystrix", ref);

final HystrixReifier reifier = new HystrixReifier(
new HystrixDefinition()
.hystrixConfiguration("ref-hystrix")
new CircuitBreakerDefinition()
.configuration("ref-hystrix")
.hystrixConfiguration()
.groupKey("local-conf-group-key")
.requestLogEnabled(false)
Expand All @@ -56,7 +56,7 @@ public void testRegistryConfiguration() throws Exception {

Assert.assertEquals("local-conf-group-key", config.getGroupKey());
Assert.assertEquals("global-thread-key", config.getThreadPoolKey());
Assert.assertEquals(new Integer(5), config.getCorePoolSize());
Assert.assertEquals(Integer.valueOf(5), config.getCorePoolSize());
}

@Test
Expand All @@ -76,8 +76,8 @@ public void testContextConfiguration() throws Exception {
context.getExtension(Model.class).addHystrixConfiguration("ref-hystrix", ref);

final HystrixReifier reifier = new HystrixReifier(
new HystrixDefinition()
.hystrixConfiguration("ref-hystrix")
new CircuitBreakerDefinition()
.configuration("ref-hystrix")
.hystrixConfiguration()
.groupKey("local-conf-group-key")
.requestLogEnabled(false)
Expand All @@ -87,7 +87,7 @@ public void testContextConfiguration() throws Exception {

Assert.assertEquals("local-conf-group-key", config.getGroupKey());
Assert.assertEquals("global-thread-key", config.getThreadPoolKey());
Assert.assertEquals(new Integer(5), config.getCorePoolSize());
Assert.assertEquals(Integer.valueOf(5), config.getCorePoolSize());
}

@Test
Expand Down Expand Up @@ -116,8 +116,8 @@ public void testMixedConfiguration() throws Exception {
registry.bind("ref-hystrix", ref);

final HystrixReifier reifier = new HystrixReifier(
new HystrixDefinition()
.hystrixConfiguration("ref-hystrix")
new CircuitBreakerDefinition()
.configuration("ref-hystrix")
.hystrixConfiguration()
.groupKey("local-conf-group-key")
.requestLogEnabled(false)
Expand All @@ -127,6 +127,6 @@ public void testMixedConfiguration() throws Exception {

Assert.assertEquals("local-conf-group-key", config.getGroupKey());
Assert.assertEquals("global-thread-key", config.getThreadPoolKey());
Assert.assertEquals(new Integer(5), config.getCorePoolSize());
Assert.assertEquals(Integer.valueOf(5), config.getCorePoolSize());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public void configure() throws Exception {
from("direct:start")
.to("log:start")
// turn on Camel's error handler on hystrix so it can do redeliveries
.hystrix().inheritErrorHandler(true)
.circuitBreaker().inheritErrorHandler(true)
.to("mock:a")
.throwException(new IllegalArgumentException("Forced"))
.end()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,10 @@ public void configure() throws Exception {
context.addService(stream);

from("direct:start").routeId("start")
.hystrix().id("myHystrix")
.to("direct:foo")
.circuitBreaker().id("myHystrix")
.to("direct:foo")
.onFallback()
.transform().constant("Fallback message")
.transform().constant("Fallback message")
.end()
.to("mock:result");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
package org.apache.camel.component.hystrix.processor;

import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.model.CircuitBreakerDefinition;
import org.apache.camel.model.HystrixConfigurationDefinition;
import org.apache.camel.model.HystrixDefinition;
import org.apache.camel.model.RouteDefinition;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Test;
Expand All @@ -41,11 +41,8 @@ public void testGroupKeyAndThreadPoolKeyConfigFlagsDoNotScrapHystrixConfiguratio
@Override
public void configure() throws Exception {
from("direct:foo")
.hystrix()
.hystrixConfiguration().groupKey("test1").metricsHealthSnapshotIntervalInMilliseconds(99999).end()
.groupKey("test2")
// ^^^ should only override the groupKey from the HystrixConfigurationDefinition;
// it should not discard the full HystrixConfigurationDefinition.
.circuitBreaker()
.hystrixConfiguration().groupKey("test2").metricsHealthSnapshotIntervalInMilliseconds(99999).end()
.to("log:hello")
.end();

Expand All @@ -55,9 +52,9 @@ public void configure() throws Exception {
rb.configure();

RouteDefinition route = rb.getRouteCollection().getRoutes().get(0);
assertEquals(HystrixDefinition.class, route.getOutputs().get(0).getClass());
assertEquals(CircuitBreakerDefinition.class, route.getOutputs().get(0).getClass());

HystrixConfigurationDefinition config = ((HystrixDefinition) route.getOutputs().get(0)).getHystrixConfiguration();
HystrixConfigurationDefinition config = ((CircuitBreakerDefinition) route.getOutputs().get(0)).getHystrixConfiguration();
assertEquals("test2", config.getGroupKey());
assertEquals(99999, config.getMetricsHealthSnapshotIntervalInMilliseconds().intValue());
}
Expand All @@ -68,9 +65,10 @@ protected RouteBuilder createRouteBuilder() throws Exception {
@Override
public void configure() throws Exception {
from("direct:start")
.hystrix()
.circuitBreaker()
.hystrixConfiguration().groupKey("myCamelApp").requestLogEnabled(false).corePoolSize(5)
.maximumSize(15).allowMaximumSizeToDivergeFromCoreSize(true).end()
.maximumSize(15).allowMaximumSizeToDivergeFromCoreSize(true)
.end()
.to("direct:foo")
.onFallback()
.transform().constant("Fallback message")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
package org.apache.camel.component.hystrix.processor;

import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.model.CircuitBreakerDefinition;
import org.apache.camel.model.HystrixConfigurationDefinition;
import org.apache.camel.model.HystrixDefinition;
import org.apache.camel.model.RouteDefinition;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Test;
Expand All @@ -41,11 +41,8 @@ public void testGroupKeyAndThreadPoolKeyConfigFlagsDoNotScrapHystrixConfiguratio
@Override
public void configure() throws Exception {
from("direct:foo")
.hystrix()
.hystrixConfiguration().groupKey("test1").metricsHealthSnapshotIntervalInMilliseconds(99999).end()
.groupKey("test2")
// ^^^ should only override the groupKey from the HystrixConfigurationDefinition;
// it should not discard the full HystrixConfigurationDefinition.
.circuitBreaker()
.hystrixConfiguration().groupKey("test2").metricsHealthSnapshotIntervalInMilliseconds(99999).end()
.to("log:hello")
.end();

Expand All @@ -55,9 +52,9 @@ public void configure() throws Exception {
rb.configure();

RouteDefinition route = rb.getRouteCollection().getRoutes().get(0);
assertEquals(HystrixDefinition.class, route.getOutputs().get(0).getClass());
assertEquals(CircuitBreakerDefinition.class, route.getOutputs().get(0).getClass());

HystrixConfigurationDefinition config = ((HystrixDefinition) route.getOutputs().get(0)).getHystrixConfiguration();
HystrixConfigurationDefinition config = ((CircuitBreakerDefinition) route.getOutputs().get(0)).getHystrixConfiguration();
assertEquals("test2", config.getGroupKey());
assertEquals(99999, config.getMetricsHealthSnapshotIntervalInMilliseconds().intValue());
}
Expand All @@ -69,7 +66,7 @@ protected RouteBuilder createRouteBuilder() throws Exception {
@Override
public void configure() throws Exception {
from("direct:start")
.hystrix()
.circuitBreaker()
.hystrixConfiguration().groupKey("myCamelApp").requestLogEnabled(false).corePoolSize(5).end()
.to("direct:foo")
.onFallback()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ protected RouteBuilder createRouteBuilder() throws Exception {
public void configure() throws Exception {
from("direct:start")
.to("log:start")
.hystrix()
.circuitBreaker()
.throwException(new IllegalArgumentException("Forced"))
.onFallback()
.transform().constant("Fallback message")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ protected RouteBuilder createRouteBuilder() throws Exception {
public void configure() throws Exception {
from("direct:start")
.to("log:start")
.hystrix()
.circuitBreaker()
.throwException(new IllegalArgumentException("Forced"))
.onFallbackViaNetwork()
.transform().constant("Fallback message")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ protected RouteBuilder createRouteBuilder() throws Exception {
@Override
public void configure() throws Exception {
from("direct:start")
.hystrix()
.circuitBreaker()
.to("direct:foo")
.to("log:foo")
.onFallback()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ protected RoutesBuilder createRouteBuilder() throws Exception {
@Override
public void configure() throws Exception {
from("direct:start")
.hystrix()
.circuitBreaker()
// use 2 second timeout
.hystrixConfiguration().executionTimeoutInMilliseconds(2000).end()
.log("Hystrix processing start: ${threadName}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ protected RoutesBuilder createRouteBuilder() throws Exception {
@Override
public void configure() throws Exception {
from("direct:start")
.hystrix()
.circuitBreaker()
// use 2 second timeout
.hystrixConfiguration().executionTimeoutInMilliseconds(2000).end()
.log("Hystrix processing start: ${threadName}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
*/
package org.apache.camel.component.hystrix.processor;

import org.apache.camel.model.CircuitBreakerDefinition;
import org.apache.camel.model.HystrixConfigurationDefinition;
import org.apache.camel.model.HystrixDefinition;
import org.apache.camel.model.RouteDefinition;
import org.apache.camel.test.spring.CamelSpringTestSupport;
import org.junit.Assert;
Expand All @@ -36,7 +36,7 @@ protected AbstractApplicationContext createApplicationContext() {
@Test
public void testHystrix() throws Exception {
RouteDefinition routeDefinition = context.getRouteDefinition("hystrix-route");
HystrixDefinition hystrixDefinition = findHystrixDefinition(routeDefinition);
CircuitBreakerDefinition hystrixDefinition = findCircuitBreakerDefinition(routeDefinition);

Assert.assertNotNull(hystrixDefinition);

Expand All @@ -45,7 +45,7 @@ public void testHystrix() throws Exception {

Assert.assertEquals("local-conf-group-key", config.getGroupKey());
Assert.assertEquals("global-thread-key", config.getThreadPoolKey());
Assert.assertEquals(new Integer(5), config.getCorePoolSize());
Assert.assertEquals(Integer.valueOf(5), config.getCorePoolSize());

getMockEndpoint("mock:result").expectedBodiesReceived("Bye World");

Expand All @@ -58,11 +58,11 @@ public void testHystrix() throws Exception {
// Helper
// **********************************************

private HystrixDefinition findHystrixDefinition(RouteDefinition routeDefinition) throws Exception {
private CircuitBreakerDefinition findCircuitBreakerDefinition(RouteDefinition routeDefinition) throws Exception {
return routeDefinition.getOutputs().stream()
.filter(HystrixDefinition.class::isInstance)
.map(HystrixDefinition.class::cast)
.filter(CircuitBreakerDefinition.class::isInstance)
.map(CircuitBreakerDefinition.class::cast)
.findFirst()
.orElseThrow(() -> new IllegalStateException("Unable to find a HystrixDefinition"));
.orElseThrow(() -> new IllegalStateException("Unable to find a CircuitBreakerDefinition"));
}
}
Loading

0 comments on commit 9481876

Please sign in to comment.