CAMEL-9879: Circuit Breaker EIP - That is using hystrix. Camel hystrix example.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/7b5ccfff
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/7b5ccfff
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/7b5ccfff

Branch: refs/heads/master
Commit: 7b5ccfffccd58053b122f30155b74014a6fce449
Parents: c1649ce
Author: Claus Ibsen <davscl...@apache.org>
Authored: Wed Apr 20 13:40:36 2016 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Thu Apr 21 11:53:24 2016 +0200

----------------------------------------------------------------------
 .../apache/camel/model/HystrixDefinition.java   | 20 ++++-
 .../camel/model/OnFallbackDefinition.java       | 37 ++++++++-
 .../hystrix/processor/HystrixProcessor.java     | 80 +++++++++++---------
 .../processor/HystrixProcessorCommand.java      | 71 +++++++++--------
 ...strixProcessorCommandFallbackViaNetwork.java | 68 +++++++++++++++++
 .../processor/HystrixProcessorFactory.java      | 49 +++++++++---
 .../processor/HystrixRouteFallbackTest.java     |  2 +
 .../HystrixRouteFallbackViaNetworkTest.java     | 52 +++++++++++++
 .../hystrix/processor/HystrixRouteOkTest.java   |  9 ++-
 examples/camel-example-hystrix/client/pom.xml   | 63 ++++++---------
 .../java/sample/camel/ClientApplication.java    | 45 +++++++++++
 .../src/main/java/sample/camel/ClientRoute.java | 15 ++--
 .../src/main/java/sample/camel/CounterBean.java |  6 +-
 .../src/main/resources/application.properties   | 18 +++++
 .../main/java/sample/camel/Service1Route.java   |  1 -
 .../main/java/sample/camel/Service2Route.java   |  1 -
 16 files changed, 399 insertions(+), 138 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/7b5ccfff/camel-core/src/main/java/org/apache/camel/model/HystrixDefinition.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/model/HystrixDefinition.java 
b/camel-core/src/main/java/org/apache/camel/model/HystrixDefinition.java
index adc656c..70d29b5 100644
--- a/camel-core/src/main/java/org/apache/camel/model/HystrixDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/HystrixDefinition.java
@@ -188,7 +188,10 @@ public class HystrixDefinition extends 
ProcessorDefinition<HystrixDefinition> {
     }
 
     /**
-     * The Hystrix fallback route path to execute.
+     * The Hystrix fallback route path to execute that does <b>not</b> go over 
the network.
+     * <p>
+     * This should be a static or cached result that can immediately be 
returned upon failure.
+     * If the fallback requires network connection then use {@link 
#onFallbackViaNetwork()}.
      */
     public HystrixDefinition onFallback() {
         onFallback = new OnFallbackDefinition();
@@ -196,4 +199,19 @@ public class HystrixDefinition extends 
ProcessorDefinition<HystrixDefinition> {
         return this;
     }
 
+    /**
+     * The Hystrix fallback route path to execute that will go over the 
network.
+     * <p/>
+     * If the fallback will go over the network it is another possible point 
of failure and so it also needs to be
+     * wrapped by a HystrixCommand. It is important to execute the fallback 
command on a separate thread-pool,
+     * otherwise if the main command were to become latent and fill the 
thread-pool
+     * this would prevent the fallback from running if the two commands share 
the same pool.
+     */
+    public HystrixDefinition onFallbackViaNetwork() {
+        onFallback = new OnFallbackDefinition();
+        onFallback.setFallbackViaNetwork(true);
+        onFallback.setParent(this);
+        return this;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/7b5ccfff/camel-core/src/main/java/org/apache/camel/model/OnFallbackDefinition.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/model/OnFallbackDefinition.java 
b/camel-core/src/main/java/org/apache/camel/model/OnFallbackDefinition.java
index de6944a..3da7948 100644
--- a/camel-core/src/main/java/org/apache/camel/model/OnFallbackDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/OnFallbackDefinition.java
@@ -19,6 +19,7 @@ package org.apache.camel.model;
 import java.util.List;
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlAttribute;
 import javax.xml.bind.annotation.XmlRootElement;
 
 import org.apache.camel.Processor;
@@ -34,12 +35,20 @@ import org.apache.camel.util.CollectionStringBuffer;
 @XmlAccessorType(XmlAccessType.FIELD)
 public class OnFallbackDefinition extends 
OutputDefinition<OnFallbackDefinition> {
 
+    @XmlAttribute
+    @Metadata(label = "command", defaultValue = "false")
+    private Boolean fallbackViaNetwork;
+
     public OnFallbackDefinition() {
     }
 
     @Override
     public String toString() {
-        return "OnFallback[" + getOutputs() + "]";
+        if (fallbackViaNetwork != null && fallbackViaNetwork) {
+            return "OnFallbackViaNetwork[" + getOutputs() + "]";
+        } else {
+            return "OnFallback[" + getOutputs() + "]";
+        }
     }
 
     @Override
@@ -49,7 +58,9 @@ public class OnFallbackDefinition extends 
OutputDefinition<OnFallbackDefinition>
 
     @Override
     public String getLabel() {
-        CollectionStringBuffer buffer = new 
CollectionStringBuffer("onFallback[");
+        String name = fallbackViaNetwork != null && fallbackViaNetwork ? 
"onFallbackViaNetwork" : "onFallback";
+        CollectionStringBuffer buffer = new CollectionStringBuffer(name);
+        buffer.append("[");
         List<ProcessorDefinition<?>> list = getOutputs();
         for (ProcessorDefinition<?> type : list) {
             buffer.append(type.getLabel());
@@ -57,4 +68,26 @@ public class OnFallbackDefinition extends 
OutputDefinition<OnFallbackDefinition>
         buffer.append("]");
         return buffer.toString();
     }
+
+    public Boolean getFallbackViaNetwork() {
+        return fallbackViaNetwork;
+    }
+
+    /**
+     * Whether the fallback goes over the network.
+     * <p/>
+     * If the fallback will go over the network it is another possible point 
of failure and so it also needs to be
+     * wrapped by a HystrixCommand. It is important to execute the fallback 
command on a separate thread-pool,
+     * otherwise if the main command were to become latent and fill the 
thread-pool
+     * this would prevent the fallback from running if the two commands share 
the same pool.
+     */
+    public void setFallbackViaNetwork(Boolean fallbackViaNetwork) {
+        this.fallbackViaNetwork = fallbackViaNetwork;
+    }
+
+    public boolean isFallbackViaNetwork() {
+        // is default false
+        return fallbackViaNetwork != null && fallbackViaNetwork;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/7b5ccfff/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessor.java
----------------------------------------------------------------------
diff --git 
a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessor.java
 
b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessor.java
index b0abbe7..15733e7 100644
--- 
a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessor.java
+++ 
b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessor.java
@@ -26,15 +26,15 @@ import com.netflix.hystrix.HystrixCommandMetrics;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Exchange;
-import org.apache.camel.Message;
 import org.apache.camel.Navigate;
 import org.apache.camel.Processor;
 import org.apache.camel.api.management.ManagedAttribute;
 import org.apache.camel.api.management.ManagedResource;
 import org.apache.camel.spi.IdAware;
 import org.apache.camel.support.ServiceSupport;
-import org.apache.camel.util.AsyncProcessorConverterHelper;
 import org.apache.camel.util.AsyncProcessorHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Implementation of the Hystrix EIP.
@@ -42,20 +42,28 @@ import org.apache.camel.util.AsyncProcessorHelper;
 @ManagedResource(description = "Managed Hystrix Processor")
 public class HystrixProcessor extends ServiceSupport implements 
AsyncProcessor, Navigate<Processor>, org.apache.camel.Traceable, IdAware {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(HystrixProcessor.class);
     private String id;
-    private final HystrixCommandKey commandKey;
     private final HystrixCommandGroupKey groupKey;
-    private final HystrixCommand.Setter setter;
-    private final AsyncProcessor processor;
-    private final AsyncProcessor fallback;
-
-    public HystrixProcessor(HystrixCommandKey commandKey, 
HystrixCommandGroupKey groupKey, HystrixCommand.Setter setter,
-                            Processor processor, Processor fallback) {
-        this.commandKey = commandKey;
+    private final HystrixCommandKey commandKey;
+    private final HystrixCommandKey fallbackCommandKey;
+    private final com.netflix.hystrix.HystrixCommand.Setter setter;
+    private final com.netflix.hystrix.HystrixCommand.Setter fallbackSetter;
+    private final Processor processor;
+    private final Processor fallback;
+    private final boolean fallbackViaNetwork;
+
+    public HystrixProcessor(HystrixCommandGroupKey groupKey, HystrixCommandKey 
commandKey, HystrixCommandKey fallbackCommandKey,
+                            HystrixCommand.Setter setter, 
HystrixCommand.Setter fallbackSetter,
+                            Processor processor, Processor fallback, boolean 
fallbackViaNetwork) {
         this.groupKey = groupKey;
+        this.commandKey = commandKey;
+        this.fallbackCommandKey = fallbackCommandKey;
         this.setter = setter;
-        this.processor = AsyncProcessorConverterHelper.convert(processor);
-        this.fallback = AsyncProcessorConverterHelper.convert(fallback);
+        this.fallbackSetter = fallbackSetter;
+        this.processor = processor;
+        this.fallback = fallback;
+        this.fallbackViaNetwork = fallbackViaNetwork;
     }
 
     @ManagedAttribute
@@ -64,11 +72,25 @@ public class HystrixProcessor extends ServiceSupport 
implements AsyncProcessor,
     }
 
     @ManagedAttribute
+    public String getHystrixFallbackCommandKey() {
+        if (fallbackCommandKey != null) {
+            return fallbackCommandKey.name();
+        } else {
+            return null;
+        }
+    }
+
+    @ManagedAttribute
     public String getHystrixGroupKey() {
         return groupKey.name();
     }
 
     @ManagedAttribute
+    public boolean isFallbackViaNetwork() {
+        return isFallbackViaNetwork();
+    }
+
+    @ManagedAttribute
     public int getHystrixTotalTimeMean() {
         HystrixCommandMetrics metrics = 
HystrixCommandMetrics.getInstance(commandKey);
         if (metrics != null) {
@@ -161,38 +183,24 @@ public class HystrixProcessor extends ServiceSupport 
implements AsyncProcessor,
     }
 
     @Override
-    public boolean process(final Exchange exchange, final AsyncCallback 
callback) {
+    public boolean process(Exchange exchange, AsyncCallback callback) {
         // run this as if we run inside try .. catch so there is no regular 
Camel error handler
         exchange.setProperty(Exchange.TRY_ROUTE_BLOCK, true);
 
         try {
-            // create command
-            HystrixProcessorCommand command = new 
HystrixProcessorCommand(setter, exchange, callback, processor, fallback);
-
-            // execute the command asynchronous and observe when its done
-            command.observe().subscribe((msg) -> {
-                    if (command.isResponseFromCache()) {
-                        // its from cache so need to copy it into the exchange
-                        Message target = exchange.hasOut() ? exchange.getOut() 
: exchange.getIn();
-                        target.copyFrom(msg);
-                    } else {
-                        // if it was not from cache then run/fallback was 
executed and the result
-                        // is already set correctly on the exchange and we do 
not need to do anything
-                    }
-                }, throwable -> {
-                    exchange.setException(throwable);
-                }, () -> {
-                    exchange.removeProperty(Exchange.TRY_ROUTE_BLOCK);
-                    callback.done(false);
-                });
+            HystrixProcessorCommandFallbackViaNetwork fallbackCommand = null;
+            if (fallbackViaNetwork) {
+                fallbackCommand = new 
HystrixProcessorCommandFallbackViaNetwork(fallbackSetter, exchange, fallback);
+            }
+            HystrixProcessorCommand command = new 
HystrixProcessorCommand(setter, exchange, processor, fallback, fallbackCommand);
+            command.execute();
         } catch (Throwable e) {
-            // error adding to queue, so set as error and we are done
             exchange.setException(e);
-            callback.done(true);
-            return true;
         }
 
-        return false;
+        exchange.removeProperty(Exchange.TRY_ROUTE_BLOCK);
+        callback.done(true);
+        return true;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/camel/blob/7b5ccfff/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java
----------------------------------------------------------------------
diff --git 
a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java
 
b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java
index b80a53a..98c87ff 100644
--- 
a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java
+++ 
b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,30 +17,30 @@
 package org.apache.camel.component.hystrix.processor;
 
 import com.netflix.hystrix.HystrixCommand;
-import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
+import org.apache.camel.Processor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * Hystrix Command for the Camel Hystrix EIP.
  */
-public class HystrixProcessorCommand extends HystrixCommand<Message> {
+public class HystrixProcessorCommand extends HystrixCommand {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(HystrixProcessorCommand.class);
     private final Exchange exchange;
-    private final AsyncCallback callback;
-    private final AsyncProcessor processor;
-    private final AsyncProcessor fallback;
+    private final Processor processor;
+    private final Processor fallback;
+    private final HystrixProcessorCommandFallbackViaNetwork fallbackCommand;
 
-    public HystrixProcessorCommand(Setter setter, Exchange exchange, 
AsyncCallback callback, AsyncProcessor processor, AsyncProcessor fallback) {
+    public HystrixProcessorCommand(Setter setter, Exchange exchange, Processor 
processor, Processor fallback,
+                                   HystrixProcessorCommandFallbackViaNetwork 
fallbackCommand) {
         super(setter);
         this.exchange = exchange;
-        this.callback = callback;
         this.processor = processor;
         this.fallback = fallback;
+        this.fallbackCommand = fallbackCommand;
     }
 
     @Override
@@ -51,30 +51,33 @@ public class HystrixProcessorCommand extends 
HystrixCommand<Message> {
             return exchange.hasOut() ? exchange.getOut() : exchange.getIn();
         }
 
-        try {
-            if (fallback != null) {
-                LOG.debug("Error occurred processing. Will now run fallback. 
Exception class: {} message: {}.", exception.getClass().getName(), 
exception.getMessage());
-                // store the last to endpoint as the failure endpoint
-                if (exchange.getProperty(Exchange.FAILURE_ENDPOINT) == null) {
-                    exchange.setProperty(Exchange.FAILURE_ENDPOINT, 
exchange.getProperty(Exchange.TO_ENDPOINT));
-                }
-                // give the rest of the pipeline another chance
-                exchange.setProperty(Exchange.EXCEPTION_HANDLED, true);
-                exchange.setProperty(Exchange.EXCEPTION_CAUGHT, exception);
-                exchange.setException(null);
-                // and we should not be regarded as exhausted as we are in a 
try .. catch block
-                exchange.removeProperty(Exchange.REDELIVERY_EXHAUSTED);
-                // run the fallback processor
-                try {
+        if (fallback != null || fallbackCommand != null) {
+            LOG.debug("Error occurred processing. Will now run fallback. 
Exception class: {} message: {}.", exception.getClass().getName(), 
exception.getMessage());
+            // store the last to endpoint as the failure endpoint
+            if (exchange.getProperty(Exchange.FAILURE_ENDPOINT) == null) {
+                exchange.setProperty(Exchange.FAILURE_ENDPOINT, 
exchange.getProperty(Exchange.TO_ENDPOINT));
+            }
+            // give the rest of the pipeline another chance
+            exchange.setProperty(Exchange.EXCEPTION_HANDLED, true);
+            exchange.setProperty(Exchange.EXCEPTION_CAUGHT, exception);
+            exchange.setException(null);
+            // and we should not be regarded as exhausted as we are in a try 
.. catch block
+            exchange.removeProperty(Exchange.REDELIVERY_EXHAUSTED);
+            // run the fallback processor
+            try {
+                // use fallback command if provided (fallback via network)
+                if (fallbackCommand != null) {
+                    return fallbackCommand.execute();
+                } else {
                     LOG.debug("Running fallback: {} with exchange: {}", 
fallback, exchange);
-                    fallback.process(exchange, callback);
-                } catch (Exception e) {
-                    exchange.setException(e);
+                    // process the fallback until its fully done
+                    // (we do not hav any hystrix callback to leverage so we 
need to complete all work in this run method)
+                    fallback.process(exchange);
+                    LOG.debug("Running fallback: {} with exchange: {} done", 
fallback, exchange);
                 }
+            } catch (Exception e) {
+                exchange.setException(e);
             }
-        } finally {
-            LOG.debug("Running fallback: {} with exchange: {} done", fallback, 
exchange);
-            exchange.removeProperty(Exchange.TRY_ROUTE_BLOCK);
         }
 
         return exchange.hasOut() ? exchange.getOut() : exchange.getIn();
@@ -85,7 +88,9 @@ public class HystrixProcessorCommand extends 
HystrixCommand<Message> {
         LOG.debug("Running processor: {} with exchange: {}", processor, 
exchange);
 
         try {
-            processor.process(exchange, callback);
+            // process the processor until its fully done
+            // (we do not hav any hystrix callback to leverage so we need to 
complete all work in this run method)
+            processor.process(exchange);
         } catch (Exception e) {
             exchange.setException(e);
         }

http://git-wip-us.apache.org/repos/asf/camel/blob/7b5ccfff/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommandFallbackViaNetwork.java
----------------------------------------------------------------------
diff --git 
a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommandFallbackViaNetwork.java
 
b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommandFallbackViaNetwork.java
new file mode 100644
index 0000000..c565f9b
--- /dev/null
+++ 
b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommandFallbackViaNetwork.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.hystrix.processor;
+
+import com.netflix.hystrix.HystrixCommand;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Hystrix Command the Camel Hystrix EIP when executing fallback.
+ * The fallback may require networking and therefore should run in another 
Hystrix Command
+ */
+public class HystrixProcessorCommandFallbackViaNetwork extends 
HystrixCommand<Message> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(HystrixProcessorCommandFallbackViaNetwork.class);
+    private final Exchange exchange;
+    private final Processor processor;
+
+    public HystrixProcessorCommandFallbackViaNetwork(Setter setter, Exchange 
exchange, Processor processor) {
+        super(setter);
+        this.exchange = exchange;
+        this.processor = processor;
+    }
+
+    @Override
+    protected Message getFallback() {
+        return null;
+    }
+
+    @Override
+    protected Message run() throws Exception {
+        LOG.debug("Running fallback processor: {} with exchange: {}", 
processor, exchange);
+
+        try {
+            // process the processor until its fully done
+            // (we do not hav any hystrix callback to leverage so we need to 
complete all work in this run method)
+            processor.process(exchange);
+        } catch (Throwable e) {
+            exchange.setException(e);
+        }
+
+        // if we failed then throw an exception to signal that the fallback 
failed as well
+        if (exchange.getException() != null) {
+            throw exchange.getException();
+        }
+
+        LOG.debug("Running fallback processor: {} with exchange: {} done", 
processor, exchange);
+        // no fallback then we are done
+        return exchange.hasOut() ? exchange.getOut() : exchange.getIn();
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/7b5ccfff/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorFactory.java
----------------------------------------------------------------------
diff --git 
a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorFactory.java
 
b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorFactory.java
index 180fe9e..9dbf970 100644
--- 
a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorFactory.java
+++ 
b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorFactory.java
@@ -59,6 +59,8 @@ public class HystrixProcessorFactory implements 
ProcessorFactory {
                 configRef = 
CamelContextHelper.mandatoryLookup(routeContext.getCamelContext(), 
cb.getHystrixConfigurationRef(), HystrixConfigurationDefinition.class);
             }
 
+            String id = 
cb.idOrCreate(routeContext.getCamelContext().getNodeIdFactory());
+
             // group and thread pool keys to use they can be configured on 
configRef and config, so look there first, and if none then use default
             String groupKey = null;
             String threadPoolKey = null;
@@ -74,36 +76,61 @@ public class HystrixProcessorFactory implements 
ProcessorFactory {
                 groupKey = HystrixConfigurationDefinition.DEFAULT_GROUP_KEY;
             }
             if (threadPoolKey == null) {
-                // thread pool key should use same as group key as default
-                threadPoolKey = groupKey;
+                threadPoolKey = id + "-threadpool";
             }
 
             // use the node id as the command key
-            String id = 
cb.idOrCreate(routeContext.getCamelContext().getNodeIdFactory());
             HystrixCommandKey hcCommandKey = 
HystrixCommandKey.Factory.asKey(id);
+            HystrixCommandKey hcFallbackCommandKey = 
HystrixCommandKey.Factory.asKey(id + "-fallback");
             // use the configured group key
             HystrixCommandGroupKey hcGroupKey = 
HystrixCommandGroupKey.Factory.asKey(groupKey);
+            HystrixThreadPoolKey tpKey = 
HystrixThreadPoolKey.Factory.asKey(threadPoolKey);
 
             // create setter using the default options
             HystrixCommand.Setter setter = HystrixCommand.Setter
                     .withGroupKey(hcGroupKey)
                     .andCommandKey(hcCommandKey)
-                    
.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey(threadPoolKey));
-            HystrixCommandProperties.Setter command = 
HystrixCommandProperties.Setter();
-            setter.andCommandPropertiesDefaults(command);
-            HystrixThreadPoolProperties.Setter threadPool = 
HystrixThreadPoolProperties.Setter();
-            setter.andThreadPoolPropertiesDefaults(threadPool);
+                    .andThreadPoolKey(tpKey);
+            HystrixCommandProperties.Setter commandSetter = 
HystrixCommandProperties.Setter();
+            setter.andCommandPropertiesDefaults(commandSetter);
+            HystrixThreadPoolProperties.Setter threadPoolSetter = 
HystrixThreadPoolProperties.Setter();
+            setter.andThreadPoolPropertiesDefaults(threadPoolSetter);
 
             // at first configure any shared options
             if (configRef != null) {
-                configureHystrix(command, threadPool, configRef);
+                configureHystrix(commandSetter, threadPoolSetter, configRef);
             }
             // then any local configured can override
             if (config != null) {
-                configureHystrix(command, threadPool, config);
+                configureHystrix(commandSetter, threadPoolSetter, config);
+            }
+
+            // create setter for fallback via network
+            HystrixCommand.Setter fallbackSetter = null;
+            boolean fallbackViaNetwork = cb.getOnFallback() != null && 
cb.getOnFallback().isFallbackViaNetwork();
+            if (fallbackViaNetwork) {
+                HystrixThreadPoolKey tpFallbackKey = 
HystrixThreadPoolKey.Factory.asKey(threadPoolKey + "-fallback");
+
+                fallbackSetter = HystrixCommand.Setter
+                        .withGroupKey(hcGroupKey)
+                        .andCommandKey(hcFallbackCommandKey)
+                        .andThreadPoolKey(tpFallbackKey);
+                HystrixCommandProperties.Setter commandFallbackSetter = 
HystrixCommandProperties.Setter();
+                
fallbackSetter.andCommandPropertiesDefaults(commandFallbackSetter);
+                HystrixThreadPoolProperties.Setter fallbackThreadPoolSetter = 
HystrixThreadPoolProperties.Setter();
+                
fallbackSetter.andThreadPoolPropertiesDefaults(fallbackThreadPoolSetter);
+
+                // at first configure any shared options
+                if (configRef != null) {
+                    configureHystrix(commandFallbackSetter, 
fallbackThreadPoolSetter, configRef);
+                }
+                // then any local configured can override
+                if (config != null) {
+                    configureHystrix(commandFallbackSetter, 
fallbackThreadPoolSetter, config);
+                }
             }
 
-            return new HystrixProcessor(hcCommandKey, hcGroupKey, setter, 
processor, fallback);
+            return new HystrixProcessor(hcGroupKey, hcCommandKey, 
hcFallbackCommandKey, setter, fallbackSetter, processor, fallback, 
fallbackViaNetwork);
         } else {
             return null;
         }

http://git-wip-us.apache.org/repos/asf/camel/blob/7b5ccfff/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixRouteFallbackTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixRouteFallbackTest.java
 
b/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixRouteFallbackTest.java
index 8d70ca8..63f194d 100644
--- 
a/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixRouteFallbackTest.java
+++ 
b/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixRouteFallbackTest.java
@@ -37,11 +37,13 @@ public class HystrixRouteFallbackTest extends 
CamelTestSupport {
             @Override
             public void configure() throws Exception {
                 from("direct:start")
+                    .to("log:start")
                     .hystrix()
                         .throwException(new IllegalArgumentException("Forced"))
                     .onFallback()
                         .transform().constant("Fallback message")
                     .end()
+                    .to("log:result")
                     .to("mock:result");
             }
         };

http://git-wip-us.apache.org/repos/asf/camel/blob/7b5ccfff/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixRouteFallbackViaNetworkTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixRouteFallbackViaNetworkTest.java
 
b/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixRouteFallbackViaNetworkTest.java
new file mode 100644
index 0000000..caf7b69
--- /dev/null
+++ 
b/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixRouteFallbackViaNetworkTest.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.hystrix.processor;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+public class HystrixRouteFallbackViaNetworkTest extends CamelTestSupport {
+
+    @Test
+    public void testHystrix() throws Exception {
+        getMockEndpoint("mock:result").expectedBodiesReceived("Fallback 
message");
+
+        template.sendBody("direct:start", "Hello World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .to("log:start")
+                    .hystrix()
+                        .throwException(new IllegalArgumentException("Forced"))
+                    .onFallbackViaNetwork()
+                        .transform().constant("Fallback message")
+                    .end()
+                    .to("log:result")
+                    .to("mock:result");
+            }
+        };
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/7b5ccfff/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixRouteOkTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixRouteOkTest.java
 
b/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixRouteOkTest.java
index 4980bb4..92faf79 100644
--- 
a/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixRouteOkTest.java
+++ 
b/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixRouteOkTest.java
@@ -24,9 +24,9 @@ public class HystrixRouteOkTest extends CamelTestSupport {
 
     @Test
     public void testHystrix() throws Exception {
-        getMockEndpoint("mock:result").expectedBodiesReceived("Bye World");
+        getMockEndpoint("mock:result").expectedMinimumMessageCount(2);
 
-        template.sendBody("direct:start", "Hello World");
+//        template.sendBody("direct:start", "Hello World");
 
         assertMockEndpointsSatisfied();
     }
@@ -36,12 +36,15 @@ public class HystrixRouteOkTest extends CamelTestSupport {
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from("direct:start")
+                from("timer:trigger")
+                    .to("log:trigger")
                     .hystrix()
                         .to("direct:foo")
+                        .to("log:foo")
                     .onFallback()
                         .transform().constant("Fallback message")
                     .end()
+                    .to("log:result")
                     .to("mock:result");
 
                 from("direct:foo")

http://git-wip-us.apache.org/repos/asf/camel/blob/7b5ccfff/examples/camel-example-hystrix/client/pom.xml
----------------------------------------------------------------------
diff --git a/examples/camel-example-hystrix/client/pom.xml 
b/examples/camel-example-hystrix/client/pom.xml
index 2723c27..ed00524 100644
--- a/examples/camel-example-hystrix/client/pom.xml
+++ b/examples/camel-example-hystrix/client/pom.xml
@@ -30,10 +30,21 @@
   <name>Camel :: Example :: Hystrix :: Client</name>
   <description>An example showing how to use Hystrix EIP as circuit breaker in 
Camel routes</description>
 
-  <!-- import Camel BOM -->
+  <properties>
+    <spring.boot-version>${spring-boot-version}</spring.boot-version>
+  </properties>
+
+  <!-- import Spring-Boot and Camel BOM -->
   <dependencyManagement>
     <dependencies>
       <dependency>
+        <groupId>org.springframework.boot</groupId>
+        <artifactId>spring-boot-dependencies</artifactId>
+        <version>${spring.boot-version}</version>
+        <type>pom</type>
+        <scope>import</scope>
+      </dependency>
+      <dependency>
         <groupId>org.apache.camel</groupId>
         <artifactId>camel-parent</artifactId>
         <version>${project.version}</version>
@@ -45,66 +56,36 @@
 
   <dependencies>
 
-    <!-- CDI API -->
+    <!-- spring-boot -->
     <dependency>
-      <groupId>javax.enterprise</groupId>
-      <artifactId>cdi-api</artifactId>
-      <version>${cdi-api-1.2-version}</version>
-      <scope>provided</scope>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-starter-web</artifactId>
     </dependency>
 
-    <!-- camel-cdi -->
+    <!-- camel -->
     <dependency>
       <groupId>org.apache.camel</groupId>
-      <artifactId>camel-cdi</artifactId>
+      <artifactId>camel-spring-boot-starter</artifactId>
     </dependency>
     <dependency>
       <groupId>org.apache.camel</groupId>
-      <artifactId>camel-hystrix</artifactId>
+      <artifactId>camel-http</artifactId>
     </dependency>
     <dependency>
       <groupId>org.apache.camel</groupId>
-      <artifactId>camel-http</artifactId>
+      <artifactId>camel-hystrix</artifactId>
     </dependency>
 
-    <!-- logging -->
-
-
   </dependencies>
 
   <build>
-
     <plugins>
-      <!-- allows the routes to be run via 'mvn camel:run' -->
       <plugin>
-        <groupId>org.apache.camel</groupId>
-        <artifactId>camel-maven-plugin</artifactId>
-        <version>${project.version}</version>
-        <dependencies>
-          <dependency>
-            <groupId>org.apache.deltaspike.cdictrl</groupId>
-            <artifactId>deltaspike-cdictrl-weld</artifactId>
-            <version>${deltaspike-version}</version>
-          </dependency>
-          <dependency>
-            <groupId>org.jboss.weld.se</groupId>
-            <artifactId>weld-se</artifactId>
-            <version>${weld2-version}</version>
-          </dependency>
-          <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-log4j12</artifactId>
-            <version>${slf4j-version}</version>
-          </dependency>
-          <dependency>
-            <groupId>log4j</groupId>
-            <artifactId>log4j</artifactId>
-            <version>${log4j-version}</version>
-          </dependency>
-        </dependencies>
+        <groupId>org.springframework.boot</groupId>
+        <artifactId>spring-boot-maven-plugin</artifactId>
+        <version>${spring-boot-version}</version>
       </plugin>
     </plugins>
-
   </build>
 
 </project>

http://git-wip-us.apache.org/repos/asf/camel/blob/7b5ccfff/examples/camel-example-hystrix/client/src/main/java/sample/camel/ClientApplication.java
----------------------------------------------------------------------
diff --git 
a/examples/camel-example-hystrix/client/src/main/java/sample/camel/ClientApplication.java
 
b/examples/camel-example-hystrix/client/src/main/java/sample/camel/ClientApplication.java
new file mode 100644
index 0000000..f5c3da8
--- /dev/null
+++ 
b/examples/camel-example-hystrix/client/src/main/java/sample/camel/ClientApplication.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package sample.camel;
+
+import 
org.apache.camel.component.hystrix.metrics.servlet.HystrixEventStreamServlet;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.context.embedded.ServletRegistrationBean;
+import org.springframework.context.annotation.Bean;
+
+@SpringBootApplication
+public class ClientApplication {
+
+    /**
+     * A main method to start this application.
+     */
+    public static void main(String[] args) {
+        SpringApplication.run(ClientApplication.class, args);
+    }
+
+    @Bean
+    public HystrixEventStreamServlet hystrixServlet() {
+        return new HystrixEventStreamServlet();
+    }
+
+    @Bean
+    public ServletRegistrationBean servletRegistrationBean() {
+        return new ServletRegistrationBean(new 
HystrixEventStreamServlet(),"/hystrix.stream");
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/7b5ccfff/examples/camel-example-hystrix/client/src/main/java/sample/camel/ClientRoute.java
----------------------------------------------------------------------
diff --git 
a/examples/camel-example-hystrix/client/src/main/java/sample/camel/ClientRoute.java
 
b/examples/camel-example-hystrix/client/src/main/java/sample/camel/ClientRoute.java
index be174a6..36a006f 100644
--- 
a/examples/camel-example-hystrix/client/src/main/java/sample/camel/ClientRoute.java
+++ 
b/examples/camel-example-hystrix/client/src/main/java/sample/camel/ClientRoute.java
@@ -17,20 +17,25 @@
 package sample.camel;
 
 import org.apache.camel.builder.RouteBuilder;
+import org.springframework.stereotype.Component;
 
+@Component
 public class ClientRoute extends RouteBuilder {
 
     @Override
     public void configure() {
         // you can configure the route rule with Java DSL here
-        from("timer:trigger?exchangePattern=InOut&period=10s")
+        from("timer:trigger?period=1s").streamCaching()
             .bean("counterBean")
+            .log(" Client request: ${body}")
             .hystrix()
-                .log(" Client request: ${body}")
                 .to("http://localhost:9090/service1";)
-            .onFallback()
-                .log(" Client fallback request: ${body}")
-                .to("http://localhost:9090/service2";)
+            //.onFallback()
+            // we use a fallback without network that provides a repsonse 
message immediately
+            //    .transform().simple("Fallback ${body}")
+            .onFallbackViaNetwork()
+                // we use fallback via network where we call a 2nd service
+                .to("http://localhost:7070/service2";)
             .end()
             .log("Client response: ${body}");
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/7b5ccfff/examples/camel-example-hystrix/client/src/main/java/sample/camel/CounterBean.java
----------------------------------------------------------------------
diff --git 
a/examples/camel-example-hystrix/client/src/main/java/sample/camel/CounterBean.java
 
b/examples/camel-example-hystrix/client/src/main/java/sample/camel/CounterBean.java
index 0c27215..3df5fc5 100644
--- 
a/examples/camel-example-hystrix/client/src/main/java/sample/camel/CounterBean.java
+++ 
b/examples/camel-example-hystrix/client/src/main/java/sample/camel/CounterBean.java
@@ -16,11 +16,9 @@
  */
 package sample.camel;
 
-import javax.inject.Named;
-import javax.inject.Singleton;
+import org.springframework.stereotype.Component;
 
-@Singleton
-@Named("counterBean")
+@Component("counterBean")
 public class CounterBean {
 
     private int counter;

http://git-wip-us.apache.org/repos/asf/camel/blob/7b5ccfff/examples/camel-example-hystrix/client/src/main/resources/application.properties
----------------------------------------------------------------------
diff --git 
a/examples/camel-example-hystrix/client/src/main/resources/application.properties
 
b/examples/camel-example-hystrix/client/src/main/resources/application.properties
new file mode 100644
index 0000000..87475ba
--- /dev/null
+++ 
b/examples/camel-example-hystrix/client/src/main/resources/application.properties
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+server.port=8080
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/7b5ccfff/examples/camel-example-hystrix/service1/src/main/java/sample/camel/Service1Route.java
----------------------------------------------------------------------
diff --git 
a/examples/camel-example-hystrix/service1/src/main/java/sample/camel/Service1Route.java
 
b/examples/camel-example-hystrix/service1/src/main/java/sample/camel/Service1Route.java
index e785560..17242a7 100644
--- 
a/examples/camel-example-hystrix/service1/src/main/java/sample/camel/Service1Route.java
+++ 
b/examples/camel-example-hystrix/service1/src/main/java/sample/camel/Service1Route.java
@@ -26,7 +26,6 @@ public class Service1Route extends RouteBuilder {
     public void configure() throws Exception {
         
from("jetty:http://0.0.0.0:{{service1.port}}/service1";).routeId("service1").streamCaching()
             .log("Service1 request: ${body}")
-            .delay(simple("${random(1000,2000)}"))
             .transform(simple("Service1-${body}"))
             .log("Service1 response: ${body}");
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/7b5ccfff/examples/camel-example-hystrix/service2/src/main/java/sample/camel/Service2Route.java
----------------------------------------------------------------------
diff --git 
a/examples/camel-example-hystrix/service2/src/main/java/sample/camel/Service2Route.java
 
b/examples/camel-example-hystrix/service2/src/main/java/sample/camel/Service2Route.java
index 9510461..9bb2cbb 100644
--- 
a/examples/camel-example-hystrix/service2/src/main/java/sample/camel/Service2Route.java
+++ 
b/examples/camel-example-hystrix/service2/src/main/java/sample/camel/Service2Route.java
@@ -24,7 +24,6 @@ public class Service2Route extends RouteBuilder {
     public void configure() throws Exception {
         
from("undertow:http://0.0.0.0:7070/service2";).routeId("service2").streamCaching()
                 .log(" Service2 request: ${body}")
-                .delay(simple("${random(1000,2000)}"))
                 .transform(simple("Service2-${body}"))
                 .log("Service2 response: ${body}");
     }

Reply via email to