CAMEL-10662: camel-hystrix - Thread race: when a Hystrix timeout triggers, the 
fallback can run concurrently with the run command. Added some timeout-related tests.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/906a612d
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/906a612d
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/906a612d

Branch: refs/heads/master
Commit: 906a612d3b59c4a36ad53d084b8cef3ba608cdc4
Parents: 5807f21
Author: Claus Ibsen <davscl...@apache.org>
Authored: Thu Dec 29 16:10:46 2016 +0100
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Thu Dec 29 17:54:07 2016 +0100

----------------------------------------------------------------------
 .../processor/HystrixProcessorCommand.java      | 144 ++++++++++++-------
 .../hystrix/processor/HystrixTimeoutTest.java   |  97 +++++++++++++
 .../HystrixTimeoutWithFallbackTest.java         |  80 +++++++++++
 3 files changed, 272 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/906a612d/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java
----------------------------------------------------------------------
diff --git 
a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java
 
b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java
index 4d86ef7..511a46e 100644
--- 
a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java
+++ 
b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java
@@ -16,11 +16,14 @@
  */
 package org.apache.camel.component.hystrix.processor;
 
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import com.netflix.hystrix.HystrixCommand;
 import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.Processor;
+import org.apache.camel.util.ExchangeHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,6 +37,8 @@ public class HystrixProcessorCommand extends HystrixCommand {
     private final Processor processor;
     private final Processor fallback;
     private final HystrixProcessorCommandFallbackViaNetwork fallbackCommand;
+    private final AtomicBoolean fallbackInUse = new AtomicBoolean();
+    private final Object lock = new Object();
 
     public HystrixProcessorCommand(Setter setter, Exchange exchange, Processor 
processor, Processor fallback,
                                    HystrixProcessorCommandFallbackViaNetwork 
fallbackCommand) {
@@ -46,41 +51,51 @@ public class HystrixProcessorCommand extends HystrixCommand 
{
 
     @Override
     protected Message getFallback() {
-        if (fallback != null || fallbackCommand != null) {
-            // grab the exception that caused the error (can be failure in 
run, or from hystrix if short circuited)
-            Throwable exception = getExecutionException();
+        // guard by lock as the run command can be running concurrently in 
case hystrix caused a timeout which
+        // can cause the fallback timer to trigger this fallback at the same 
time the run command may be running
+        // after its processor.process method which could cause both threads 
to mutate the state on the exchange
+        synchronized (lock) {
+            fallbackInUse.set(true);
+        }
 
-            if (exception != null) {
-                LOG.debug("Error occurred processing. Will now run fallback. 
Exception class: {} message: {}.", exception.getClass().getName(), 
exception.getMessage());
+        if (fallback == null && fallbackCommand == null) {
+            // no fallback in use
+            throw new UnsupportedOperationException("No fallback available.");
+        }
+
+        // grab the exception that caused the error (can be failure in run, or 
from hystrix if short circuited)
+        Throwable exception = getExecutionException();
+
+        if (exception != null) {
+            LOG.debug("Error occurred processing. Will now run fallback. 
Exception class: {} message: {}.", exception.getClass().getName(), 
exception.getMessage());
+        } else {
+            LOG.debug("Error occurred processing. Will now run fallback.");
+        }
+        // store the last to endpoint as the failure endpoint
+        if (exchange.getProperty(Exchange.FAILURE_ENDPOINT) == null) {
+            exchange.setProperty(Exchange.FAILURE_ENDPOINT, 
exchange.getProperty(Exchange.TO_ENDPOINT));
+        }
+        // give the rest of the pipeline another chance
+        exchange.setProperty(Exchange.EXCEPTION_HANDLED, true);
+        exchange.setProperty(Exchange.EXCEPTION_CAUGHT, exception);
+        exchange.removeProperty(Exchange.ROUTE_STOP);
+        exchange.setException(null);
+        // and we should not be regarded as exhausted as we are in a try .. 
catch block
+        exchange.removeProperty(Exchange.REDELIVERY_EXHAUSTED);
+        // run the fallback processor
+        try {
+            // use fallback command if provided (fallback via network)
+            if (fallbackCommand != null) {
+                return fallbackCommand.execute();
             } else {
-                LOG.debug("Error occurred processing. Will now run fallback.");
-            }
-            // store the last to endpoint as the failure endpoint
-            if (exchange.getProperty(Exchange.FAILURE_ENDPOINT) == null) {
-                exchange.setProperty(Exchange.FAILURE_ENDPOINT, 
exchange.getProperty(Exchange.TO_ENDPOINT));
-            }
-            // give the rest of the pipeline another chance
-            exchange.setProperty(Exchange.EXCEPTION_HANDLED, true);
-            exchange.setProperty(Exchange.EXCEPTION_CAUGHT, exception);
-            exchange.removeProperty(Exchange.ROUTE_STOP);
-            exchange.setException(null);
-            // and we should not be regarded as exhausted as we are in a try 
.. catch block
-            exchange.removeProperty(Exchange.REDELIVERY_EXHAUSTED);
-            // run the fallback processor
-            try {
-                // use fallback command if provided (fallback via network)
-                if (fallbackCommand != null) {
-                    return fallbackCommand.execute();
-                } else {
-                    LOG.debug("Running fallback: {} with exchange: {}", 
fallback, exchange);
-                    // process the fallback until its fully done
-                    // (we do not hav any hystrix callback to leverage so we 
need to complete all work in this run method)
-                    fallback.process(exchange);
-                    LOG.debug("Running fallback: {} with exchange: {} done", 
fallback, exchange);
-                }
-            } catch (Exception e) {
-                exchange.setException(e);
+                LOG.debug("Running fallback: {} with exchange: {}", fallback, 
exchange);
+                // process the fallback until its fully done
+                // (we do not hav any hystrix callback to leverage so we need 
to complete all work in this run method)
+                fallback.process(exchange);
+                LOG.debug("Running fallback: {} with exchange: {} done", 
fallback, exchange);
             }
+        } catch (Exception e) {
+            exchange.setException(e);
         }
 
         return exchange.hasOut() ? exchange.getOut() : exchange.getIn();
@@ -90,31 +105,62 @@ public class HystrixProcessorCommand extends 
HystrixCommand {
     protected Message run() throws Exception {
         LOG.debug("Running processor: {} with exchange: {}", processor, 
exchange);
 
+        // prepare a copy of exchange so downstream processors don't cause 
side-effects if they mutate the exchange
+        // in case Hystrix times out the processing and continues with the 
fallback
+        Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false, 
false);
         try {
             // process the processor until its fully done
             // (we do not hav any hystrix callback to leverage so we need to 
complete all work in this run method)
-            processor.process(exchange);
+            processor.process(copy);
         } catch (Exception e) {
-            exchange.setException(e);
+            copy.setException(e);
         }
 
-        // is fallback enabled
-        Boolean fallbackEnabled = getProperties().fallbackEnabled().get();
-
-        // execution exception must take precedence over exchange exception
-        // because hystrix may have caused this command to fail due timeout or 
something else
-        Throwable exception = getExecutionException();
-        if (exception != null) {
-            exchange.setException(new CamelExchangeException("Hystrix 
execution exception occurred while processing Exchange", exchange, exception));
+        // when a hystrix timeout occurs then a hystrix timer thread executes 
the fallback
+        // and therefore we need this thread to not do anymore if fallback is 
already in process
+        if (fallbackInUse.get()) {
+            LOG.debug("Exiting run command as fallback is already in use 
processing exchange: {}", exchange);
+            return null;
         }
 
-        // if we failed then throw an exception if fallback is enabled
-        if (fallbackEnabled == null || fallbackEnabled && 
exchange.getException() != null) {
-            throw exchange.getException();
-        }
+        // remember any hystrix execution exception which for example can be 
triggered by a hystrix timeout
+        Throwable cause = getExecutionException();
 
-        // no fallback then we are done
-        LOG.debug("Running processor: {} with exchange: {} done", processor, 
exchange);
-        return exchange.hasOut() ? exchange.getOut() : exchange.getIn();
+        synchronized (lock) {
+
+            // when a hystrix timeout occurs then a hystrix timer thread 
executes the fallback
+            // and therefore we need this thread to not do anymore if fallback 
is already in process
+            if (fallbackInUse.get()) {
+                LOG.debug("Exiting run command as fallback is already in use 
processing exchange: {}", exchange);
+                return null;
+            }
+
+            // and copy the result
+            ExchangeHelper.copyResults(exchange, copy);
+
+            // is fallback enabled
+            Boolean fallbackEnabled = getProperties().fallbackEnabled().get();
+
+            // execution exception must take precedence over exchange exception
+            // because hystrix may have caused this command to fail due 
timeout or something else
+            if (cause != null) {
+                exchange.setException(new CamelExchangeException("Hystrix 
execution exception occurred while processing Exchange", exchange, cause));
+            }
+
+            // if we have a fallback that can process the exchange in case of 
an exception
+            // then we need to trigger this by throwing an exception so 
Hystrix will execute the fallback
+            // if we don't have a fallback and an exception was thrown then 
its stored on the exchange
+            // and Camel will detect the exception anyway
+            if (fallback != null || fallbackCommand != null) {
+                if (fallbackEnabled == null || fallbackEnabled && 
exchange.getException() != null) {
+                    // throwing exception will cause hystrix to execute 
fallback
+                    throw exchange.getException();
+                }
+            }
+
+            LOG.debug("Running processor: {} with exchange: {} done", 
processor, exchange);
+            return exchange.hasOut() ? exchange.getOut() : exchange.getIn();
+        }
     }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/906a612d/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixTimeoutTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixTimeoutTest.java
 
b/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixTimeoutTest.java
new file mode 100644
index 0000000..b36203c
--- /dev/null
+++ 
b/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixTimeoutTest.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.hystrix.processor;
+
+import java.util.concurrent.TimeoutException;
+
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * Hystrix using timeout with Java DSL
+ */
+public class HystrixTimeoutTest extends CamelTestSupport {
+
+    @Test
+    public void testFast() throws Exception {
+        // this calls the fast route and therefore we get a response
+        Object out = template.requestBody("direct:start", "fast");
+        assertEquals("Fast response", out);
+    }
+
+    @Test
+    public void testSlow() throws Exception {
+        // this calls the slow route and therefore causes a timeout which 
triggers an exception
+        try {
+            template.requestBody("direct:start", "slow");
+            fail("Should fail due timeout");
+        } catch (Exception e) {
+            // expected a timeout
+            assertIsInstanceOf(TimeoutException.class, 
e.getCause().getCause());
+        }
+    }
+
+    @Test
+    public void testSlowLoop() throws Exception {
+        // this calls the slow route and therefore causes a timeout which 
triggers an exception
+        for (int i = 0; i < 10; i++) {
+            try {
+                log.info(">>> test run " + i + " <<<");
+                template.requestBody("direct:start", "slow");
+                fail("Should fail due timeout");
+            } catch (Exception e) {
+                // expected a timeout
+                assertIsInstanceOf(TimeoutException.class, 
e.getCause().getCause());
+            }
+        }
+    }
+
+    @Override
+    protected RoutesBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .hystrix()
+                        // use 2 second timeout
+                        
.hystrixConfiguration().executionTimeoutInMilliseconds(2000).end()
+                        .log("Hystrix processing start: ${threadName}")
+                        .toD("direct:${body}")
+                        .log("Hystrix processing end: ${threadName}")
+                    .end()
+                    .log("After Hystrix ${body}");
+
+                from("direct:fast")
+                    // this is a fast route and takes 1 second to respond
+                    .log("Fast processing start: ${threadName}")
+                    .delay(1000)
+                    .transform().constant("Fast response")
+                    .log("Fast processing end: ${threadName}");
+
+                from("direct:slow")
+                    // this is a slow route and takes 3 second to respond
+                    .log("Slow processing start: ${threadName}")
+                    .delay(3000)
+                    .transform().constant("Slow response")
+                    .log("Slow processing end: ${threadName}");
+            }
+        };
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/906a612d/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixTimeoutWithFallbackTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixTimeoutWithFallbackTest.java
 
b/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixTimeoutWithFallbackTest.java
new file mode 100644
index 0000000..27790bb
--- /dev/null
+++ 
b/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixTimeoutWithFallbackTest.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.hystrix.processor;
+
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * Hystrix using timeout and fallback with Java DSL
+ */
+public class HystrixTimeoutWithFallbackTest extends CamelTestSupport {
+
+    @Test
+    public void testFast() throws Exception {
+        // this calls the fast route and therefore we get a response
+        Object out = template.requestBody("direct:start", "fast");
+        assertEquals("Fast response", out);
+    }
+
+    @Test
+    public void testSlow() throws Exception {
+        // this calls the slow route and therefore causes a timeout which 
triggers the fallback
+        Object out = template.requestBody("direct:start", "slow");
+        assertEquals("Fallback response", out);
+    }
+
+    @Override
+    protected RoutesBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .hystrix()
+                    // use 2 second timeout
+                    
.hystrixConfiguration().executionTimeoutInMilliseconds(2000).end()
+                        .log("Hystrix processing start: ${threadName}")
+                        .toD("direct:${body}")
+                        .log("Hystrix processing end: ${threadName}")
+                    .onFallback()
+                        // use fallback if there was an exception or timeout
+                        .log("Hystrix fallback start: ${threadName}")
+                        .transform().constant("Fallback response")
+                        .log("Hystrix fallback end: ${threadName}")
+                    .end()
+                    .log("After Hystrix ${body}");
+
+                from("direct:fast")
+                    // this is a fast route and takes 1 second to respond
+                    .log("Fast processing start: ${threadName}")
+                    .delay(1000)
+                    .transform().constant("Fast response")
+                    .log("Fast processing end: ${threadName}");
+
+                from("direct:slow")
+                    // this is a slow route and takes 3 second to respond
+                    .log("Slow processing start: ${threadName}")
+                    .delay(3000)
+                    .transform().constant("Slow response")
+                    .log("Slow processing end: ${threadName}");
+            }
+        };
+    }
+
+}

Reply via email to