CAMEL-7275: Fixed doTry .. doCatch not working as expected when the doTry block uses recipient list/multicast etc., so that the doCatch now handles the errors.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3cbc7fc6 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3cbc7fc6 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3cbc7fc6 Branch: refs/heads/camel-2.12.x Commit: 3cbc7fc693cf40823f9180c8e8d19a3d7999d81d Parents: 2da8fa9 Author: Claus Ibsen <davscl...@apache.org> Authored: Thu Mar 6 15:00:26 2014 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Thu Mar 6 16:26:38 2014 +0100 ---------------------------------------------------------------------- .../main/java/org/apache/camel/Exchange.java | 1 + .../camel/processor/MulticastProcessor.java | 5 +- .../apache/camel/processor/TryProcessor.java | 4 + .../processor/TryCatchRecipientListTest.java | 254 +++++++++++++++++++ 4 files changed, 263 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/3cbc7fc6/camel-core/src/main/java/org/apache/camel/Exchange.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/Exchange.java b/camel-core/src/main/java/org/apache/camel/Exchange.java index 9f7a22d..6b909da 100644 --- a/camel-core/src/main/java/org/apache/camel/Exchange.java +++ b/camel-core/src/main/java/org/apache/camel/Exchange.java @@ -193,6 +193,7 @@ public interface Exchange { String TRACE_EVENT_NODE_ID = "CamelTraceEventNodeId"; String TRACE_EVENT_TIMESTAMP = "CamelTraceEventTimestamp"; String TRACE_EVENT_EXCHANGE = "CamelTraceEventExchange"; + String TRY_ROUTE_BLOCK = "TryRouteBlock"; String TRANSFER_ENCODING = "Transfer-Encoding"; String UNIT_OF_WORK_EXHAUSTED = "CamelUnitOfWorkExhausted"; http://git-wip-us.apache.org/repos/asf/camel/blob/3cbc7fc6/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java ---------------------------------------------------------------------- diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java index 5e8d5c0..439f56c 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java @@ -886,7 +886,10 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor protected Processor createErrorHandler(RouteContext routeContext, Exchange exchange, Processor processor) { Processor answer; - if (routeContext != null) { + boolean tryBlock = exchange.getProperty(Exchange.TRY_ROUTE_BLOCK, false, boolean.class); + + // do not wrap in error handler if we are inside a try block + if (!tryBlock && routeContext != null) { // wrap the producer in error handler so we have fine grained error handling on // the output side instead of the input side // this is needed to support redelivery on that output alone and not doing redelivery http://git-wip-us.apache.org/repos/asf/camel/blob/3cbc7fc6/camel-core/src/main/java/org/apache/camel/processor/TryProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/TryProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/TryProcessor.java index 1bfe5dd..b53a14e 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/TryProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/TryProcessor.java @@ -73,6 +73,7 @@ public class TryProcessor extends ServiceSupport implements AsyncProcessor, Navi exchange.setProperty(Exchange.EXCEPTION_HANDLED, null); while (continueRouting(processors, exchange)) { + exchange.setProperty(Exchange.TRY_ROUTE_BLOCK, true); ExchangeHelper.prepareOutToIn(exchange); // process the next processor @@ -92,6 +93,7 @@ public class TryProcessor extends ServiceSupport implements AsyncProcessor, Navi } 
ExchangeHelper.prepareOutToIn(exchange); + exchange.removeProperty(Exchange.TRY_ROUTE_BLOCK); exchange.setProperty(Exchange.EXCEPTION_HANDLED, lastHandled); LOG.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange); callback.done(true); @@ -115,6 +117,7 @@ public class TryProcessor extends ServiceSupport implements AsyncProcessor, Navi // continue processing the try .. catch .. finally asynchronously while (continueRouting(processors, exchange)) { + exchange.setProperty(Exchange.TRY_ROUTE_BLOCK, true); ExchangeHelper.prepareOutToIn(exchange); // process the next processor @@ -130,6 +133,7 @@ public class TryProcessor extends ServiceSupport implements AsyncProcessor, Navi } ExchangeHelper.prepareOutToIn(exchange); + exchange.removeProperty(Exchange.TRY_ROUTE_BLOCK); exchange.setProperty(Exchange.EXCEPTION_HANDLED, lastHandled); LOG.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange); callback.done(false); http://git-wip-us.apache.org/repos/asf/camel/blob/3cbc7fc6/camel-core/src/test/java/org/apache/camel/processor/TryCatchRecipientListTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/TryCatchRecipientListTest.java b/camel-core/src/test/java/org/apache/camel/processor/TryCatchRecipientListTest.java new file mode 100644 index 0000000..9bb927c --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/TryCatchRecipientListTest.java @@ -0,0 +1,254 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; + +public class TryCatchRecipientListTest extends ContextTestSupport { + + @Override + public boolean isUseRouteBuilder() { + return false; + } + + public void testTryCatchTo() throws Exception { + context.addRoutes(createTryCatchToRouteBuilder()); + context.start(); + + getMockEndpoint("mock:foo").expectedBodiesReceived("Hello World"); + getMockEndpoint("mock:result").expectedBodiesReceived("doCatch"); + getMockEndpoint("mock:dead").expectedMessageCount(0); + getMockEndpoint("mock:catch").expectedBodiesReceived("Hello World"); + getMockEndpoint("mock:catch").message(0).property(Exchange.EXCEPTION_CAUGHT).isInstanceOf(IllegalArgumentException.class); + + template.sendBody("direct:start", "Hello World"); + + assertMockEndpointsSatisfied(); + } + + public void testTryCatchRecipientList() throws Exception { + context.addRoutes(createTryCatchRecipientListRouteBuilder()); + context.start(); + + getMockEndpoint("mock:foo").expectedBodiesReceived("Hello World"); + getMockEndpoint("mock:result").expectedBodiesReceived("doCatch"); + getMockEndpoint("mock:dead").expectedMessageCount(0); + getMockEndpoint("mock:catch").expectedBodiesReceived("Hello World"); + getMockEndpoint("mock:catch").message(0).property(Exchange.EXCEPTION_CAUGHT).isInstanceOf(IllegalArgumentException.class); + + template.sendBody("direct:start", "Hello World"); + + 
assertMockEndpointsSatisfied(); + } + + public void testDualTryCatchRecipientList() throws Exception { + context.addRoutes(createDualTryCatchRecipientListRouteBuilder()); + context.start(); + + getMockEndpoint("mock:foo").expectedBodiesReceived("Hello World"); + getMockEndpoint("mock:bar").expectedBodiesReceived("doCatch"); + getMockEndpoint("mock:result").expectedBodiesReceived("doCatch"); + getMockEndpoint("mock:result2").expectedBodiesReceived("doCatch2"); + getMockEndpoint("mock:dead").expectedMessageCount(0); + getMockEndpoint("mock:catch").expectedBodiesReceived("Hello World"); + getMockEndpoint("mock:catch").message(0).property(Exchange.EXCEPTION_CAUGHT).isInstanceOf(IllegalArgumentException.class); + getMockEndpoint("mock:catch2").expectedBodiesReceived("doCatch"); + getMockEndpoint("mock:catch2").message(0).property(Exchange.EXCEPTION_CAUGHT).isInstanceOf(IllegalArgumentException.class); + + template.sendBody("direct:start", "Hello World"); + + assertMockEndpointsSatisfied(); + } + + public void testTo() throws Exception { + context.addRoutes(createToRouteBuilder()); + context.start(); + + getMockEndpoint("mock:foo").expectedBodiesReceived("Hello World"); + getMockEndpoint("mock:result").expectedMessageCount(0); + getMockEndpoint("mock:dead").expectedMessageCount(1); + getMockEndpoint("mock:dead").message(0).property(Exchange.EXCEPTION_CAUGHT).isInstanceOf(IllegalArgumentException.class); + + template.sendBody("direct:start", "Hello World"); + + assertMockEndpointsSatisfied(); + } + + public void testRecipientList() throws Exception { + context.addRoutes(createRecipientListRouteBuilder()); + context.start(); + + getMockEndpoint("mock:foo").expectedBodiesReceived("Hello World"); + getMockEndpoint("mock:result").expectedMessageCount(0); + getMockEndpoint("mock:dead").expectedMessageCount(1); + getMockEndpoint("mock:dead").message(0).property(Exchange.EXCEPTION_CAUGHT).isInstanceOf(IllegalArgumentException.class); + + template.sendBody("direct:start", "Hello 
World"); + + assertMockEndpointsSatisfied(); + } + + protected RouteBuilder createTryCatchToRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + errorHandler(deadLetterChannel("mock:dead")); + + from("direct:start") + .doTry() + .to("direct:foo") + .doCatch(Exception.class) + .to("mock:catch") + .transform().constant("doCatch") + .end() + .to("mock:result"); + + from("direct:foo") + .errorHandler(noErrorHandler()) + .to("mock:foo") + .process(new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + throw new IllegalArgumentException("Forced"); + } + }); + } + }; + } + + protected RouteBuilder createTryCatchRecipientListRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + errorHandler(deadLetterChannel("mock:dead")); + + from("direct:start") + .doTry() + .recipientList(constant("direct:foo")).end() + .doCatch(Exception.class) + .to("mock:catch") + .transform().constant("doCatch") + .end() + .to("mock:result"); + + from("direct:foo") + .errorHandler(noErrorHandler()) + .to("mock:foo") + .process(new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + throw new IllegalArgumentException("Forced"); + } + }); + } + }; + } + + protected RouteBuilder createDualTryCatchRecipientListRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + errorHandler(deadLetterChannel("mock:dead")); + + from("direct:start") + .doTry() + .recipientList(constant("direct:foo")).end() + .doCatch(Exception.class) + .to("mock:catch") + .transform().constant("doCatch") + .end() + .to("mock:result") + .doTry() + .recipientList(constant("direct:bar")).end() + .doCatch(Exception.class) + .to("mock:catch2") + .transform().constant("doCatch2") + .end() + .to("mock:result2"); + + from("direct:foo") + .errorHandler(noErrorHandler()) 
+ .to("mock:foo") + .process(new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + throw new IllegalArgumentException("Forced"); + } + }); + + from("direct:bar") + .errorHandler(noErrorHandler()) + .to("mock:bar") + .process(new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + throw new IllegalArgumentException("Forced Again"); + } + }); + } + }; + } + + protected RouteBuilder createToRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + errorHandler(deadLetterChannel("mock:dead")); + + from("direct:start") + .to("direct:foo") + .to("mock:result"); + + from("direct:foo") + .errorHandler(noErrorHandler()) + .to("mock:foo") + .process(new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + throw new IllegalArgumentException("Forced"); + } + }); + } + }; + } + + protected RouteBuilder createRecipientListRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + errorHandler(deadLetterChannel("mock:dead")); + + from("direct:start") + .recipientList(constant("direct:foo")).end() + .to("mock:result"); + + from("direct:foo") + .errorHandler(noErrorHandler()) + .to("mock:foo") + .process(new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + throw new IllegalArgumentException("Forced"); + } + }); + } + }; + } +}