CAMEL-8473: Error handler - add onPrepare to allow a custom processor to prepare the exchange before it is sent to the failure processor, such as a dead letter queue.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/8c47e24b Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/8c47e24b Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/8c47e24b Branch: refs/heads/master Commit: 8c47e24bd0516b97ad286a75a0649f9be592dc49 Parents: 1faeeb2 Author: Claus Ibsen <davscl...@apache.org> Authored: Wed Mar 11 09:13:23 2015 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Wed Mar 11 10:02:06 2015 +0100 ---------------------------------------------------------------------- .../camel/builder/DeadLetterChannelBuilder.java | 2 +- .../builder/DefaultErrorHandlerBuilder.java | 25 +++++++- .../camel/processor/DeadLetterChannel.java | 6 +- .../camel/processor/DefaultErrorHandler.java | 7 ++- .../camel/processor/LoggingErrorHandler.java | 2 +- .../camel/processor/RedeliveryErrorHandler.java | 28 ++++++++- .../DeadLetterChannelOnPrepareTest.java | 57 ++++++++++++++++++ .../DefaultErrorHandlerOnPrepareTest.java | 61 ++++++++++++++++++++ .../blueprint/CamelErrorHandlerFactoryBean.java | 5 ++ .../camel/spring/ErrorHandlerDefinition.java | 2 + .../handler/ErrorHandlerDefinitionParser.java | 7 +++ .../spring/spi/TransactionErrorHandler.java | 2 +- ...pringDeadLetterChannelOnPrepareTestTest.java | 29 ++++++++++ ...ingDefaultErrorHandlerOnPrepareTestTest.java | 29 ++++++++++ .../DeadLetterChannelOnPrepareTest.xml | 43 ++++++++++++++ .../DefaultErrorHandlerOnPrepareTest.xml | 43 ++++++++++++++ .../DeadLetterChannelOnPrepareTestTest.java | 49 ++++++++++++++++ .../DefaultErrorHandlerOnPrepareTestTest.java | 53 +++++++++++++++++ .../DeadLetterChannelOnPrepareTestTest.xml | 42 ++++++++++++++ .../DefaultErrorHandlerOnPrepareTestTest.xml | 42 ++++++++++++++ 20 files changed, 525 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/camel/blob/8c47e24b/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java b/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java index e1e417d..d7a9d3b 100644 --- a/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java +++ b/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java @@ -56,7 +56,7 @@ public class DeadLetterChannelBuilder extends DefaultErrorHandlerBuilder { DeadLetterChannel answer = new DeadLetterChannel(routeContext.getCamelContext(), processor, getLogger(), getOnRedelivery(), getRedeliveryPolicy(), getExceptionPolicyStrategy(), getFailureProcessor(), getDeadLetterUri(), isDeadLetterHandleNewException(), - isUseOriginalMessage(), getRetryWhilePolicy(routeContext.getCamelContext()), getExecutorService(routeContext.getCamelContext())); + isUseOriginalMessage(), getRetryWhilePolicy(routeContext.getCamelContext()), getExecutorService(routeContext.getCamelContext()), getOnPrepareFailure()); // configure error handler before we can use it configure(routeContext, answer); return answer; http://git-wip-us.apache.org/repos/asf/camel/blob/8c47e24b/camel-core/src/main/java/org/apache/camel/builder/DefaultErrorHandlerBuilder.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/builder/DefaultErrorHandlerBuilder.java b/camel-core/src/main/java/org/apache/camel/builder/DefaultErrorHandlerBuilder.java index 091c166..c879cca 100644 --- a/camel-core/src/main/java/org/apache/camel/builder/DefaultErrorHandlerBuilder.java +++ b/camel-core/src/main/java/org/apache/camel/builder/DefaultErrorHandlerBuilder.java @@ -54,13 +54,15 @@ public class DefaultErrorHandlerBuilder extends 
ErrorHandlerBuilderSupport { protected boolean asyncDelayedRedelivery; protected String executorServiceRef; protected ScheduledExecutorService executorService; + protected Processor onPrepareFailure; public DefaultErrorHandlerBuilder() { } public Processor createErrorHandler(RouteContext routeContext, Processor processor) throws Exception { DefaultErrorHandler answer = new DefaultErrorHandler(routeContext.getCamelContext(), processor, getLogger(), getOnRedelivery(), - getRedeliveryPolicy(), getExceptionPolicyStrategy(), getRetryWhilePolicy(routeContext.getCamelContext()), getExecutorService(routeContext.getCamelContext())); + getRedeliveryPolicy(), getExceptionPolicyStrategy(), getRetryWhilePolicy(routeContext.getCamelContext()), + getExecutorService(routeContext.getCamelContext()), getOnPrepareFailure()); // configure error handler before we can use it configure(routeContext, answer); return answer; @@ -375,6 +377,19 @@ public class DefaultErrorHandlerBuilder extends ErrorHandlerBuilderSupport { return this; } + /** + * Sets a custom {@link org.apache.camel.Processor} to prepare the {@link org.apache.camel.Exchange} before + * handled by the failure processor / dead letter channel. This allows for example to enrich the message + * before sending to a dead letter queue. 
+ * + * @param processor the processor + * @return the builder + */ + public DefaultErrorHandlerBuilder onPrepareFailure(Processor processor) { + setOnPrepareFailure(processor); + return this; + } + // Properties // ------------------------------------------------------------------------- @@ -497,6 +512,14 @@ public class DefaultErrorHandlerBuilder extends ErrorHandlerBuilderSupport { this.executorServiceRef = executorServiceRef; } + public Processor getOnPrepareFailure() { + return onPrepareFailure; + } + + public void setOnPrepareFailure(Processor onPrepareFailure) { + this.onPrepareFailure = onPrepareFailure; + } + protected RedeliveryPolicy createRedeliveryPolicy() { RedeliveryPolicy policy = new RedeliveryPolicy(); policy.disableRedelivery(); http://git-wip-us.apache.org/repos/asf/camel/blob/8c47e24b/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java b/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java index b8da447..6200c09 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java +++ b/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java @@ -52,13 +52,15 @@ public class DeadLetterChannel extends RedeliveryErrorHandler { * @param useOriginalBodyPolicy should the original IN body be moved to the dead letter queue or the current exchange IN body? * @param retryWhile retry while * @param executorService the {@link java.util.concurrent.ScheduledExecutorService} to be used for redelivery thread pool. Can be <tt>null</tt>. + * @param onPrepare a custom {@link org.apache.camel.Processor} to prepare the {@link org.apache.camel.Exchange} before + * handled by the failure processor / dead letter channel. 
*/ public DeadLetterChannel(CamelContext camelContext, Processor output, CamelLogger logger, Processor redeliveryProcessor, RedeliveryPolicy redeliveryPolicy, ExceptionPolicyStrategy exceptionPolicyStrategy, Processor deadLetter, String deadLetterUri, boolean deadLetterHandleException, - boolean useOriginalBodyPolicy, Predicate retryWhile, ScheduledExecutorService executorService) { + boolean useOriginalBodyPolicy, Predicate retryWhile, ScheduledExecutorService executorService, Processor onPrepare) { super(camelContext, output, logger, redeliveryProcessor, redeliveryPolicy, deadLetter, deadLetterUri, deadLetterHandleException, - useOriginalBodyPolicy, retryWhile, executorService); + useOriginalBodyPolicy, retryWhile, executorService, onPrepare); setExceptionPolicy(exceptionPolicyStrategy); } http://git-wip-us.apache.org/repos/asf/camel/blob/8c47e24b/camel-core/src/main/java/org/apache/camel/processor/DefaultErrorHandler.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/DefaultErrorHandler.java b/camel-core/src/main/java/org/apache/camel/processor/DefaultErrorHandler.java index 8f1c931..697db20 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/DefaultErrorHandler.java +++ b/camel-core/src/main/java/org/apache/camel/processor/DefaultErrorHandler.java @@ -43,11 +43,14 @@ public class DefaultErrorHandler extends RedeliveryErrorHandler { * @param exceptionPolicyStrategy strategy for onException handling * @param retryWhile retry while * @param executorService the {@link java.util.concurrent.ScheduledExecutorService} to be used for redelivery thread pool. Can be <tt>null</tt>. + * @param onPrepare a custom {@link org.apache.camel.Processor} to prepare the {@link org.apache.camel.Exchange} before + * handled by the failure processor / dead letter channel. 
*/ public DefaultErrorHandler(CamelContext camelContext, Processor output, CamelLogger logger, Processor redeliveryProcessor, - RedeliveryPolicy redeliveryPolicy, ExceptionPolicyStrategy exceptionPolicyStrategy, Predicate retryWhile, ScheduledExecutorService executorService) { + RedeliveryPolicy redeliveryPolicy, ExceptionPolicyStrategy exceptionPolicyStrategy, Predicate retryWhile, + ScheduledExecutorService executorService, Processor onPrepare) { - super(camelContext, output, logger, redeliveryProcessor, redeliveryPolicy, null, null, true, false, retryWhile, executorService); + super(camelContext, output, logger, redeliveryProcessor, redeliveryPolicy, null, null, true, false, retryWhile, executorService, onPrepare); setExceptionPolicy(exceptionPolicyStrategy); } http://git-wip-us.apache.org/repos/asf/camel/blob/8c47e24b/camel-core/src/main/java/org/apache/camel/processor/LoggingErrorHandler.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/LoggingErrorHandler.java b/camel-core/src/main/java/org/apache/camel/processor/LoggingErrorHandler.java index daca393..1f66275 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/LoggingErrorHandler.java +++ b/camel-core/src/main/java/org/apache/camel/processor/LoggingErrorHandler.java @@ -39,7 +39,7 @@ public class LoggingErrorHandler extends DefaultErrorHandler { */ public LoggingErrorHandler(CamelContext camelContext, Processor output, CamelLogger logger, RedeliveryPolicy redeliveryPolicy, ExceptionPolicyStrategy exceptionPolicyStrategy) { - super(camelContext, output, logger, null, redeliveryPolicy, exceptionPolicyStrategy, null, null); + super(camelContext, output, logger, null, redeliveryPolicy, exceptionPolicyStrategy, null, null, null); } @Override http://git-wip-us.apache.org/repos/asf/camel/blob/8c47e24b/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java 
---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java b/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java index da63f3c..e1de1d2 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java +++ b/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java @@ -74,6 +74,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme protected boolean redeliveryEnabled; protected volatile boolean preparingShutdown; protected final ExchangeFormatter exchangeFormatter; + protected final Processor onPrepare; /** * Contains the current redelivery data @@ -91,6 +92,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme Processor deadLetterProcessor; Processor failureProcessor; Processor onRedeliveryProcessor; + Processor onPrepareProcessor; Predicate handledPredicate; Predicate continuedPredicate; boolean useOriginalInMessage; @@ -102,6 +104,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme this.currentRedeliveryPolicy = redeliveryPolicy; this.deadLetterProcessor = deadLetter; this.onRedeliveryProcessor = redeliveryProcessor; + this.onPrepareProcessor = onPrepare; this.handledPredicate = getDefaultHandledPredicate(); this.useOriginalInMessage = useOriginalMessagePolicy; this.handleNewException = deadLetterHandleNewException; @@ -196,7 +199,8 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme public RedeliveryErrorHandler(CamelContext camelContext, Processor output, CamelLogger logger, Processor redeliveryProcessor, RedeliveryPolicy redeliveryPolicy, Processor deadLetter, - String deadLetterUri, boolean deadLetterHandleNewException, boolean useOriginalMessagePolicy, Predicate retryWhile, ScheduledExecutorService executorService) { + String deadLetterUri, boolean 
deadLetterHandleNewException, boolean useOriginalMessagePolicy, + Predicate retryWhile, ScheduledExecutorService executorService, Processor onPrepare) { ObjectHelper.notNull(camelContext, "CamelContext", this); ObjectHelper.notNull(redeliveryPolicy, "RedeliveryPolicy", this); @@ -213,6 +217,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme this.useOriginalMessagePolicy = useOriginalMessagePolicy; this.retryWhilePolicy = retryWhile; this.executorService = executorService; + this.onPrepare = onPrepare; if (ObjectHelper.isNotEmpty(redeliveryPolicy.getExchangeFormatterRef())) { ExchangeFormatter formatter = camelContext.getRegistry().lookupByNameAndType(redeliveryPolicy.getExchangeFormatterRef(), ExchangeFormatter.class); @@ -881,6 +886,17 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme // reset cached streams so they can be read again MessageHelper.resetStreamCache(exchange.getIn()); + // invoke custom on prepare + if (onPrepare != null) { + try { + log.trace("OnPrepare processor {} is processing Exchange: {}", onPrepare, exchange); + onPrepare.process(exchange); + } catch (Exception e) { + // a new exception was thrown during prepare + exchange.setException(e); + } + } + log.trace("Failure processor {} is processing Exchange: {}", processor, exchange); // store the last to endpoint as the failure endpoint @@ -910,6 +926,16 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme }); } else { try { + // invoke custom on prepare + if (onPrepare != null) { + try { + log.trace("OnPrepare processor {} is processing Exchange: {}", onPrepare, exchange); + onPrepare.process(exchange); + } catch (Exception e) { + // a new exception was thrown during prepare + exchange.setException(e); + } + } // no processor but we need to prepare after failure as well prepareExchangeAfterFailure(exchange, data, isDeadLetterChannel, shouldHandle, shouldContinue); } finally { 
http://git-wip-us.apache.org/repos/asf/camel/blob/8c47e24b/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelOnPrepareTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelOnPrepareTest.java b/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelOnPrepareTest.java new file mode 100644 index 0000000..948bab0 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelOnPrepareTest.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.processor; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; + +public class DeadLetterChannelOnPrepareTest extends ContextTestSupport { + + public void testDeadLetterChannelOnPrepare() throws Exception { + getMockEndpoint("mock:dead").expectedBodiesReceived("Hello World"); + getMockEndpoint("mock:dead").expectedHeaderReceived("FailedBecause", "Forced"); + + template.sendBody("direct:start", "Hello World"); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + errorHandler(deadLetterChannel("mock:dead").onPrepareFailure(new MyPrepareProcessor())); + + from("direct:start") + .log("Incoming ${body}") + .throwException(new IllegalArgumentException("Forced")); + } + }; + } + + public static class MyPrepareProcessor implements Processor { + + @Override + public void process(Exchange exchange) throws Exception { + Exception cause = exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class); + exchange.getIn().setHeader("FailedBecause", cause.getMessage()); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/8c47e24b/camel-core/src/test/java/org/apache/camel/processor/DefaultErrorHandlerOnPrepareTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/DefaultErrorHandlerOnPrepareTest.java b/camel-core/src/test/java/org/apache/camel/processor/DefaultErrorHandlerOnPrepareTest.java new file mode 100644 index 0000000..925fd80 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/DefaultErrorHandlerOnPrepareTest.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license 
agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; + +public class DefaultErrorHandlerOnPrepareTest extends ContextTestSupport { + + public void testDefaultErrorHandlerOnPrepare() throws Exception { + Exchange out = template.request("direct:start", new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + exchange.getIn().setBody("Hello World"); + } + }); + assertNotNull(out); + assertTrue("Should be failed", out.isFailed()); + assertIsInstanceOf(IllegalArgumentException.class, out.getException()); + assertEquals("Forced", out.getIn().getHeader("FailedBecause")); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + errorHandler(defaultErrorHandler().onPrepareFailure(new MyPrepareProcessor())); + + from("direct:start") + .log("Incoming ${body}") + .throwException(new IllegalArgumentException("Forced")); + } + }; + } + + public static class MyPrepareProcessor implements Processor { + + @Override + public void process(Exchange exchange) throws Exception { + Exception 
cause = exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class); + exchange.getIn().setHeader("FailedBecause", cause.getMessage()); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/8c47e24b/components/camel-blueprint/src/main/java/org/apache/camel/blueprint/CamelErrorHandlerFactoryBean.java ---------------------------------------------------------------------- diff --git a/components/camel-blueprint/src/main/java/org/apache/camel/blueprint/CamelErrorHandlerFactoryBean.java b/components/camel-blueprint/src/main/java/org/apache/camel/blueprint/CamelErrorHandlerFactoryBean.java index f81867b..98244a1 100644 --- a/components/camel-blueprint/src/main/java/org/apache/camel/blueprint/CamelErrorHandlerFactoryBean.java +++ b/components/camel-blueprint/src/main/java/org/apache/camel/blueprint/CamelErrorHandlerFactoryBean.java @@ -54,6 +54,8 @@ public class CamelErrorHandlerFactoryBean extends AbstractCamelFactoryBean<Error @XmlAttribute private String onRedeliveryRef; @XmlAttribute + private String onPrepareFailureRef; + @XmlAttribute private String retryWhileRef; @XmlAttribute private String executorServiceRef; @@ -87,6 +89,9 @@ public class CamelErrorHandlerFactoryBean extends AbstractCamelFactoryBean<Error if (onRedeliveryRef != null) { handler.setOnRedelivery(lookup(onRedeliveryRef, Processor.class)); } + if (onPrepareFailureRef != null) { + handler.setOnPrepareFailure(lookup(onPrepareFailureRef, Processor.class)); + } if (retryWhileRef != null) { handler.setRetryWhileRef(retryWhileRef); } http://git-wip-us.apache.org/repos/asf/camel/blob/8c47e24b/components/camel-spring/src/main/java/org/apache/camel/spring/ErrorHandlerDefinition.java ---------------------------------------------------------------------- diff --git a/components/camel-spring/src/main/java/org/apache/camel/spring/ErrorHandlerDefinition.java b/components/camel-spring/src/main/java/org/apache/camel/spring/ErrorHandlerDefinition.java index 90281fe..43df20b 
100644 --- a/components/camel-spring/src/main/java/org/apache/camel/spring/ErrorHandlerDefinition.java +++ b/components/camel-spring/src/main/java/org/apache/camel/spring/ErrorHandlerDefinition.java @@ -54,6 +54,8 @@ public class ErrorHandlerDefinition extends IdentifiedType { @XmlAttribute private String onRedeliveryRef; @XmlAttribute + private String onPrepareFailureRef; + @XmlAttribute private String retryWhileRef; @XmlAttribute private String redeliveryPolicyRef; http://git-wip-us.apache.org/repos/asf/camel/blob/8c47e24b/components/camel-spring/src/main/java/org/apache/camel/spring/handler/ErrorHandlerDefinitionParser.java ---------------------------------------------------------------------- diff --git a/components/camel-spring/src/main/java/org/apache/camel/spring/handler/ErrorHandlerDefinitionParser.java b/components/camel-spring/src/main/java/org/apache/camel/spring/handler/ErrorHandlerDefinitionParser.java index c9f2228..5ca1974 100644 --- a/components/camel-spring/src/main/java/org/apache/camel/spring/handler/ErrorHandlerDefinitionParser.java +++ b/components/camel-spring/src/main/java/org/apache/camel/spring/handler/ErrorHandlerDefinitionParser.java @@ -61,6 +61,7 @@ public class ErrorHandlerDefinitionParser extends BeanDefinitionParser { return !attributeName.equals("type") && !attributeName.equals("onRedeliveryRef") && !attributeName.equals("onRetryWhileRef") + && !attributeName.equals("onPrepareFailureRef") && !attributeName.equals("redeliveryPolicyRef") && !attributeName.equals("transactionTemplateRef") && !attributeName.equals("transactionManagerRef"); @@ -100,6 +101,7 @@ public class ErrorHandlerDefinitionParser extends BeanDefinitionParser { } parserRefAttribute(element, "onRedeliveryRef", "onRedelivery", builder); parserRefAttribute(element, "onRetryWhileRef", "onRetryWhile", builder); + parserRefAttribute(element, "onPrepareFailureRef", "onPrepareFailure", builder); parserRefAttribute(element, "redeliveryPolicyRef", "redeliveryPolicy", builder); 
if (type.equals(ErrorHandlerType.TransactionErrorHandler)) { parserRefAttribute(element, "transactionTemplateRef", "transactionTemplate", builder); @@ -144,6 +146,11 @@ public class ErrorHandlerDefinitionParser extends BeanDefinitionParser { throw new IllegalArgumentException("Attribute onRedeliveryRef is not supported by error handler type: " + type.name() + ", in error handler with id: " + id); } + String onPrepareFailureRef = element.getAttribute("onPrepareFailureRef"); + if (ObjectHelper.isNotEmpty(onPrepareFailureRef) && (type.equals(ErrorHandlerType.TransactionErrorHandler) || type.equals(ErrorHandlerType.LoggingErrorHandler) || type.equals(ErrorHandlerType.NoErrorHandler))) { + throw new IllegalArgumentException("Attribute onPrepareFailureRef is not supported by error handler type: " + + type.name() + ", in error handler with id: " + id); + } String retryWhileRef = element.getAttribute("retryWhileRef"); if (ObjectHelper.isNotEmpty(retryWhileRef) && (type.equals(ErrorHandlerType.LoggingErrorHandler) || type.equals(ErrorHandlerType.NoErrorHandler))) { throw new IllegalArgumentException("Attribute retryWhileRef is not supported by error handler type: " http://git-wip-us.apache.org/repos/asf/camel/blob/8c47e24b/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java ---------------------------------------------------------------------- diff --git a/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java b/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java index 27a7afe..c840d8a 100644 --- a/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java +++ b/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java @@ -67,7 +67,7 @@ public class TransactionErrorHandler extends RedeliveryErrorHandler { TransactionTemplate transactionTemplate, Predicate retryWhile, 
ScheduledExecutorService executorService, LoggingLevel rollbackLoggingLevel) { - super(camelContext, output, logger, redeliveryProcessor, redeliveryPolicy, null, null, false, false, retryWhile, executorService); + super(camelContext, output, logger, redeliveryProcessor, redeliveryPolicy, null, null, false, false, retryWhile, executorService, null); setExceptionPolicy(exceptionPolicyStrategy); this.transactionTemplate = transactionTemplate; this.rollbackLoggingLevel = rollbackLoggingLevel; http://git-wip-us.apache.org/repos/asf/camel/blob/8c47e24b/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringDeadLetterChannelOnPrepareTestTest.java ---------------------------------------------------------------------- diff --git a/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringDeadLetterChannelOnPrepareTestTest.java b/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringDeadLetterChannelOnPrepareTestTest.java new file mode 100644 index 0000000..9a49749 --- /dev/null +++ b/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringDeadLetterChannelOnPrepareTestTest.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.spring.processor; + +import org.apache.camel.CamelContext; +import org.apache.camel.processor.DeadLetterChannelOnPrepareTest; + +import static org.apache.camel.spring.processor.SpringTestHelper.createSpringCamelContext; + +public class SpringDeadLetterChannelOnPrepareTestTest extends DeadLetterChannelOnPrepareTest { + + protected CamelContext createCamelContext() throws Exception { + return createSpringCamelContext(this, "org/apache/camel/spring/processor/DeadLetterChannelOnPrepareTest.xml"); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/8c47e24b/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringDefaultErrorHandlerOnPrepareTestTest.java ---------------------------------------------------------------------- diff --git a/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringDefaultErrorHandlerOnPrepareTestTest.java b/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringDefaultErrorHandlerOnPrepareTestTest.java new file mode 100644 index 0000000..d03e2b8 --- /dev/null +++ b/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringDefaultErrorHandlerOnPrepareTestTest.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.spring.processor; + +import org.apache.camel.CamelContext; +import org.apache.camel.processor.DefaultErrorHandlerOnPrepareTest; + +import static org.apache.camel.spring.processor.SpringTestHelper.createSpringCamelContext; + +public class SpringDefaultErrorHandlerOnPrepareTestTest extends DefaultErrorHandlerOnPrepareTest { + + protected CamelContext createCamelContext() throws Exception { + return createSpringCamelContext(this, "org/apache/camel/spring/processor/DefaultErrorHandlerOnPrepareTest.xml"); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/8c47e24b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/DeadLetterChannelOnPrepareTest.xml ---------------------------------------------------------------------- diff --git a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/DeadLetterChannelOnPrepareTest.xml b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/DeadLetterChannelOnPrepareTest.xml new file mode 100644 index 0000000..4c1e234 --- /dev/null +++ b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/DeadLetterChannelOnPrepareTest.xml @@ -0,0 +1,43 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<beans xmlns="http://www.springframework.org/schema/beans" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation=" + http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd + http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd + "> + + <!-- exception instance (message "Forced") that the route below throws via throwException ref="forced" --> <bean id="forced" class="java.lang.IllegalArgumentException"> + <constructor-arg index="0" value="Forced"/> + </bean> + + <!-- processor bean that the error handler below references through onPrepareFailureRef --> <bean id="myPrepare" + class="org.apache.camel.processor.DeadLetterChannelOnPrepareTest.MyPrepareProcessor"/> + + <camelContext xmlns="http://camel.apache.org/schema/spring" errorHandlerRef="dlc"> + + <!-- dead letter channel with dead letter endpoint mock:dead; per the commit description the onPrepareFailureRef processor prepares the exchange before it is sent to the failure processor --> <errorHandler id="dlc" type="DeadLetterChannel" deadLetterUri="mock:dead" onPrepareFailureRef="myPrepare"/> + + <route> + <from uri="direct:start"/> + <log message="Incoming ${body}"/> + <throwException ref="forced"/> + </route> + </camelContext> + +</beans> http://git-wip-us.apache.org/repos/asf/camel/blob/8c47e24b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/DefaultErrorHandlerOnPrepareTest.xml ---------------------------------------------------------------------- diff --git a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/DefaultErrorHandlerOnPrepareTest.xml b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/DefaultErrorHandlerOnPrepareTest.xml new file mode 100644 index 0000000..2b9b814 --- /dev/null +++ 
b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/DefaultErrorHandlerOnPrepareTest.xml @@ -0,0 +1,43 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<beans xmlns="http://www.springframework.org/schema/beans" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation=" + http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd + http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd + "> + + <!-- exception instance (message "Forced") that the route below throws via throwException ref="forced" --> <bean id="forced" class="java.lang.IllegalArgumentException"> + <constructor-arg index="0" value="Forced"/> + </bean> + + <!-- processor bean that the error handler below references through onPrepareFailureRef --> <bean id="myPrepare" + class="org.apache.camel.processor.DefaultErrorHandlerOnPrepareTest.MyPrepareProcessor"/> + + <camelContext xmlns="http://camel.apache.org/schema/spring" errorHandlerRef="defaultEH"> + + <!-- default error handler (no dead letter endpoint configured here) with the onPrepareFailureRef processor attached --> <errorHandler id="defaultEH" type="DefaultErrorHandler" onPrepareFailureRef="myPrepare"/> + + <route> + <from uri="direct:start"/> + <log message="Incoming ${body}"/> + <throwException ref="forced"/> + </route> + </camelContext> + +</beans> 
http://git-wip-us.apache.org/repos/asf/camel/blob/8c47e24b/components/camel-test-blueprint/src/test/java/org/apache/camel/test/blueprint/DeadLetterChannelOnPrepareTestTest.java ---------------------------------------------------------------------- diff --git a/components/camel-test-blueprint/src/test/java/org/apache/camel/test/blueprint/DeadLetterChannelOnPrepareTestTest.java b/components/camel-test-blueprint/src/test/java/org/apache/camel/test/blueprint/DeadLetterChannelOnPrepareTestTest.java new file mode 100644 index 0000000..669f12b --- /dev/null +++ b/components/camel-test-blueprint/src/test/java/org/apache/camel/test/blueprint/DeadLetterChannelOnPrepareTestTest.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.test.blueprint; + +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.junit.Test; + +/** Blueprint-based test of the dead letter channel onPrepare hook; the Camel context comes from the descriptor returned by getBlueprintDescriptor(). */ public class DeadLetterChannelOnPrepareTestTest extends CamelBlueprintTestSupport { + + @Override + protected String getBlueprintDescriptor() { + return "org/apache/camel/test/blueprint/DeadLetterChannelOnPrepareTestTest.xml"; + } + + /** Sends "Hello World" to direct:start and expects mock:dead to receive that body together with header FailedBecause set to "Forced". */ @Test + public void testDeadLetterChannelOnPrepare() throws Exception { + getMockEndpoint("mock:dead").expectedBodiesReceived("Hello World"); + getMockEndpoint("mock:dead").expectedHeaderReceived("FailedBecause", "Forced"); + + template.sendBody("direct:start", "Hello World"); + + assertMockEndpointsSatisfied(); + } + + /** Copies the message of the exception stored under Exchange.EXCEPTION_CAUGHT into the FailedBecause header of the in message; assumes that property is set when this runs. */ public static class MyPrepareProcessor implements Processor { + + @Override + public void process(Exchange exchange) throws Exception { + Exception cause = exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class); + exchange.getIn().setHeader("FailedBecause", cause.getMessage()); + } + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/8c47e24b/components/camel-test-blueprint/src/test/java/org/apache/camel/test/blueprint/DefaultErrorHandlerOnPrepareTestTest.java ---------------------------------------------------------------------- diff --git a/components/camel-test-blueprint/src/test/java/org/apache/camel/test/blueprint/DefaultErrorHandlerOnPrepareTestTest.java b/components/camel-test-blueprint/src/test/java/org/apache/camel/test/blueprint/DefaultErrorHandlerOnPrepareTestTest.java new file mode 100644 index 0000000..6caf7d6 --- /dev/null +++ b/components/camel-test-blueprint/src/test/java/org/apache/camel/test/blueprint/DefaultErrorHandlerOnPrepareTestTest.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.test.blueprint; + +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.junit.Test; + +/** Blueprint-based test of the default error handler onPrepare hook; the Camel context comes from the descriptor returned by getBlueprintDescriptor(). */ public class DefaultErrorHandlerOnPrepareTestTest extends CamelBlueprintTestSupport { + + @Override + protected String getBlueprintDescriptor() { + return "org/apache/camel/test/blueprint/DefaultErrorHandlerOnPrepareTestTest.xml"; + } + + /** Requests direct:start with body "Hello World" and asserts the returned exchange is failed with an IllegalArgumentException and carries header FailedBecause equal to "Forced". */ @Test + public void testDefaultErrorHandlerOnPrepare() throws Exception { + Exchange out = template.request("direct:start", new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + exchange.getIn().setBody("Hello World"); + } + }); + assertNotNull(out); + assertTrue("Should be failed", out.isFailed()); + assertIsInstanceOf(IllegalArgumentException.class, out.getException()); + assertEquals("Forced", out.getIn().getHeader("FailedBecause")); + } + + /** Copies the message of the exception stored under Exchange.EXCEPTION_CAUGHT into the FailedBecause header of the in message; assumes that property is set when this runs. */ public static class MyPrepareProcessor implements Processor { + + @Override + public void process(Exchange exchange) throws Exception { + Exception cause = exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class); + exchange.getIn().setHeader("FailedBecause", cause.getMessage()); + } + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/8c47e24b/components/camel-test-blueprint/src/test/resources/org/apache/camel/test/blueprint/DeadLetterChannelOnPrepareTestTest.xml 
---------------------------------------------------------------------- diff --git a/components/camel-test-blueprint/src/test/resources/org/apache/camel/test/blueprint/DeadLetterChannelOnPrepareTestTest.xml b/components/camel-test-blueprint/src/test/resources/org/apache/camel/test/blueprint/DeadLetterChannelOnPrepareTestTest.xml new file mode 100644 index 0000000..c404b8d --- /dev/null +++ b/components/camel-test-blueprint/src/test/resources/org/apache/camel/test/blueprint/DeadLetterChannelOnPrepareTestTest.xml @@ -0,0 +1,42 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation=" + http://www.osgi.org/xmlns/blueprint/v1.0.0 http://www.osgi.org/xmlns/blueprint/v1.0.0/blueprint.xsd"> + + <!-- exception instance (message "Forced") that the route below throws via throwException ref="forced" --> <bean id="forced" class="java.lang.IllegalArgumentException"> + <argument index="0" value="Forced"/> + </bean> + + <!-- processor bean that the error handler below references through onPrepareFailureRef --> <bean id="myPrepare" + class="org.apache.camel.test.blueprint.DeadLetterChannelOnPrepareTestTest.MyPrepareProcessor"/> + + <camelContext xmlns="http://camel.apache.org/schema/blueprint" errorHandlerRef="dlc"> + + <!-- dead letter channel with dead letter endpoint mock:dead; per the commit description the onPrepareFailureRef processor prepares the exchange before it is sent to the failure processor --> <errorHandler id="dlc" type="DeadLetterChannel" deadLetterUri="mock:dead" onPrepareFailureRef="myPrepare"/> + + <route> + <from uri="direct:start"/> + <log message="Incoming ${body}"/> + <throwException ref="forced"/> + </route> + </camelContext> + +</blueprint> + http://git-wip-us.apache.org/repos/asf/camel/blob/8c47e24b/components/camel-test-blueprint/src/test/resources/org/apache/camel/test/blueprint/DefaultErrorHandlerOnPrepareTestTest.xml ---------------------------------------------------------------------- diff --git a/components/camel-test-blueprint/src/test/resources/org/apache/camel/test/blueprint/DefaultErrorHandlerOnPrepareTestTest.xml b/components/camel-test-blueprint/src/test/resources/org/apache/camel/test/blueprint/DefaultErrorHandlerOnPrepareTestTest.xml new file mode 100644 index 0000000..637abb9 --- /dev/null +++ b/components/camel-test-blueprint/src/test/resources/org/apache/camel/test/blueprint/DefaultErrorHandlerOnPrepareTestTest.xml @@ -0,0 +1,42 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation=" + http://www.osgi.org/xmlns/blueprint/v1.0.0 http://www.osgi.org/xmlns/blueprint/v1.0.0/blueprint.xsd"> + + <!-- exception instance (message "Forced") that the route below throws via throwException ref="forced" --> <bean id="forced" class="java.lang.IllegalArgumentException"> + <argument index="0" value="Forced"/> + </bean> + + <!-- processor bean that the error handler below references through onPrepareFailureRef --> <bean id="myPrepare" + class="org.apache.camel.test.blueprint.DefaultErrorHandlerOnPrepareTestTest.MyPrepareProcessor"/> + + <camelContext xmlns="http://camel.apache.org/schema/blueprint" errorHandlerRef="defaultEH"> + + <!-- default error handler (no dead letter endpoint configured here) with the onPrepareFailureRef processor attached --> <errorHandler id="defaultEH" type="DefaultErrorHandler" onPrepareFailureRef="myPrepare"/> + + <route> + <from uri="direct:start"/> + <log message="Incoming ${body}"/> + <throwException ref="forced"/> + </route> + </camelContext> + +</blueprint> +