CAMEL-6845: Using a recipient list to send to a route that has no error handler should allow the caller route to trigger its own error handler
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/7f9512fa Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/7f9512fa Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/7f9512fa Branch: refs/heads/camel-2.12.x Commit: 7f9512faadaa9d9d218db14361d5fea77c1e0c4d Parents: ade5eb0 Author: Claus Ibsen <davscl...@apache.org> Authored: Thu Oct 10 08:04:37 2013 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Thu Oct 10 10:28:01 2013 +0200 ---------------------------------------------------------------------- .../camel/builder/NoErrorHandlerBuilder.java | 25 +++++++- .../camel/processor/MulticastProcessor.java | 35 ++++++++---- .../RecipientListNoErrorHandlerTest.java | 60 ++++++++++++++++++++ 3 files changed, 108 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/7f9512fa/camel-core/src/main/java/org/apache/camel/builder/NoErrorHandlerBuilder.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/builder/NoErrorHandlerBuilder.java b/camel-core/src/main/java/org/apache/camel/builder/NoErrorHandlerBuilder.java index d9e650a..1c3ad0e 100644 --- a/camel-core/src/main/java/org/apache/camel/builder/NoErrorHandlerBuilder.java +++ b/camel-core/src/main/java/org/apache/camel/builder/NoErrorHandlerBuilder.java @@ -16,7 +16,10 @@ */ package org.apache.camel.builder; +import org.apache.camel.AsyncCallback; +import org.apache.camel.Exchange; import org.apache.camel.Processor; +import org.apache.camel.processor.DelegateAsyncProcessor; import org.apache.camel.spi.RouteContext; /** @@ -31,7 +34,27 @@ import org.apache.camel.spi.RouteContext; public class NoErrorHandlerBuilder extends ErrorHandlerBuilderSupport { public Processor createErrorHandler(RouteContext routeContext, Processor processor) { - return processor; + 
return new DelegateAsyncProcessor(processor) { + @Override + public boolean process(final Exchange exchange, final AsyncCallback callback) { + return super.process(exchange, new AsyncCallback() { + @Override + public void done(boolean doneSync) { + exchange.removeProperty(Exchange.REDELIVERY_EXHAUSTED); + callback.done(doneSync); + } + }); + } + + @Override + public String toString() { + if (processor == null) { + // if no output then dont do any description + return ""; + } + return "NoErrorHandler[" + processor + "]"; + } + }; } public boolean supportTransacted() { http://git-wip-us.apache.org/repos/asf/camel/blob/7f9512fa/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java index a3bca76..7584adf 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java @@ -733,15 +733,15 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor * when using the asynchronous routing engine. And therefore we want the logic in one method instead * of being scattered. 
* - * @param original the original exchange - * @param subExchange the current sub exchange, can be <tt>null</tt> for the synchronous part - * @param pairs the pairs with the exchanges to process - * @param callback the callback - * @param doneSync the <tt>doneSync</tt> parameter to call on callback - * @param exhaust whether or not error handling is exhausted + * @param original the original exchange + * @param subExchange the current sub exchange, can be <tt>null</tt> for the synchronous part + * @param pairs the pairs with the exchanges to process + * @param callback the callback + * @param doneSync the <tt>doneSync</tt> parameter to call on callback + * @param forceExhaust whether or not error handling is exhausted */ protected void doDone(Exchange original, Exchange subExchange, final Iterable<ProcessorExchangePair> pairs, - AsyncCallback callback, boolean doneSync, boolean exhaust) { + AsyncCallback callback, boolean doneSync, boolean forceExhaust) { // we are done so close the pairs iterator if (pairs != null && pairs instanceof Closeable) { @@ -751,16 +751,18 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor // cleanup any per exchange aggregation strategy removeAggregationStrategyFromExchange(original); + // we need to know if there was an exception, and if the stopOnException option was enabled + // also we would need to know if any error handler has attempted redelivery and exhausted boolean stoppedOnException = false; + boolean exception = false; + boolean exhaust = forceExhaust || subExchange != null && (subExchange.getException() != null || ExchangeHelper.isRedeliveryExhausted(subExchange)); if (original.getException() != null || subExchange != null && subExchange.getException() != null) { // there was an exception and we stopped stoppedOnException = isStopOnException(); - // multicast uses error handling on its output processors and they have tried to redeliver - // so we shall signal back to the other error handlers 
that we are exhausted and they should not - // also try to redeliver as we will then do that twice - original.setProperty(Exchange.REDELIVERY_EXHAUSTED, exhaust); + exception = true; } + // must copy results at this point if (subExchange != null) { if (stoppedOnException) { // if we stopped due an exception then only propagte the exception @@ -770,6 +772,17 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor ExchangeHelper.copyResults(original, subExchange); } } + + // .. and then if there was an exception we need to configure the redelivery exhaust + // for example the noErrorHandler will not cause redelivery exhaust so if this error + // handled has been in use, then the exhaust would be false (if not forced) + if (exception) { + // multicast uses error handling on its output processors and they have tried to redeliver + // so we shall signal back to the other error handlers that we are exhausted and they should not + // also try to redeliver as we will then do that twice + original.setProperty(Exchange.REDELIVERY_EXHAUSTED, exhaust); + } + callback.done(doneSync); } http://git-wip-us.apache.org/repos/asf/camel/blob/7f9512fa/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoErrorHandlerTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoErrorHandlerTest.java b/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoErrorHandlerTest.java new file mode 100644 index 0000000..f037e52 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoErrorHandlerTest.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; + +public class RecipientListNoErrorHandlerTest extends ContextTestSupport { + + public void testRecipientListNoErrorHandler() throws Exception { + getMockEndpoint("mock:foo").expectedMessageCount(1); + getMockEndpoint("mock:result").expectedMessageCount(0); + getMockEndpoint("mock:dead").expectedMessageCount(1); + getMockEndpoint("mock:dead").message(0).property(Exchange.EXCEPTION_CAUGHT).isInstanceOf(IllegalArgumentException.class); + + template.sendBody("direct:start", "Hello World"); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + errorHandler(deadLetterChannel("mock:dead")); + + from("direct:start") + .recipientList().constant("direct:foo") + .to("mock:result"); + + from("direct:foo") + .errorHandler(noErrorHandler()) + .to("mock:foo") + .process(new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + throw new IllegalArgumentException("Forced"); + } + }); + } + }; + } +}