CAMEL-10662: camel-hystrix - thread race: when a Hystrix timeout triggers, the fallback can run concurrently with the run command. Added some timeout-related tests.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/906a612d Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/906a612d Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/906a612d Branch: refs/heads/master Commit: 906a612d3b59c4a36ad53d084b8cef3ba608cdc4 Parents: 5807f21 Author: Claus Ibsen <davscl...@apache.org> Authored: Thu Dec 29 16:10:46 2016 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Thu Dec 29 17:54:07 2016 +0100 ---------------------------------------------------------------------- .../processor/HystrixProcessorCommand.java | 144 ++++++++++++------- .../hystrix/processor/HystrixTimeoutTest.java | 97 +++++++++++++ .../HystrixTimeoutWithFallbackTest.java | 80 +++++++++++ 3 files changed, 272 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/906a612d/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java ---------------------------------------------------------------------- diff --git a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java index 4d86ef7..511a46e 100644 --- a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java +++ b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/processor/HystrixProcessorCommand.java @@ -16,11 +16,14 @@ */ package org.apache.camel.component.hystrix.processor; +import java.util.concurrent.atomic.AtomicBoolean; + import com.netflix.hystrix.HystrixCommand; import org.apache.camel.CamelExchangeException; import org.apache.camel.Exchange; import org.apache.camel.Message; import org.apache.camel.Processor; +import 
org.apache.camel.util.ExchangeHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,6 +37,8 @@ public class HystrixProcessorCommand extends HystrixCommand { private final Processor processor; private final Processor fallback; private final HystrixProcessorCommandFallbackViaNetwork fallbackCommand; + private final AtomicBoolean fallbackInUse = new AtomicBoolean(); + private final Object lock = new Object(); public HystrixProcessorCommand(Setter setter, Exchange exchange, Processor processor, Processor fallback, HystrixProcessorCommandFallbackViaNetwork fallbackCommand) { @@ -46,41 +51,51 @@ public class HystrixProcessorCommand extends HystrixCommand { @Override protected Message getFallback() { - if (fallback != null || fallbackCommand != null) { - // grab the exception that caused the error (can be failure in run, or from hystrix if short circuited) - Throwable exception = getExecutionException(); + // guard by lock as the run command can be running concurrently in case hystrix caused a timeout which + // can cause the fallback timer to trigger this fallback at the same time the run command may be running + // after its processor.process method which could cause both threads to mutate the state on the exchange + synchronized (lock) { + fallbackInUse.set(true); + } - if (exception != null) { - LOG.debug("Error occurred processing. Will now run fallback. Exception class: {} message: {}.", exception.getClass().getName(), exception.getMessage()); + if (fallback == null && fallbackCommand == null) { + // no fallback in use + throw new UnsupportedOperationException("No fallback available."); + } + + // grab the exception that caused the error (can be failure in run, or from hystrix if short circuited) + Throwable exception = getExecutionException(); + + if (exception != null) { + LOG.debug("Error occurred processing. Will now run fallback. 
Exception class: {} message: {}.", exception.getClass().getName(), exception.getMessage()); + } else { + LOG.debug("Error occurred processing. Will now run fallback."); + } + // store the last to endpoint as the failure endpoint + if (exchange.getProperty(Exchange.FAILURE_ENDPOINT) == null) { + exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT)); + } + // give the rest of the pipeline another chance + exchange.setProperty(Exchange.EXCEPTION_HANDLED, true); + exchange.setProperty(Exchange.EXCEPTION_CAUGHT, exception); + exchange.removeProperty(Exchange.ROUTE_STOP); + exchange.setException(null); + // and we should not be regarded as exhausted as we are in a try .. catch block + exchange.removeProperty(Exchange.REDELIVERY_EXHAUSTED); + // run the fallback processor + try { + // use fallback command if provided (fallback via network) + if (fallbackCommand != null) { + return fallbackCommand.execute(); } else { - LOG.debug("Error occurred processing. Will now run fallback."); - } - // store the last to endpoint as the failure endpoint - if (exchange.getProperty(Exchange.FAILURE_ENDPOINT) == null) { - exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT)); - } - // give the rest of the pipeline another chance - exchange.setProperty(Exchange.EXCEPTION_HANDLED, true); - exchange.setProperty(Exchange.EXCEPTION_CAUGHT, exception); - exchange.removeProperty(Exchange.ROUTE_STOP); - exchange.setException(null); - // and we should not be regarded as exhausted as we are in a try .. 
catch block - exchange.removeProperty(Exchange.REDELIVERY_EXHAUSTED); - // run the fallback processor - try { - // use fallback command if provided (fallback via network) - if (fallbackCommand != null) { - return fallbackCommand.execute(); - } else { - LOG.debug("Running fallback: {} with exchange: {}", fallback, exchange); - // process the fallback until its fully done - // (we do not hav any hystrix callback to leverage so we need to complete all work in this run method) - fallback.process(exchange); - LOG.debug("Running fallback: {} with exchange: {} done", fallback, exchange); - } - } catch (Exception e) { - exchange.setException(e); + LOG.debug("Running fallback: {} with exchange: {}", fallback, exchange); + // process the fallback until its fully done + // (we do not hav any hystrix callback to leverage so we need to complete all work in this run method) + fallback.process(exchange); + LOG.debug("Running fallback: {} with exchange: {} done", fallback, exchange); } + } catch (Exception e) { + exchange.setException(e); } return exchange.hasOut() ? 
exchange.getOut() : exchange.getIn(); @@ -90,31 +105,62 @@ public class HystrixProcessorCommand extends HystrixCommand { protected Message run() throws Exception { LOG.debug("Running processor: {} with exchange: {}", processor, exchange); + // prepare a copy of exchange so downstream processors don't cause side-effects if they mutate the exchange + // in case Hystrix timeout processing and continue with the fallback etc + Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false, false); try { // process the processor until its fully done // (we do not hav any hystrix callback to leverage so we need to complete all work in this run method) - processor.process(exchange); + processor.process(copy); } catch (Exception e) { - exchange.setException(e); + copy.setException(e); } - // is fallback enabled - Boolean fallbackEnabled = getProperties().fallbackEnabled().get(); - - // execution exception must take precedence over exchange exception - // because hystrix may have caused this command to fail due timeout or something else - Throwable exception = getExecutionException(); - if (exception != null) { - exchange.setException(new CamelExchangeException("Hystrix execution exception occurred while processing Exchange", exchange, exception)); + // when a hystrix timeout occurs then a hystrix timer thread executes the fallback + // and therefore we need this thread to not do anymore if fallback is already in process + if (fallbackInUse.get()) { + LOG.debug("Exiting run command as fallback is already in use processing exchange: {}", exchange); + return null; } - // if we failed then throw an exception if fallback is enabled - if (fallbackEnabled == null || fallbackEnabled && exchange.getException() != null) { - throw exchange.getException(); - } + // remember any hystrix execution exception which for example can be triggered by a hystrix timeout + Throwable cause = getExecutionException(); - // no fallback then we are done - LOG.debug("Running processor: {} with 
exchange: {} done", processor, exchange); - return exchange.hasOut() ? exchange.getOut() : exchange.getIn(); + synchronized (lock) { + + // when a hystrix timeout occurs then a hystrix timer thread executes the fallback + // and therefore we need this thread to not do anymore if fallback is already in process + if (fallbackInUse.get()) { + LOG.debug("Exiting run command as fallback is already in use processing exchange: {}", exchange); + return null; + } + + // and copy the result + ExchangeHelper.copyResults(exchange, copy); + + // is fallback enabled + Boolean fallbackEnabled = getProperties().fallbackEnabled().get(); + + // execution exception must take precedence over exchange exception + // because hystrix may have caused this command to fail due timeout or something else + if (cause != null) { + exchange.setException(new CamelExchangeException("Hystrix execution exception occurred while processing Exchange", exchange, cause)); + } + + // if we have a fallback that can process the exchange in case of an exception + // then we need to trigger this by throwing an exception so Hystrix will execute the fallback + // if we don't have a fallback and an exception was thrown then its stored on the exchange + // and Camel will detect the exception anyway + if (fallback != null || fallbackCommand != null) { + if (fallbackEnabled == null || fallbackEnabled && exchange.getException() != null) { + // throwing exception will cause hystrix to execute fallback + throw exchange.getException(); + } + } + + LOG.debug("Running processor: {} with exchange: {} done", processor, exchange); + return exchange.hasOut() ? 
exchange.getOut() : exchange.getIn(); + } } + } http://git-wip-us.apache.org/repos/asf/camel/blob/906a612d/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixTimeoutTest.java ---------------------------------------------------------------------- diff --git a/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixTimeoutTest.java b/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixTimeoutTest.java new file mode 100644 index 0000000..b36203c --- /dev/null +++ b/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixTimeoutTest.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.hystrix.processor; + +import java.util.concurrent.TimeoutException; + +import org.apache.camel.RoutesBuilder; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +/** + * Hystrix using timeout with Java DSL + */ +public class HystrixTimeoutTest extends CamelTestSupport { + + @Test + public void testFast() throws Exception { + // this calls the fast route and therefore we get a response + Object out = template.requestBody("direct:start", "fast"); + assertEquals("Fast response", out); + } + + @Test + public void testSlow() throws Exception { + // this calls the slow route and therefore causes a timeout which triggers an exception + try { + template.requestBody("direct:start", "slow"); + fail("Should fail due timeout"); + } catch (Exception e) { + // expected a timeout + assertIsInstanceOf(TimeoutException.class, e.getCause().getCause()); + } + } + + @Test + public void testSlowLoop() throws Exception { + // this calls the slow route and therefore causes a timeout which triggers an exception + for (int i = 0; i < 10; i++) { + try { + log.info(">>> test run " + i + " <<<"); + template.requestBody("direct:start", "slow"); + fail("Should fail due timeout"); + } catch (Exception e) { + // expected a timeout + assertIsInstanceOf(TimeoutException.class, e.getCause().getCause()); + } + } + } + + @Override + protected RoutesBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start") + .hystrix() + // use 2 second timeout + .hystrixConfiguration().executionTimeoutInMilliseconds(2000).end() + .log("Hystrix processing start: ${threadName}") + .toD("direct:${body}") + .log("Hystrix processing end: ${threadName}") + .end() + .log("After Hystrix ${body}"); + + from("direct:fast") + // this is a fast route and takes 1 second to respond + .log("Fast processing start: 
${threadName}") + .delay(1000) + .transform().constant("Fast response") + .log("Fast processing end: ${threadName}"); + + from("direct:slow") + // this is a slow route and takes 3 second to respond + .log("Slow processing start: ${threadName}") + .delay(3000) + .transform().constant("Slow response") + .log("Slow processing end: ${threadName}"); + } + }; + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/906a612d/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixTimeoutWithFallbackTest.java ---------------------------------------------------------------------- diff --git a/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixTimeoutWithFallbackTest.java b/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixTimeoutWithFallbackTest.java new file mode 100644 index 0000000..27790bb --- /dev/null +++ b/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixTimeoutWithFallbackTest.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.hystrix.processor; + +import org.apache.camel.RoutesBuilder; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +/** + * Hystrix using timeout and fallback with Java DSL + */ +public class HystrixTimeoutWithFallbackTest extends CamelTestSupport { + + @Test + public void testFast() throws Exception { + // this calls the fast route and therefore we get a response + Object out = template.requestBody("direct:start", "fast"); + assertEquals("Fast response", out); + } + + @Test + public void testSlow() throws Exception { + // this calls the slow route and therefore causes a timeout which triggers the fallback + Object out = template.requestBody("direct:start", "slow"); + assertEquals("Fallback response", out); + } + + @Override + protected RoutesBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start") + .hystrix() + // use 2 second timeout + .hystrixConfiguration().executionTimeoutInMilliseconds(2000).end() + .log("Hystrix processing start: ${threadName}") + .toD("direct:${body}") + .log("Hystrix processing end: ${threadName}") + .onFallback() + // use fallback if there was an exception or timeout + .log("Hystrix fallback start: ${threadName}") + .transform().constant("Fallback response") + .log("Hystrix fallback end: ${threadName}") + .end() + .log("After Hystrix ${body}"); + + from("direct:fast") + // this is a fast route and takes 1 second to respond + .log("Fast processing start: ${threadName}") + .delay(1000) + .transform().constant("Fast response") + .log("Fast processing end: ${threadName}"); + + from("direct:slow") + // this is a slow route and takes 3 second to respond + .log("Slow processing start: ${threadName}") + .delay(3000) + .transform().constant("Slow response") + .log("Slow processing end: ${threadName}"); + } + }; + } + +}