Repository: camel Updated Branches: refs/heads/master 7ee49fe05 -> 3a227f2c4
CAMEL-10272: Provide an option to stop further processing when an exception is thrown from an aggregation strategy while parallelProcessing is used. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/43036a4a Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/43036a4a Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/43036a4a Branch: refs/heads/master Commit: 43036a4a574873feaa421abbb5393f865714d790 Parents: 7ee49fe Author: aldettinger <aldettin...@gmail.com> Authored: Tue Dec 27 10:55:18 2016 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Sun Jan 1 15:21:10 2017 +0100 ---------------------------------------------------------------------- .../java/org/apache/camel/RecipientList.java | 1 + .../apache/camel/component/bean/MethodInfo.java | 1 + .../apache/camel/model/MulticastDefinition.java | 27 +++++++- .../camel/model/RecipientListDefinition.java | 26 ++++++++ .../org/apache/camel/model/SplitDefinition.java | 27 +++++++- .../camel/processor/MulticastProcessor.java | 27 ++++++-- .../apache/camel/processor/RecipientList.java | 12 +++- .../camel/processor/RecipientListProcessor.java | 11 +++- .../org/apache/camel/processor/Splitter.java | 18 ++++-- .../aggregate/AggregationStrategy.java | 2 +- ...ggregationStrategyThrowingExceptionTest.java | 68 ++++++++++++++++++++ ...ggregationStrategyThrowingExceptionTest.java | 66 +++++++++++++++++++ ...ggregationStrategyThrowingExceptionTest.java | 66 +++++++++++++++++++ .../camel/management/ManagedMulticastTest.java | 2 +- .../management/ManagedRecipientListTest.java | 2 +- .../camel/management/ManagedSplitterTest.java | 2 +- 16 files changed, 338 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/43036a4a/camel-core/src/main/java/org/apache/camel/RecipientList.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/RecipientList.java b/camel-core/src/main/java/org/apache/camel/RecipientList.java index 7cd9cda..bd5e996 100644 --- a/camel-core/src/main/java/org/apache/camel/RecipientList.java +++ b/camel-core/src/main/java/org/apache/camel/RecipientList.java @@ -48,6 +48,7 @@ public @interface RecipientList { boolean parallelProcessing() default false; boolean parallelAggregate() default false; boolean stopOnException() default false; + boolean stopOnAggregateException() default false; boolean streaming() default false; boolean ignoreInvalidEndpoints() default false; String strategyRef() default ""; http://git-wip-us.apache.org/repos/asf/camel/blob/43036a4a/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java b/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java index 5ee4b46..3e5a314 100644 --- a/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java +++ b/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java @@ -163,6 +163,7 @@ public class MethodInfo { && matchContext(recipientListAnnotation.context())) { recipientList = new RecipientList(camelContext, recipientListAnnotation.delimiter()); recipientList.setStopOnException(recipientListAnnotation.stopOnException()); + recipientList.setStopOnAggregateException(recipientListAnnotation.stopOnAggregateException()); recipientList.setIgnoreInvalidEndpoints(recipientListAnnotation.ignoreInvalidEndpoints()); recipientList.setParallelProcessing(recipientListAnnotation.parallelProcessing()); recipientList.setParallelAggregate(recipientListAnnotation.parallelAggregate()); http://git-wip-us.apache.org/repos/asf/camel/blob/43036a4a/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java b/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java index bc8e76c..7bff217 100644 --- a/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java @@ -73,6 +73,8 @@ public class MulticastDefinition extends OutputDefinition<MulticastDefinition> i private Boolean shareUnitOfWork; @XmlAttribute private Boolean parallelAggregate; + @XmlAttribute + private Boolean stopOnAggregateException; public MulticastDefinition() { } @@ -183,6 +185,20 @@ public class MulticastDefinition extends OutputDefinition<MulticastDefinition> i setParallelAggregate(true); return this; } + + /** + * If enabled, unwind exceptions occurring at aggregation time to the error handler when parallelProcessing is used. + * Currently, aggregation time exceptions do not stop the route processing when parallelProcessing is used. + * Enabling this option allows to work around this behavior. + * + * The default value is <code>false</code> for the sake of backward compatibility. + * + * @return the builder + */ + public MulticastDefinition stopOnAggregateException() { + setStopOnAggregateException(true); + return this; + } /** * If enabled then Camel will process replies out-of-order, eg in the order they come back. @@ -294,6 +310,7 @@ public class MulticastDefinition extends OutputDefinition<MulticastDefinition> i boolean isStreaming = getStreaming() != null && getStreaming(); boolean isStopOnException = getStopOnException() != null && getStopOnException(); boolean isParallelAggregate = getParallelAggregate() != null && getParallelAggregate(); + boolean isStopOnAggregateException = getStopOnAggregateException() != null && getStopOnAggregateException(); boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, isParallelProcessing); ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "Multicast", this, isParallelProcessing); @@ -307,7 +324,7 @@ public class MulticastDefinition extends OutputDefinition<MulticastDefinition> i } MulticastProcessor answer = new MulticastProcessor(routeContext.getCamelContext(), list, strategy, isParallelProcessing, - threadPool, shutdownThreadPool, isStreaming, isStopOnException, timeout, onPrepare, isShareUnitOfWork, isParallelAggregate); + threadPool, shutdownThreadPool, isStreaming, isStopOnException, timeout, onPrepare, isShareUnitOfWork, isParallelAggregate, isStopOnAggregateException); return answer; } @@ -474,4 +491,12 @@ public class MulticastDefinition extends OutputDefinition<MulticastDefinition> i this.parallelAggregate = parallelAggregate; } + public Boolean getStopOnAggregateException() { + return stopOnAggregateException; + } + + public void setStopOnAggregateException(Boolean stopOnAggregateException) { + this.stopOnAggregateException = stopOnAggregateException; + } + } http://git-wip-us.apache.org/repos/asf/camel/blob/43036a4a/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java b/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java index 0d02a48..b7b3b85 100644 --- a/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java @@ -83,6 +83,8 @@ public class RecipientListDefinition<Type extends ProcessorDefinition<Type>> ext private Integer cacheSize; @XmlAttribute private Boolean parallelAggregate; + @XmlAttribute + private Boolean stopOnAggregateException; public RecipientListDefinition() { } @@ -115,6 +117,7 @@ public class RecipientListDefinition<Type extends ProcessorDefinition<Type>> ext boolean isShareUnitOfWork = getShareUnitOfWork() != null && getShareUnitOfWork(); boolean isStopOnException = getStopOnException() != null && getStopOnException(); boolean isIgnoreInvalidEndpoints = getIgnoreInvalidEndpoints() != null && getIgnoreInvalidEndpoints(); + boolean isStopOnAggregateException = getStopOnAggregateException() != null && getStopOnAggregateException(); RecipientList answer; if (delimiter != null) { @@ -129,6 +132,7 @@ public class RecipientListDefinition<Type extends ProcessorDefinition<Type>> ext answer.setShareUnitOfWork(isShareUnitOfWork); answer.setStopOnException(isStopOnException); answer.setIgnoreInvalidEndpoints(isIgnoreInvalidEndpoints); + answer.setStopOnAggregateException(isStopOnAggregateException); if (getCacheSize() != null) { answer.setCacheSize(getCacheSize()); } @@ -322,6 +326,20 @@ public class RecipientListDefinition<Type extends ProcessorDefinition<Type>> ext } /** + * If enabled, unwind exceptions occurring at aggregation time to the error handler when parallelProcessing is used. + * Currently, aggregation time exceptions do not stop the route processing when parallelProcessing is used. + * Enabling this option allows to work around this behavior. + * + * The default value is <code>false</code> for the sake of backward compatibility. + * + * @return the builder + */ + public RecipientListDefinition<Type> stopOnAggregateException() { + setStopOnAggregateException(true); + return this; + } + + /** * If enabled then Camel will process replies out-of-order, eg in the order they come back. * If disabled, Camel will process replies in the same order as defined by the recipient list. * @@ -599,4 +617,12 @@ public class RecipientListDefinition<Type extends ProcessorDefinition<Type>> ext public void setParallelAggregate(Boolean parallelAggregate) { this.parallelAggregate = parallelAggregate; } + + public Boolean getStopOnAggregateException() { + return stopOnAggregateException; + } + + public void setStopOnAggregateException(Boolean stopOnAggregateException) { + this.stopOnAggregateException = stopOnAggregateException; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/43036a4a/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java b/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java index f98780f..e7305e8 100644 --- a/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java @@ -72,6 +72,8 @@ public class SplitDefinition extends ExpressionNode implements ExecutorServiceAw private Boolean shareUnitOfWork; @XmlAttribute private Boolean parallelAggregate; + @XmlAttribute + private Boolean stopOnAggregateException; public SplitDefinition() { } @@ -103,6 +105,7 @@ public class SplitDefinition extends ExpressionNode implements ExecutorServiceAw boolean isStreaming = getStreaming() != null && getStreaming(); boolean isShareUnitOfWork = getShareUnitOfWork() != null && getShareUnitOfWork(); boolean isParallelAggregate = getParallelAggregate() != null && getParallelAggregate(); + boolean isStopOnAggregateException = getStopOnAggregateException() != null && getStopOnAggregateException(); boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, isParallelProcessing); ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "Split", this, isParallelProcessing); @@ -118,7 +121,7 @@ public class SplitDefinition extends ExpressionNode implements ExecutorServiceAw Splitter answer = new Splitter(routeContext.getCamelContext(), exp, childProcessor, aggregationStrategy, isParallelProcessing, threadPool, shutdownThreadPool, isStreaming, isStopOnException(), - timeout, onPrepare, isShareUnitOfWork, isParallelAggregate); + timeout, onPrepare, isShareUnitOfWork, isParallelAggregate, isStopOnAggregateException); return answer; } @@ -231,6 +234,20 @@ public class SplitDefinition extends ExpressionNode implements ExecutorServiceAw setParallelAggregate(true); return this; } + + /** + * If enabled, unwind exceptions occurring at aggregation time to the error handler when parallelProcessing is used. + * Currently, aggregation time exceptions do not stop the route processing when parallelProcessing is used. + * Enabling this option allows to work around this behavior. + * + * The default value is <code>false</code> for the sake of backward compatibility. + * + * @return the builder + */ + public SplitDefinition stopOnAggregateException() { + setStopOnAggregateException(true); + return this; + } /** * When in streaming mode, then the splitter splits the original message on-demand, and each splitted @@ -390,6 +407,14 @@ public class SplitDefinition extends ExpressionNode implements ExecutorServiceAw public void setParallelAggregate(Boolean parallelAggregate) { this.parallelAggregate = parallelAggregate; } + + public Boolean getStopOnAggregateException() { + return this.stopOnAggregateException; + } + + public void setStopOnAggregateException(Boolean stopOnAggregateException) { + this.stopOnAggregateException = stopOnAggregateException; + } public Boolean getStopOnException() { return stopOnException; http://git-wip-us.apache.org/repos/asf/camel/blob/43036a4a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java index e0cd13d..b0def97 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java @@ -152,6 +152,7 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor private final boolean parallelProcessing; private final boolean streaming; private final boolean parallelAggregate; + private final boolean stopOnAggregateException; private final boolean stopOnException; private final ExecutorService executorService; private final boolean shutdownExecutorService; @@ -176,10 +177,17 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor streaming, stopOnException, timeout, onPrepare, shareUnitOfWork, false); } + public MulticastProcessor(CamelContext camelContext, Collection<Processor> processors, AggregationStrategy aggregationStrategy, boolean parallelProcessing, + ExecutorService executorService, boolean shutdownExecutorService, boolean streaming, boolean stopOnException, long timeout, Processor onPrepare, + boolean shareUnitOfWork, boolean parallelAggregate) { + this(camelContext, processors, aggregationStrategy, parallelProcessing, executorService, shutdownExecutorService, streaming, stopOnException, timeout, onPrepare, + shareUnitOfWork, false, false); + } + public MulticastProcessor(CamelContext camelContext, Collection<Processor> processors, AggregationStrategy aggregationStrategy, boolean parallelProcessing, ExecutorService executorService, boolean shutdownExecutorService, boolean streaming, boolean stopOnException, long timeout, Processor onPrepare, boolean shareUnitOfWork, - boolean parallelAggregate) { + boolean parallelAggregate, boolean stopOnAggregateException) { notNull(camelContext, "camelContext"); this.camelContext = camelContext; this.processors = processors; @@ -194,6 +202,7 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor this.onPrepare = onPrepare; this.shareUnitOfWork = shareUnitOfWork; this.parallelAggregate = parallelAggregate; + this.stopOnAggregateException = stopOnAggregateException; } @Override @@ -530,10 +539,14 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor doAggregate(getAggregationStrategy(subExchange), result, subExchange); } } catch (Throwable e) { - // wrap in exception to explain where it failed - CamelExchangeException cex = new CamelExchangeException("Parallel processing failed for number " + aggregated.get(), subExchange, e); - subExchange.setException(cex); - LOG.debug(cex.getMessage(), cex); + if (isStopOnAggregateException()) { + throw e; + } else { + // wrap in exception to explain where it failed + CamelExchangeException cex = new CamelExchangeException("Parallel processing failed for number " + aggregated.get(), subExchange, e); + subExchange.setException(cex); + LOG.debug(cex.getMessage(), cex); + } } finally { aggregated.incrementAndGet(); } @@ -1294,6 +1307,10 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor return parallelAggregate; } + public boolean isStopOnAggregateException() { + return stopOnAggregateException; + } + public boolean isShareUnitOfWork() { return shareUnitOfWork; } http://git-wip-us.apache.org/repos/asf/camel/blob/43036a4a/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java b/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java index ded8ca9..7534e87 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java +++ b/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java @@ -61,6 +61,7 @@ public class RecipientList extends ServiceSupport implements AsyncProcessor, IdA private final String delimiter; private boolean parallelProcessing; private boolean parallelAggregate; + private boolean stopOnAggregateException; private boolean stopOnException; private boolean ignoreInvalidEndpoints; private boolean streaming; @@ -145,7 +146,8 @@ public class RecipientList extends ServiceSupport implements AsyncProcessor, IdA RecipientListProcessor rlp = new RecipientListProcessor(exchange.getContext(), producerCache, iter, getAggregationStrategy(), isParallelProcessing(), getExecutorService(), isShutdownExecutorService(), - isStreaming(), isStopOnException(), getTimeout(), getOnPrepare(), isShareUnitOfWork(), isParallelAggregate()) { + isStreaming(), isStopOnException(), getTimeout(), getOnPrepare(), isShareUnitOfWork(), isParallelAggregate(), + isStopOnAggregateException()) { @Override protected synchronized ExecutorService createAggregateExecutorService(String name) { // use a shared executor service to avoid creating new thread pools @@ -250,6 +252,14 @@ public class RecipientList extends ServiceSupport implements AsyncProcessor, IdA this.parallelAggregate = parallelAggregate; } + public boolean isStopOnAggregateException() { + return stopOnAggregateException; + } + + public void setStopOnAggregateException(boolean stopOnAggregateException) { + this.stopOnAggregateException = stopOnAggregateException; + } + public boolean isStopOnException() { return stopOnException; } http://git-wip-us.apache.org/repos/asf/camel/blob/43036a4a/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java index db6af86..33ef611 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java @@ -163,8 +163,15 @@ public class RecipientListProcessor extends MulticastProcessor { public RecipientListProcessor(CamelContext camelContext, ProducerCache producerCache, Iterator<Object> iter, AggregationStrategy aggregationStrategy, boolean parallelProcessing, ExecutorService executorService, boolean shutdownExecutorService, boolean streaming, boolean stopOnException, long timeout, Processor onPrepare, boolean shareUnitOfWork, boolean parallelAggregate) { - super(camelContext, null, aggregationStrategy, parallelProcessing, executorService, shutdownExecutorService, - streaming, stopOnException, timeout, onPrepare, shareUnitOfWork, parallelAggregate); + this(camelContext, producerCache, iter, aggregationStrategy, parallelProcessing, executorService, shutdownExecutorService, streaming, stopOnException, timeout, onPrepare, + shareUnitOfWork, parallelAggregate, false); + } + + public RecipientListProcessor(CamelContext camelContext, ProducerCache producerCache, Iterator<Object> iter, AggregationStrategy aggregationStrategy, + boolean parallelProcessing, ExecutorService executorService, boolean shutdownExecutorService, boolean streaming, boolean stopOnException, + long timeout, Processor onPrepare, boolean shareUnitOfWork, boolean parallelAggregate, boolean stopOnAggregateException) { + super(camelContext, null, aggregationStrategy, parallelProcessing, executorService, shutdownExecutorService, streaming, stopOnException, timeout, onPrepare, + shareUnitOfWork, parallelAggregate, stopOnAggregateException); this.producerCache = producerCache; this.iter = iter; } http://git-wip-us.apache.org/repos/asf/camel/blob/43036a4a/camel-core/src/main/java/org/apache/camel/processor/Splitter.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/Splitter.java b/camel-core/src/main/java/org/apache/camel/processor/Splitter.java index ba3be2e..8a06f79 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/Splitter.java +++ b/camel-core/src/main/java/org/apache/camel/processor/Splitter.java @@ -69,12 +69,18 @@ public class Splitter extends MulticastProcessor implements AsyncProcessor, Trac streaming, stopOnException, timeout, onPrepare, useSubUnitOfWork, false); } - public Splitter(CamelContext camelContext, Expression expression, Processor destination, AggregationStrategy aggregationStrategy, - boolean parallelProcessing, ExecutorService executorService, boolean shutdownExecutorService, - boolean streaming, boolean stopOnException, long timeout, Processor onPrepare, boolean useSubUnitOfWork, - boolean parallelAggregate) { - super(camelContext, Collections.singleton(destination), aggregationStrategy, parallelProcessing, executorService, - shutdownExecutorService, streaming, stopOnException, timeout, onPrepare, useSubUnitOfWork, parallelAggregate); + public Splitter(CamelContext camelContext, Expression expression, Processor destination, AggregationStrategy aggregationStrategy, boolean parallelProcessing, + ExecutorService executorService, boolean shutdownExecutorService, boolean streaming, boolean stopOnException, long timeout, Processor onPrepare, + boolean useSubUnitOfWork, boolean parallelAggregate) { + this(camelContext, expression, destination, aggregationStrategy, parallelProcessing, executorService, shutdownExecutorService, streaming, stopOnException, timeout, + onPrepare, useSubUnitOfWork, false, false); + } + + public Splitter(CamelContext camelContext, Expression expression, Processor destination, AggregationStrategy aggregationStrategy, boolean parallelProcessing, + ExecutorService executorService, boolean shutdownExecutorService, boolean streaming, boolean stopOnException, long timeout, Processor onPrepare, + boolean useSubUnitOfWork, boolean parallelAggregate, boolean stopOnAggregateException) { + super(camelContext, Collections.singleton(destination), aggregationStrategy, parallelProcessing, executorService, shutdownExecutorService, streaming, stopOnException, + timeout, onPrepare, useSubUnitOfWork, parallelAggregate, stopOnAggregateException); this.expression = expression; notNull(expression, "expression"); notNull(destination, "destination"); http://git-wip-us.apache.org/repos/asf/camel/blob/43036a4a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationStrategy.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationStrategy.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationStrategy.java index 802e1b8..c593fa4 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationStrategy.java +++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationStrategy.java @@ -29,7 +29,7 @@ import org.apache.camel.Exchange; * could be to sum up a total amount etc. * <p/> * Note that <tt>oldExchange</tt> may be <tt>null</tt> more than once when this strategy is throwing a {@link java.lang.RuntimeException} - * and <tt>parallelProcessing</tt> is used. + * and <tt>parallelProcessing</tt> is used. You can work around this behavior using the <tt>stopOnAggregateException</tt> option. * <p/> * It is possible that <tt>newExchange</tt> is <tt>null</tt> which could happen if there was no data possible * to acquire. Such as when using a {@link org.apache.camel.processor.PollEnricher} to poll from a JMS queue which http://git-wip-us.apache.org/repos/asf/camel/blob/43036a4a/camel-core/src/test/java/org/apache/camel/issues/MulticastParallelWithAggregationStrategyThrowingExceptionTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/issues/MulticastParallelWithAggregationStrategyThrowingExceptionTest.java b/camel-core/src/test/java/org/apache/camel/issues/MulticastParallelWithAggregationStrategyThrowingExceptionTest.java new file mode 100644 index 0000000..4b35ea8 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/issues/MulticastParallelWithAggregationStrategyThrowingExceptionTest.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.issues; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.processor.aggregate.AggregationStrategy; + +/** + * Tests the issue stated in + * <a href="https://issues.apache.org/jira/browse/CAMEL-10272">CAMEL-10272</a>. + */ +public class MulticastParallelWithAggregationStrategyThrowingExceptionTest extends ContextTestSupport { + + public void testAggregationTimeExceptionWithParallelProcessing() throws Exception { + getMockEndpoint("mock:a").expectedMessageCount(1); + getMockEndpoint("mock:b").expectedMessageCount(1); + getMockEndpoint("mock:end").expectedMessageCount(0); + getMockEndpoint("mock:dead").expectedMessageCount(1); + + template.sendBody("direct:start", "Hello World"); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + errorHandler(deadLetterChannel("mock:dead")); + + // must use share UoW if we want the error handler to react on + // exceptions + // from the aggregation strategy also. + from("direct:start"). + multicast(new MyAggregateBean()).parallelProcessing().stopOnAggregateException().shareUnitOfWork() + .to("mock:a") + .to("mock:b") + .end() + .to("mock:end"); + } + }; + } + + public static class MyAggregateBean implements AggregationStrategy { + + public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { + throw new RuntimeException("Simulating a runtime exception thrown from the aggregation strategy"); + } + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/43036a4a/camel-core/src/test/java/org/apache/camel/issues/RecipientListParallelWithAggregationStrategyThrowingExceptionTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/issues/RecipientListParallelWithAggregationStrategyThrowingExceptionTest.java b/camel-core/src/test/java/org/apache/camel/issues/RecipientListParallelWithAggregationStrategyThrowingExceptionTest.java new file mode 100644 index 0000000..4509c23 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/issues/RecipientListParallelWithAggregationStrategyThrowingExceptionTest.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.issues; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.processor.aggregate.AggregationStrategy; + +/** + * Tests the issue stated in + * <a href="https://issues.apache.org/jira/browse/CAMEL-10272">CAMEL-10272</a>. + */ +public class RecipientListParallelWithAggregationStrategyThrowingExceptionTest extends ContextTestSupport { + + public void testAggregationTimeExceptionWithParallelProcessing() throws Exception { + getMockEndpoint("mock:a").expectedMessageCount(1); + getMockEndpoint("mock:b").expectedMessageCount(1); + getMockEndpoint("mock:end").expectedMessageCount(0); + getMockEndpoint("mock:dead").expectedMessageCount(1); + + template.sendBodyAndHeader("direct:start", "Hello World", "recipients", "mock:a,mock:b"); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + errorHandler(deadLetterChannel("mock:dead")); + + // must use share UoW if we want the error handler to react on + // exceptions + // from the aggregation strategy also. + from("direct:start"). + recipientList(header("recipients")).aggregationStrategy(new MyAggregateBean()). + parallelProcessing().stopOnAggregateException().shareUnitOfWork() + .end() + .to("mock:end"); + } + }; + } + + public static class MyAggregateBean implements AggregationStrategy { + + public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { + throw new RuntimeException("Simulating a runtime exception thrown from the aggregation strategy"); + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/43036a4a/camel-core/src/test/java/org/apache/camel/issues/SplitterParallelWithAggregationStrategyThrowingExceptionTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/issues/SplitterParallelWithAggregationStrategyThrowingExceptionTest.java b/camel-core/src/test/java/org/apache/camel/issues/SplitterParallelWithAggregationStrategyThrowingExceptionTest.java new file mode 100644 index 0000000..66743f4 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/issues/SplitterParallelWithAggregationStrategyThrowingExceptionTest.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.issues; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.processor.aggregate.AggregationStrategy; + +/** + * Tests the issue stated in + * <a href="https://issues.apache.org/jira/browse/CAMEL-10272">CAMEL-10272</a>. + */ +public class SplitterParallelWithAggregationStrategyThrowingExceptionTest extends ContextTestSupport { + + public void testAggregationTimeExceptionWithParallelProcessing() throws Exception { + getMockEndpoint("mock:a").expectedMessageCount(2); + getMockEndpoint("mock:end").expectedMessageCount(0); + getMockEndpoint("mock:dead").expectedMessageCount(1); + + template.sendBody("direct:start", "Hello@World"); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + errorHandler(deadLetterChannel("mock:dead")); + + // must use share UoW if we want the error handler to react on + // exceptions + // from the aggregation strategy also. + from("direct:start"). + split(body().tokenize("@")).aggregationStrategy(new MyAggregateBean()). + parallelProcessing().stopOnAggregateException().shareUnitOfWork() + .to("mock:a") + .end() + .to("mock:end"); + } + }; + } + + public static class MyAggregateBean implements AggregationStrategy { + + public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { + throw new RuntimeException("Simulating a runtime exception thrown from the aggregation strategy"); + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/43036a4a/camel-core/src/test/java/org/apache/camel/management/ManagedMulticastTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/management/ManagedMulticastTest.java b/camel-core/src/test/java/org/apache/camel/management/ManagedMulticastTest.java index 8adac73..486bccf 100644 --- a/camel-core/src/test/java/org/apache/camel/management/ManagedMulticastTest.java +++ b/camel-core/src/test/java/org/apache/camel/management/ManagedMulticastTest.java @@ -67,7 +67,7 @@ public class ManagedMulticastTest extends ManagementTestSupport { data = (TabularData) mbeanServer.invoke(name, "explain", new Object[]{true}, new String[]{"boolean"}); assertNotNull(data); - assertEquals(14, data.size()); + assertEquals(15, data.size()); String json = (String) mbeanServer.invoke(name, "informationJson", null, null); assertNotNull(json); http://git-wip-us.apache.org/repos/asf/camel/blob/43036a4a/camel-core/src/test/java/org/apache/camel/management/ManagedRecipientListTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/management/ManagedRecipientListTest.java b/camel-core/src/test/java/org/apache/camel/management/ManagedRecipientListTest.java index 0e041e3..a37d6ca 100644 --- a/camel-core/src/test/java/org/apache/camel/management/ManagedRecipientListTest.java +++ b/camel-core/src/test/java/org/apache/camel/management/ManagedRecipientListTest.java @@ -91,7 +91,7 @@ public class ManagedRecipientListTest extends ManagementTestSupport { data = (TabularData) mbeanServer.invoke(on, "explain", new Object[]{true}, new String[]{"boolean"}); assertNotNull(data); - assertEquals(17, data.size()); + assertEquals(18, data.size()); String json = (String) mbeanServer.invoke(on, "informationJson", null, null); assertNotNull(json); http://git-wip-us.apache.org/repos/asf/camel/blob/43036a4a/camel-core/src/test/java/org/apache/camel/management/ManagedSplitterTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/management/ManagedSplitterTest.java b/camel-core/src/test/java/org/apache/camel/management/ManagedSplitterTest.java index c13b342..3276f9e 100644 --- a/camel-core/src/test/java/org/apache/camel/management/ManagedSplitterTest.java +++ b/camel-core/src/test/java/org/apache/camel/management/ManagedSplitterTest.java @@ -75,7 +75,7 @@ public class ManagedSplitterTest extends ManagementTestSupport { data = (TabularData) mbeanServer.invoke(on, "explain", new Object[]{true}, new String[]{"boolean"}); assertNotNull(data); - assertEquals(15, data.size()); + assertEquals(16, data.size()); String json = (String) mbeanServer.invoke(on, "informationJson", null, null); assertNotNull(json);