Repository: camel Updated Branches: refs/heads/master eeb09c827 -> e01fdb387
CAMEL-8587: Added unit test and made more docs how to do this. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e01fdb38 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e01fdb38 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e01fdb38 Branch: refs/heads/master Commit: e01fdb387e8658c84214ea242599f963f51cbc4f Parents: eeb09c8 Author: Claus Ibsen <davscl...@apache.org> Authored: Wed Sep 16 15:27:33 2015 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Wed Sep 16 15:27:33 2015 +0200 ---------------------------------------------------------------------- .../apache/camel/model/MulticastDefinition.java | 6 +- .../camel/processor/MulticastProcessor.java | 16 +++-- ...stAggregationStrategyThrowExceptionTest.java | 69 ++++++++++++++++++++ 3 files changed, 85 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/e01fdb38/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java b/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java index 2e7e76c..ea8cb9d 100644 --- a/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java @@ -105,7 +105,9 @@ public class MulticastDefinition extends OutputDefinition<MulticastDefinition> i /** * Sets the AggregationStrategy to be used to assemble the replies from the multicasts, into a single outgoing message from the Multicast. - * By default Camel will use the last reply as the outgoing message. You can also use a POJO as the AggregationStrategy + * By default Camel will use the last reply as the outgoing message. You can also use a POJO as the AggregationStrategy. + * If an exception is thrown from the aggregate method in the AggregationStrategy, then by default, that exception + * is not handled by the error handler. The error handler can be enabled to react if enabling the shareUnitOfWork option. */ public MulticastDefinition aggregationStrategy(AggregationStrategy aggregationStrategy) { setAggregationStrategy(aggregationStrategy); @@ -115,6 +117,8 @@ public class MulticastDefinition extends OutputDefinition<MulticastDefinition> i /** * Sets a reference to the AggregationStrategy to be used to assemble the replies from the multicasts, into a single outgoing message from the Multicast. * By default Camel will use the last reply as the outgoing message. You can also use a POJO as the AggregationStrategy + * If an exception is thrown from the aggregate method in the AggregationStrategy, then by default, that exception + * is not handled by the error handler. The error handler can be enabled to react if enabling the shareUnitOfWork option. */ public MulticastDefinition aggregationStrategyRef(String aggregationStrategyRef) { setStrategyRef(aggregationStrategyRef); http://git-wip-us.apache.org/repos/asf/camel/blob/e01fdb38/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java index 2358f91..acc35b9 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java @@ -518,12 +518,18 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor @Override public void run() { - if (parallelAggregate) { - doAggregateInternal(getAggregationStrategy(subExchange), result, subExchange); - } else { - doAggregate(getAggregationStrategy(subExchange), result, subExchange); + try { + if (parallelAggregate) { + doAggregateInternal(getAggregationStrategy(subExchange), result, subExchange); + } else { + doAggregate(getAggregationStrategy(subExchange), result, subExchange); + } + } catch (Throwable e) { + // wrap in exception to explain where it failed + subExchange.setException(new CamelExchangeException("Parallel processing failed for number " + aggregated.get(), subExchange, e)); + } finally { + aggregated.incrementAndGet(); } - aggregated.incrementAndGet(); } } http://git-wip-us.apache.org/repos/asf/camel/blob/e01fdb38/camel-core/src/test/java/org/apache/camel/issues/MulticastAggregationStrategyThrowExceptionTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/issues/MulticastAggregationStrategyThrowExceptionTest.java b/camel-core/src/test/java/org/apache/camel/issues/MulticastAggregationStrategyThrowExceptionTest.java new file mode 100644 index 0000000..d98ee76 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/issues/MulticastAggregationStrategyThrowExceptionTest.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.issues; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.processor.aggregate.AggregationStrategy; + +/** + * @version + */ +public class MulticastAggregationStrategyThrowExceptionTest extends ContextTestSupport { + + public void testThrowException() throws Exception { + getMockEndpoint("mock:a").expectedMessageCount(1); + getMockEndpoint("mock:b").expectedMessageCount(1); + getMockEndpoint("mock:dead").expectedMessageCount(1); + + template.sendBody("direct:start", "Hello World"); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + errorHandler(deadLetterChannel("mock:dead")); + + // must use share UoW if we want the error handler to react on exceptions + // from the aggregation strategy also. + from("direct:start").multicast(new MyAggregateBean()).shareUnitOfWork() + .to("direct:a") + .to("direct:b") + .end(); + + from("direct:a").to("mock:a"); + from("direct:b").to("mock:b"); + } + }; + } + + public static class MyAggregateBean implements AggregationStrategy { + + public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { + if (oldExchange != null) { + throw new IllegalArgumentException("Forced"); + } + return newExchange; + } + } + +}