CAMEL-9879: Circuit Breaker EIP - That is using hystrix. Camel hystrix example.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/7b5ccfff Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/7b5ccfff Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/7b5ccfff Branch: refs/heads/master Commit: 7b5ccfffccd58053b122f30155b74014a6fce449 Parents: c1649ce Author: Claus Ibsen <davscl...@apache.org> Authored: Wed Apr 20 13:40:36 2016 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Thu Apr 21 11:53:24 2016 +0200 ---------------------------------------------------------------------- .../apache/camel/model/HystrixDefinition.java | 20 ++++- .../camel/model/OnFallbackDefinition.java | 37 ++++++++- .../hystrix/processor/HystrixProcessor.java | 80 +++++++++++--------- .../processor/HystrixProcessorCommand.java | 71 +++++++++-------- ...strixProcessorCommandFallbackViaNetwork.java | 68 +++++++++++++++++ .../processor/HystrixProcessorFactory.java | 49 +++++++++--- .../processor/HystrixRouteFallbackTest.java | 2 + .../HystrixRouteFallbackViaNetworkTest.java | 52 +++++++++++++ .../hystrix/processor/HystrixRouteOkTest.java | 9 ++- examples/camel-example-hystrix/client/pom.xml | 63 ++++++--------- .../java/sample/camel/ClientApplication.java | 45 +++++++++++ .../src/main/java/sample/camel/ClientRoute.java | 15 ++-- .../src/main/java/sample/camel/CounterBean.java | 6 +- .../src/main/resources/application.properties | 18 +++++ .../main/java/sample/camel/Service1Route.java | 1 - .../main/java/sample/camel/Service2Route.java | 1 - 16 files changed, 399 insertions(+), 138 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/7b5ccfff/camel-core/src/main/java/org/apache/camel/model/HystrixDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/HystrixDefinition.java b/camel-core/src/main/java/org/apache/camel/model/HystrixDefinition.java index adc656c..70d29b5 100644 --- a/camel-core/src/main/java/org/apache/camel/model/HystrixDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/HystrixDefinition.java @@ -188,7 +188,10 @@ public class HystrixDefinition extends ProcessorDefinition<HystrixDefinition> { } /** - * The Hystrix fallback route path to execute. + * The Hystrix fallback route path to execute that does <b>not</b> go over the network. + * <p> + * This should be a static or cached result that can immediately be returned upon failure. + * If the fallback requires network connection then use {@link #onFallbackViaNetwork()}. */ public HystrixDefinition onFallback() { onFallback = new OnFallbackDefinition(); @@ -196,4 +199,19 @@ public class HystrixDefinition extends ProcessorDefinition<HystrixDefinition> { return this; } + /** + * The Hystrix fallback route path to execute that will go over the network. + * <p/> + * If the fallback will go over the network it is another possible point of failure and so it also needs to be + * wrapped by a HystrixCommand. It is important to execute the fallback command on a separate thread-pool, + * otherwise if the main command were to become latent and fill the thread-pool + * this would prevent the fallback from running if the two commands share the same pool. + */ + public HystrixDefinition onFallbackViaNetwork() { + onFallback = new OnFallbackDefinition(); + onFallback.setFallbackViaNetwork(true); + onFallback.setParent(this); + return this; + } + } http://git-wip-us.apache.org/repos/asf/camel/blob/7b5ccfff/camel-core/src/main/java/org/apache/camel/model/OnFallbackDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/OnFallbackDefinition.java b/camel-core/src/main/java/org/apache/camel/model/OnFallbackDefinition.java index de6944a..3da7948 100644 --- a/camel-core/src/main/java/org/apache/camel/model/OnFallbackDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/OnFallbackDefinition.java @@ -19,6 +19,7 @@ package org.apache.camel.model; import java.util.List; import javax.xml.bind.annotation.XmlAccessType; import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlAttribute; import javax.xml.bind.annotation.XmlRootElement; import org.apache.camel.Processor; @@ -34,12 +35,20 @@ import org.apache.camel.util.CollectionStringBuffer; @XmlAccessorType(XmlAccessType.FIELD) public class OnFallbackDefinition extends OutputDefinition<OnFallbackDefinition> { + @XmlAttribute + @Metadata(label = "command", defaultValue = "false") + private Boolean fallbackViaNetwork; + public OnFallbackDefinition() { } @Override public String toString() { - return "OnFallback[" + getOutputs() + "]"; + if (fallbackViaNetwork != null && fallbackViaNetwork) { + return "OnFallbackViaNetwork[" + getOutputs() + "]"; + } else { + return "OnFallback[" + getOutputs() + "]"; + } } @Override @@ -49,7 +58,9 @@ public class OnFallbackDefinition extends OutputDefinition<OnFallbackDefinition> @Override public String getLabel() { - CollectionStringBuffer buffer = new CollectionStringBuffer("onFallback["); + String name = fallbackViaNetwork != null && fallbackViaNetwork ? "onFallbackViaNetwork" : "onFallback"; + CollectionStringBuffer buffer = new CollectionStringBuffer(name); + buffer.append("["); List<ProcessorDefinition<?>> list = getOutputs(); for (ProcessorDefinition<?> type : list) { buffer.append(type.getLabel()); @@ -57,4 +68,26 @@ public class OnFallbackDefinition extends OutputDefinition<OnFallbackDefinition> buffer.append("]"); return buffer.toString(); } + + public Boolean getFallbackViaNetwork() { + return fallbackViaNetwork; + } + + /** + * Whether the fallback goes over the network. + * <p/> + * If the fallback will go over the network it is another possible point of failure and so it also needs to be + * wrapped by a HystrixCommand. It is important to execute the fallback command on a separate thread-pool, + * otherwise if the main command were to become latent and fill the thread-pool + * this would prevent the fallback from running if the two commands share the same pool. + */ + public void setFallbackViaNetwork(Boolean fallbackViaNetwork) { + this.fallbackViaNetwork = fallbackViaNetwork; + } + + public boolean isFallbackViaNetwork() { + // is default false + return fallbackViaNetwork != null && fallbackViaNetwork; + } + } http://git-wip-us.apache.org/repos/asf/camel/blob/7b5ccfff/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessor.java ---------------------------------------------------------------------- diff --git a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessor.java b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessor.java index b0abbe7..15733e7 100644 --- a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessor.java +++ b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessor.java @@ -26,15 +26,15 @@ import com.netflix.hystrix.HystrixCommandMetrics; import org.apache.camel.AsyncCallback; import org.apache.camel.AsyncProcessor; import org.apache.camel.Exchange; -import org.apache.camel.Message; import org.apache.camel.Navigate; import org.apache.camel.Processor; import org.apache.camel.api.management.ManagedAttribute; import org.apache.camel.api.management.ManagedResource; import org.apache.camel.spi.IdAware; import org.apache.camel.support.ServiceSupport; -import org.apache.camel.util.AsyncProcessorConverterHelper; import org.apache.camel.util.AsyncProcessorHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Implementation of the Hystrix EIP. @@ -42,20 +42,28 @@ import org.apache.camel.util.AsyncProcessorHelper; @ManagedResource(description = "Managed Hystrix Processor") public class HystrixProcessor extends ServiceSupport implements AsyncProcessor, Navigate<Processor>, org.apache.camel.Traceable, IdAware { + private static final Logger LOG = LoggerFactory.getLogger(HystrixProcessor.class); private String id; - private final HystrixCommandKey commandKey; private final HystrixCommandGroupKey groupKey; - private final HystrixCommand.Setter setter; - private final AsyncProcessor processor; - private final AsyncProcessor fallback; - - public HystrixProcessor(HystrixCommandKey commandKey, HystrixCommandGroupKey groupKey, HystrixCommand.Setter setter, - Processor processor, Processor fallback) { - this.commandKey = commandKey; + private final HystrixCommandKey commandKey; + private final HystrixCommandKey fallbackCommandKey; + private final com.netflix.hystrix.HystrixCommand.Setter setter; + private final com.netflix.hystrix.HystrixCommand.Setter fallbackSetter; + private final Processor processor; + private final Processor fallback; + private final boolean fallbackViaNetwork; + + public HystrixProcessor(HystrixCommandGroupKey groupKey, HystrixCommandKey commandKey, HystrixCommandKey fallbackCommandKey, + HystrixCommand.Setter setter, HystrixCommand.Setter fallbackSetter, + Processor processor, Processor fallback, boolean fallbackViaNetwork) { this.groupKey = groupKey; + this.commandKey = commandKey; + this.fallbackCommandKey = fallbackCommandKey; this.setter = setter; - this.processor = AsyncProcessorConverterHelper.convert(processor); - this.fallback = AsyncProcessorConverterHelper.convert(fallback); + this.fallbackSetter = fallbackSetter; + this.processor = processor; + this.fallback = fallback; + this.fallbackViaNetwork = fallbackViaNetwork; } @ManagedAttribute @@ -64,11 +72,25 @@ public class HystrixProcessor extends ServiceSupport implements AsyncProcessor, } @ManagedAttribute + public String getHystrixFallbackCommandKey() { + if (fallbackCommandKey != null) { + return fallbackCommandKey.name(); + } else { + return null; + } + } + + @ManagedAttribute public String getHystrixGroupKey() { return groupKey.name(); } @ManagedAttribute + public boolean isFallbackViaNetwork() { + return isFallbackViaNetwork(); + } + + @ManagedAttribute public int getHystrixTotalTimeMean() { HystrixCommandMetrics metrics = HystrixCommandMetrics.getInstance(commandKey); if (metrics != null) { @@ -161,38 +183,24 @@ public class HystrixProcessor extends ServiceSupport implements AsyncProcessor, } @Override - public boolean process(final Exchange exchange, final AsyncCallback callback) { + public boolean process(Exchange exchange, AsyncCallback callback) { // run this as if we run inside try .. catch so there is no regular Camel error handler exchange.setProperty(Exchange.TRY_ROUTE_BLOCK, true); try { - // create command - HystrixProcessorCommand command = new HystrixProcessorCommand(setter, exchange, callback, processor, fallback); - - // execute the command asynchronous and observe when its done - command.observe().subscribe((msg) -> { - if (command.isResponseFromCache()) { - // its from cache so need to copy it into the exchange - Message target = exchange.hasOut() ? exchange.getOut() : exchange.getIn(); - target.copyFrom(msg); - } else { - // if it was not from cache then run/fallback was executed and the result - // is already set correctly on the exchange and we do not need to do anything - } - }, throwable -> { - exchange.setException(throwable); - }, () -> { - exchange.removeProperty(Exchange.TRY_ROUTE_BLOCK); - callback.done(false); - }); + HystrixProcessorCommandFallbackViaNetwork fallbackCommand = null; + if (fallbackViaNetwork) { + fallbackCommand = new HystrixProcessorCommandFallbackViaNetwork(fallbackSetter, exchange, fallback); + } + HystrixProcessorCommand command = new HystrixProcessorCommand(setter, exchange, processor, fallback, fallbackCommand); + command.execute(); } catch (Throwable e) { - // error adding to queue, so set as error and we are done exchange.setException(e); - callback.done(true); - return true; } - return false; + exchange.removeProperty(Exchange.TRY_ROUTE_BLOCK); + callback.done(true); + return true; } @Override http://git-wip-us.apache.org/repos/asf/camel/blob/7b5ccfff/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java ---------------------------------------------------------------------- diff --git a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java index b80a53a..98c87ff 100644 --- a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java +++ b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java @@ -5,9 +5,9 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -17,30 +17,30 @@ package org.apache.camel.component.hystrix.processor; import com.netflix.hystrix.HystrixCommand; -import org.apache.camel.AsyncCallback; -import org.apache.camel.AsyncProcessor; import org.apache.camel.Exchange; import org.apache.camel.Message; +import org.apache.camel.Processor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Hystrix Command for the Camel Hystrix EIP. */ -public class HystrixProcessorCommand extends HystrixCommand<Message> { +public class HystrixProcessorCommand extends HystrixCommand { private static final Logger LOG = LoggerFactory.getLogger(HystrixProcessorCommand.class); private final Exchange exchange; - private final AsyncCallback callback; - private final AsyncProcessor processor; - private final AsyncProcessor fallback; + private final Processor processor; + private final Processor fallback; + private final HystrixProcessorCommandFallbackViaNetwork fallbackCommand; - public HystrixProcessorCommand(Setter setter, Exchange exchange, AsyncCallback callback, AsyncProcessor processor, AsyncProcessor fallback) { + public HystrixProcessorCommand(Setter setter, Exchange exchange, Processor processor, Processor fallback, + HystrixProcessorCommandFallbackViaNetwork fallbackCommand) { super(setter); this.exchange = exchange; - this.callback = callback; this.processor = processor; this.fallback = fallback; + this.fallbackCommand = fallbackCommand; } @Override @@ -51,30 +51,33 @@ public class HystrixProcessorCommand extends HystrixCommand<Message> { return exchange.hasOut() ? exchange.getOut() : exchange.getIn(); } - try { - if (fallback != null) { - LOG.debug("Error occurred processing. Will now run fallback. Exception class: {} message: {}.", exception.getClass().getName(), exception.getMessage()); - // store the last to endpoint as the failure endpoint - if (exchange.getProperty(Exchange.FAILURE_ENDPOINT) == null) { - exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT)); - } - // give the rest of the pipeline another chance - exchange.setProperty(Exchange.EXCEPTION_HANDLED, true); - exchange.setProperty(Exchange.EXCEPTION_CAUGHT, exception); - exchange.setException(null); - // and we should not be regarded as exhausted as we are in a try .. catch block - exchange.removeProperty(Exchange.REDELIVERY_EXHAUSTED); - // run the fallback processor - try { + if (fallback != null || fallbackCommand != null) { + LOG.debug("Error occurred processing. Will now run fallback. Exception class: {} message: {}.", exception.getClass().getName(), exception.getMessage()); + // store the last to endpoint as the failure endpoint + if (exchange.getProperty(Exchange.FAILURE_ENDPOINT) == null) { + exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT)); + } + // give the rest of the pipeline another chance + exchange.setProperty(Exchange.EXCEPTION_HANDLED, true); + exchange.setProperty(Exchange.EXCEPTION_CAUGHT, exception); + exchange.setException(null); + // and we should not be regarded as exhausted as we are in a try .. catch block + exchange.removeProperty(Exchange.REDELIVERY_EXHAUSTED); + // run the fallback processor + try { + // use fallback command if provided (fallback via network) + if (fallbackCommand != null) { + return fallbackCommand.execute(); + } else { LOG.debug("Running fallback: {} with exchange: {}", fallback, exchange); - fallback.process(exchange, callback); - } catch (Exception e) { - exchange.setException(e); + // process the fallback until its fully done + // (we do not hav any hystrix callback to leverage so we need to complete all work in this run method) + fallback.process(exchange); + LOG.debug("Running fallback: {} with exchange: {} done", fallback, exchange); } + } catch (Exception e) { + exchange.setException(e); } - } finally { - LOG.debug("Running fallback: {} with exchange: {} done", fallback, exchange); - exchange.removeProperty(Exchange.TRY_ROUTE_BLOCK); } return exchange.hasOut() ? exchange.getOut() : exchange.getIn(); @@ -85,7 +88,9 @@ public class HystrixProcessorCommand extends HystrixCommand<Message> { LOG.debug("Running processor: {} with exchange: {}", processor, exchange); try { - processor.process(exchange, callback); + // process the processor until its fully done + // (we do not hav any hystrix callback to leverage so we need to complete all work in this run method) + processor.process(exchange); } catch (Exception e) { exchange.setException(e); } http://git-wip-us.apache.org/repos/asf/camel/blob/7b5ccfff/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommandFallbackViaNetwork.java ---------------------------------------------------------------------- diff --git a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommandFallbackViaNetwork.java b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommandFallbackViaNetwork.java new file mode 100644 index 0000000..c565f9b --- /dev/null +++ b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommandFallbackViaNetwork.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.hystrix.processor; + +import com.netflix.hystrix.HystrixCommand; +import org.apache.camel.Exchange; +import org.apache.camel.Message; +import org.apache.camel.Processor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Hystrix Command the Camel Hystrix EIP when executing fallback. + * The fallback may require networking and therefore should run in another Hystrix Command + */ +public class HystrixProcessorCommandFallbackViaNetwork extends HystrixCommand<Message> { + + private static final Logger LOG = LoggerFactory.getLogger(HystrixProcessorCommandFallbackViaNetwork.class); + private final Exchange exchange; + private final Processor processor; + + public HystrixProcessorCommandFallbackViaNetwork(Setter setter, Exchange exchange, Processor processor) { + super(setter); + this.exchange = exchange; + this.processor = processor; + } + + @Override + protected Message getFallback() { + return null; + } + + @Override + protected Message run() throws Exception { + LOG.debug("Running fallback processor: {} with exchange: {}", processor, exchange); + + try { + // process the processor until its fully done + // (we do not hav any hystrix callback to leverage so we need to complete all work in this run method) + processor.process(exchange); + } catch (Throwable e) { + exchange.setException(e); + } + + // if we failed then throw an exception to signal that the fallback failed as well + if (exchange.getException() != null) { + throw exchange.getException(); + } + + LOG.debug("Running fallback processor: {} with exchange: {} done", processor, exchange); + // no fallback then we are done + return exchange.hasOut() ? exchange.getOut() : exchange.getIn(); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/7b5ccfff/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorFactory.java ---------------------------------------------------------------------- diff --git a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorFactory.java b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorFactory.java index 180fe9e..9dbf970 100644 --- a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorFactory.java +++ b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorFactory.java @@ -59,6 +59,8 @@ public class HystrixProcessorFactory implements ProcessorFactory { configRef = CamelContextHelper.mandatoryLookup(routeContext.getCamelContext(), cb.getHystrixConfigurationRef(), HystrixConfigurationDefinition.class); } + String id = cb.idOrCreate(routeContext.getCamelContext().getNodeIdFactory()); + // group and thread pool keys to use they can be configured on configRef and config, so look there first, and if none then use default String groupKey = null; String threadPoolKey = null; @@ -74,36 +76,61 @@ public class HystrixProcessorFactory implements ProcessorFactory { groupKey = HystrixConfigurationDefinition.DEFAULT_GROUP_KEY; } if (threadPoolKey == null) { - // thread pool key should use same as group key as default - threadPoolKey = groupKey; + threadPoolKey = id + "-threadpool"; } // use the node id as the command key - String id = cb.idOrCreate(routeContext.getCamelContext().getNodeIdFactory()); HystrixCommandKey hcCommandKey = HystrixCommandKey.Factory.asKey(id); + HystrixCommandKey hcFallbackCommandKey = HystrixCommandKey.Factory.asKey(id + "-fallback"); // use the configured group key HystrixCommandGroupKey hcGroupKey = HystrixCommandGroupKey.Factory.asKey(groupKey); + HystrixThreadPoolKey tpKey = HystrixThreadPoolKey.Factory.asKey(threadPoolKey); // create setter using the default options HystrixCommand.Setter setter = HystrixCommand.Setter .withGroupKey(hcGroupKey) .andCommandKey(hcCommandKey) - .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey(threadPoolKey)); - HystrixCommandProperties.Setter command = HystrixCommandProperties.Setter(); - setter.andCommandPropertiesDefaults(command); - HystrixThreadPoolProperties.Setter threadPool = HystrixThreadPoolProperties.Setter(); - setter.andThreadPoolPropertiesDefaults(threadPool); + .andThreadPoolKey(tpKey); + HystrixCommandProperties.Setter commandSetter = HystrixCommandProperties.Setter(); + setter.andCommandPropertiesDefaults(commandSetter); + HystrixThreadPoolProperties.Setter threadPoolSetter = HystrixThreadPoolProperties.Setter(); + setter.andThreadPoolPropertiesDefaults(threadPoolSetter); // at first configure any shared options if (configRef != null) { - configureHystrix(command, threadPool, configRef); + configureHystrix(commandSetter, threadPoolSetter, configRef); } // then any local configured can override if (config != null) { - configureHystrix(command, threadPool, config); + configureHystrix(commandSetter, threadPoolSetter, config); + } + + // create setter for fallback via network + HystrixCommand.Setter fallbackSetter = null; + boolean fallbackViaNetwork = cb.getOnFallback() != null && cb.getOnFallback().isFallbackViaNetwork(); + if (fallbackViaNetwork) { + HystrixThreadPoolKey tpFallbackKey = HystrixThreadPoolKey.Factory.asKey(threadPoolKey + "-fallback"); + + fallbackSetter = HystrixCommand.Setter + .withGroupKey(hcGroupKey) + .andCommandKey(hcFallbackCommandKey) + .andThreadPoolKey(tpFallbackKey); + HystrixCommandProperties.Setter commandFallbackSetter = HystrixCommandProperties.Setter(); + fallbackSetter.andCommandPropertiesDefaults(commandFallbackSetter); + HystrixThreadPoolProperties.Setter fallbackThreadPoolSetter = HystrixThreadPoolProperties.Setter(); + fallbackSetter.andThreadPoolPropertiesDefaults(fallbackThreadPoolSetter); + + // at first configure any shared options + if (configRef != null) { + configureHystrix(commandFallbackSetter, fallbackThreadPoolSetter, configRef); + } + // then any local configured can override + if (config != null) { + configureHystrix(commandFallbackSetter, fallbackThreadPoolSetter, config); + } } - return new HystrixProcessor(hcCommandKey, hcGroupKey, setter, processor, fallback); + return new HystrixProcessor(hcGroupKey, hcCommandKey, hcFallbackCommandKey, setter, fallbackSetter, processor, fallback, fallbackViaNetwork); } else { return null; } http://git-wip-us.apache.org/repos/asf/camel/blob/7b5ccfff/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixRouteFallbackTest.java ---------------------------------------------------------------------- diff --git a/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixRouteFallbackTest.java b/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixRouteFallbackTest.java index 8d70ca8..63f194d 100644 --- a/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixRouteFallbackTest.java +++ b/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixRouteFallbackTest.java @@ -37,11 +37,13 @@ public class HystrixRouteFallbackTest extends CamelTestSupport { @Override public void configure() throws Exception { from("direct:start") + .to("log:start") .hystrix() .throwException(new IllegalArgumentException("Forced")) .onFallback() .transform().constant("Fallback message") .end() + .to("log:result") .to("mock:result"); } }; http://git-wip-us.apache.org/repos/asf/camel/blob/7b5ccfff/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixRouteFallbackViaNetworkTest.java ---------------------------------------------------------------------- diff --git a/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixRouteFallbackViaNetworkTest.java b/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixRouteFallbackViaNetworkTest.java new file mode 100644 index 0000000..caf7b69 --- /dev/null +++ b/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixRouteFallbackViaNetworkTest.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.hystrix.processor; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +public class HystrixRouteFallbackViaNetworkTest extends CamelTestSupport { + + @Test + public void testHystrix() throws Exception { + getMockEndpoint("mock:result").expectedBodiesReceived("Fallback message"); + + template.sendBody("direct:start", "Hello World"); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start") + .to("log:start") + .hystrix() + .throwException(new IllegalArgumentException("Forced")) + .onFallbackViaNetwork() + .transform().constant("Fallback message") + .end() + .to("log:result") + .to("mock:result"); + } + }; + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/7b5ccfff/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixRouteOkTest.java ---------------------------------------------------------------------- diff --git a/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixRouteOkTest.java b/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixRouteOkTest.java index 4980bb4..92faf79 100644 --- a/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixRouteOkTest.java +++ b/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixRouteOkTest.java @@ -24,9 +24,9 @@ public class HystrixRouteOkTest extends CamelTestSupport { @Test public void testHystrix() throws Exception { - getMockEndpoint("mock:result").expectedBodiesReceived("Bye World"); + getMockEndpoint("mock:result").expectedMinimumMessageCount(2); - template.sendBody("direct:start", "Hello World"); +// template.sendBody("direct:start", "Hello World"); assertMockEndpointsSatisfied(); } @@ -36,12 +36,15 @@ public class HystrixRouteOkTest extends CamelTestSupport { return new RouteBuilder() { @Override public void configure() throws Exception { - from("direct:start") + from("timer:trigger") + .to("log:trigger") .hystrix() .to("direct:foo") + .to("log:foo") .onFallback() .transform().constant("Fallback message") .end() + .to("log:result") .to("mock:result"); from("direct:foo") http://git-wip-us.apache.org/repos/asf/camel/blob/7b5ccfff/examples/camel-example-hystrix/client/pom.xml ---------------------------------------------------------------------- diff --git a/examples/camel-example-hystrix/client/pom.xml b/examples/camel-example-hystrix/client/pom.xml index 2723c27..ed00524 100644 --- a/examples/camel-example-hystrix/client/pom.xml +++ b/examples/camel-example-hystrix/client/pom.xml @@ -30,10 +30,21 @@ <name>Camel :: Example :: Hystrix :: Client</name> <description>An example showing how to use Hystrix EIP as circuit breaker in Camel routes</description> - <!-- import Camel BOM --> + <properties> + <spring.boot-version>${spring-boot-version}</spring.boot-version> + </properties> + + <!-- import Spring-Boot and Camel BOM --> <dependencyManagement> <dependencies> <dependency> + <groupId>org.springframework.boot</groupId> + <artifactId>spring-boot-dependencies</artifactId> + <version>${spring.boot-version}</version> + <type>pom</type> + <scope>import</scope> + </dependency> + <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-parent</artifactId> <version>${project.version}</version> @@ -45,66 +56,36 @@ <dependencies> - <!-- CDI API --> + <!-- spring-boot --> <dependency> - <groupId>javax.enterprise</groupId> - <artifactId>cdi-api</artifactId> - <version>${cdi-api-1.2-version}</version> - <scope>provided</scope> + <groupId>org.springframework.boot</groupId> + <artifactId>spring-boot-starter-web</artifactId> </dependency> - <!-- camel-cdi --> + <!-- camel --> <dependency> <groupId>org.apache.camel</groupId> - <artifactId>camel-cdi</artifactId> + <artifactId>camel-spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.apache.camel</groupId> - <artifactId>camel-hystrix</artifactId> + <artifactId>camel-http</artifactId> </dependency> <dependency> <groupId>org.apache.camel</groupId> - <artifactId>camel-http</artifactId> + <artifactId>camel-hystrix</artifactId> </dependency> - <!-- logging --> - - </dependencies> <build> - <plugins> - <!-- allows the routes to be run via 'mvn camel:run' --> <plugin> - <groupId>org.apache.camel</groupId> - <artifactId>camel-maven-plugin</artifactId> - <version>${project.version}</version> - <dependencies> - <dependency> - <groupId>org.apache.deltaspike.cdictrl</groupId> - <artifactId>deltaspike-cdictrl-weld</artifactId> - <version>${deltaspike-version}</version> - </dependency> - <dependency> - <groupId>org.jboss.weld.se</groupId> - <artifactId>weld-se</artifactId> - <version>${weld2-version}</version> - </dependency> - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-log4j12</artifactId> - <version>${slf4j-version}</version> - </dependency> - <dependency> - <groupId>log4j</groupId> - <artifactId>log4j</artifactId> - <version>${log4j-version}</version> - </dependency> - </dependencies> + <groupId>org.springframework.boot</groupId> + <artifactId>spring-boot-maven-plugin</artifactId> + <version>${spring-boot-version}</version> </plugin> </plugins> - </build> </project> http://git-wip-us.apache.org/repos/asf/camel/blob/7b5ccfff/examples/camel-example-hystrix/client/src/main/java/sample/camel/ClientApplication.java ---------------------------------------------------------------------- diff --git a/examples/camel-example-hystrix/client/src/main/java/sample/camel/ClientApplication.java b/examples/camel-example-hystrix/client/src/main/java/sample/camel/ClientApplication.java new file mode 100644 index 0000000..f5c3da8 --- /dev/null +++ b/examples/camel-example-hystrix/client/src/main/java/sample/camel/ClientApplication.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package sample.camel; + +import org.apache.camel.component.hystrix.metrics.servlet.HystrixEventStreamServlet; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.context.embedded.ServletRegistrationBean; +import org.springframework.context.annotation.Bean; + +@SpringBootApplication +public class ClientApplication { + + /** + * A main method to start this application. + */ + public static void main(String[] args) { + SpringApplication.run(ClientApplication.class, args); + } + + @Bean + public HystrixEventStreamServlet hystrixServlet() { + return new HystrixEventStreamServlet(); + } + + @Bean + public ServletRegistrationBean servletRegistrationBean() { + return new ServletRegistrationBean(new HystrixEventStreamServlet(),"/hystrix.stream"); + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/7b5ccfff/examples/camel-example-hystrix/client/src/main/java/sample/camel/ClientRoute.java ---------------------------------------------------------------------- diff --git a/examples/camel-example-hystrix/client/src/main/java/sample/camel/ClientRoute.java b/examples/camel-example-hystrix/client/src/main/java/sample/camel/ClientRoute.java index be174a6..36a006f 100644 --- a/examples/camel-example-hystrix/client/src/main/java/sample/camel/ClientRoute.java +++ b/examples/camel-example-hystrix/client/src/main/java/sample/camel/ClientRoute.java @@ -17,20 +17,25 @@ package sample.camel; import org.apache.camel.builder.RouteBuilder; +import org.springframework.stereotype.Component; +@Component public class ClientRoute extends RouteBuilder { @Override public void configure() { // you can configure the route rule with Java DSL here - from("timer:trigger?exchangePattern=InOut&period=10s") + from("timer:trigger?period=1s").streamCaching() .bean("counterBean") + .log(" Client request: ${body}") .hystrix() - .log(" Client request: ${body}") .to("http://localhost:9090/service1") - .onFallback() - .log(" Client fallback request: ${body}") - .to("http://localhost:9090/service2") + //.onFallback() + // we use a fallback without network that provides a repsonse message immediately + // .transform().simple("Fallback ${body}") + .onFallbackViaNetwork() + // we use fallback via network where we call a 2nd service + .to("http://localhost:7070/service2") .end() .log("Client response: ${body}"); } http://git-wip-us.apache.org/repos/asf/camel/blob/7b5ccfff/examples/camel-example-hystrix/client/src/main/java/sample/camel/CounterBean.java ---------------------------------------------------------------------- diff --git a/examples/camel-example-hystrix/client/src/main/java/sample/camel/CounterBean.java b/examples/camel-example-hystrix/client/src/main/java/sample/camel/CounterBean.java index 0c27215..3df5fc5 100644 --- a/examples/camel-example-hystrix/client/src/main/java/sample/camel/CounterBean.java +++ b/examples/camel-example-hystrix/client/src/main/java/sample/camel/CounterBean.java @@ -16,11 +16,9 @@ */ package sample.camel; -import javax.inject.Named; -import javax.inject.Singleton; +import org.springframework.stereotype.Component; -@Singleton -@Named("counterBean") +@Component("counterBean") public class CounterBean { private int counter; http://git-wip-us.apache.org/repos/asf/camel/blob/7b5ccfff/examples/camel-example-hystrix/client/src/main/resources/application.properties ---------------------------------------------------------------------- diff --git a/examples/camel-example-hystrix/client/src/main/resources/application.properties b/examples/camel-example-hystrix/client/src/main/resources/application.properties new file mode 100644 index 0000000..87475ba --- /dev/null +++ b/examples/camel-example-hystrix/client/src/main/resources/application.properties @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +server.port=8080 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/7b5ccfff/examples/camel-example-hystrix/service1/src/main/java/sample/camel/Service1Route.java ---------------------------------------------------------------------- diff --git a/examples/camel-example-hystrix/service1/src/main/java/sample/camel/Service1Route.java b/examples/camel-example-hystrix/service1/src/main/java/sample/camel/Service1Route.java index e785560..17242a7 100644 --- a/examples/camel-example-hystrix/service1/src/main/java/sample/camel/Service1Route.java +++ b/examples/camel-example-hystrix/service1/src/main/java/sample/camel/Service1Route.java @@ -26,7 +26,6 @@ public class Service1Route extends RouteBuilder { public void configure() throws Exception { from("jetty:http://0.0.0.0:{{service1.port}}/service1").routeId("service1").streamCaching() .log("Service1 request: ${body}") - .delay(simple("${random(1000,2000)}")) .transform(simple("Service1-${body}")) .log("Service1 response: ${body}"); } http://git-wip-us.apache.org/repos/asf/camel/blob/7b5ccfff/examples/camel-example-hystrix/service2/src/main/java/sample/camel/Service2Route.java ---------------------------------------------------------------------- diff --git a/examples/camel-example-hystrix/service2/src/main/java/sample/camel/Service2Route.java b/examples/camel-example-hystrix/service2/src/main/java/sample/camel/Service2Route.java index 9510461..9bb2cbb 100644 --- a/examples/camel-example-hystrix/service2/src/main/java/sample/camel/Service2Route.java +++ b/examples/camel-example-hystrix/service2/src/main/java/sample/camel/Service2Route.java @@ -24,7 +24,6 @@ public class Service2Route extends RouteBuilder { public void configure() throws Exception { from("undertow:http://0.0.0.0:7070/service2").routeId("service2").streamCaching() .log(" Service2 request: ${body}") - .delay(simple("${random(1000,2000)}")) .transform(simple("Service2-${body}")) .log("Service2 response: ${body}"); }