Repository: camel
Updated Branches:
  refs/heads/master 7ee49fe05 -> 3a227f2c4


CAMEL-10272: Provide an option to stop further processing when an exception is 
thrown from an aggregation strategy while parallelProcessing is used.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/43036a4a
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/43036a4a
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/43036a4a

Branch: refs/heads/master
Commit: 43036a4a574873feaa421abbb5393f865714d790
Parents: 7ee49fe
Author: aldettinger <aldettin...@gmail.com>
Authored: Tue Dec 27 10:55:18 2016 +0100
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Sun Jan 1 15:21:10 2017 +0100

----------------------------------------------------------------------
 .../java/org/apache/camel/RecipientList.java    |  1 +
 .../apache/camel/component/bean/MethodInfo.java |  1 +
 .../apache/camel/model/MulticastDefinition.java | 27 +++++++-
 .../camel/model/RecipientListDefinition.java    | 26 ++++++++
 .../org/apache/camel/model/SplitDefinition.java | 27 +++++++-
 .../camel/processor/MulticastProcessor.java     | 27 ++++++--
 .../apache/camel/processor/RecipientList.java   | 12 +++-
 .../camel/processor/RecipientListProcessor.java | 11 +++-
 .../org/apache/camel/processor/Splitter.java    | 18 ++++--
 .../aggregate/AggregationStrategy.java          |  2 +-
 ...ggregationStrategyThrowingExceptionTest.java | 68 ++++++++++++++++++++
 ...ggregationStrategyThrowingExceptionTest.java | 66 +++++++++++++++++++
 ...ggregationStrategyThrowingExceptionTest.java | 66 +++++++++++++++++++
 .../camel/management/ManagedMulticastTest.java  |  2 +-
 .../management/ManagedRecipientListTest.java    |  2 +-
 .../camel/management/ManagedSplitterTest.java   |  2 +-
 16 files changed, 338 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/43036a4a/camel-core/src/main/java/org/apache/camel/RecipientList.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/RecipientList.java 
b/camel-core/src/main/java/org/apache/camel/RecipientList.java
index 7cd9cda..bd5e996 100644
--- a/camel-core/src/main/java/org/apache/camel/RecipientList.java
+++ b/camel-core/src/main/java/org/apache/camel/RecipientList.java
@@ -48,6 +48,7 @@ public @interface RecipientList {
     boolean parallelProcessing() default false;
     boolean parallelAggregate() default false;
     boolean stopOnException() default false;
+    boolean stopOnAggregateException() default false;
     boolean streaming() default false;
     boolean ignoreInvalidEndpoints() default false;
     String strategyRef() default "";

http://git-wip-us.apache.org/repos/asf/camel/blob/43036a4a/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java 
b/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java
index 5ee4b46..3e5a314 100644
--- a/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java
+++ b/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java
@@ -163,6 +163,7 @@ public class MethodInfo {
                 && matchContext(recipientListAnnotation.context())) {
             recipientList = new RecipientList(camelContext, 
recipientListAnnotation.delimiter());
             
recipientList.setStopOnException(recipientListAnnotation.stopOnException());
+            
recipientList.setStopOnAggregateException(recipientListAnnotation.stopOnAggregateException());
             
recipientList.setIgnoreInvalidEndpoints(recipientListAnnotation.ignoreInvalidEndpoints());
             
recipientList.setParallelProcessing(recipientListAnnotation.parallelProcessing());
             
recipientList.setParallelAggregate(recipientListAnnotation.parallelAggregate());

http://git-wip-us.apache.org/repos/asf/camel/blob/43036a4a/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java 
b/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
index bc8e76c..7bff217 100644
--- a/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
@@ -73,6 +73,8 @@ public class MulticastDefinition extends 
OutputDefinition<MulticastDefinition> i
     private Boolean shareUnitOfWork;
     @XmlAttribute
     private Boolean parallelAggregate;
+    @XmlAttribute
+    private Boolean stopOnAggregateException;
 
     public MulticastDefinition() {
     }
@@ -183,6 +185,20 @@ public class MulticastDefinition extends 
OutputDefinition<MulticastDefinition> i
         setParallelAggregate(true);
         return this;
     }
+    
+    /**
+     * If enabled, unwind exceptions occurring at aggregation time to the 
error handler when parallelProcessing is used.
+     * Currently, aggregation time exceptions do not stop the route processing 
when parallelProcessing is used.
+     * Enabling this option allows to work around this behavior.
+     *
+     * The default value is <code>false</code> for the sake of backward 
compatibility.
+     *
+     * @return the builder
+     */
+    public MulticastDefinition stopOnAggregateException() {
+        setStopOnAggregateException(true);
+        return this;
+    }
 
     /**
      * If enabled then Camel will process replies out-of-order, eg in the 
order they come back.
@@ -294,6 +310,7 @@ public class MulticastDefinition extends 
OutputDefinition<MulticastDefinition> i
         boolean isStreaming = getStreaming() != null && getStreaming();
         boolean isStopOnException = getStopOnException() != null && 
getStopOnException();
         boolean isParallelAggregate = getParallelAggregate() != null && 
getParallelAggregate();
+        boolean isStopOnAggregateException = getStopOnAggregateException() != 
null && getStopOnAggregateException();
 
         boolean shutdownThreadPool = 
ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, 
isParallelProcessing);
         ExecutorService threadPool = 
ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, 
"Multicast", this, isParallelProcessing);
@@ -307,7 +324,7 @@ public class MulticastDefinition extends 
OutputDefinition<MulticastDefinition> i
         }
 
         MulticastProcessor answer = new 
MulticastProcessor(routeContext.getCamelContext(), list, strategy, 
isParallelProcessing,
-                                      threadPool, shutdownThreadPool, 
isStreaming, isStopOnException, timeout, onPrepare, isShareUnitOfWork, 
isParallelAggregate);
+                                      threadPool, shutdownThreadPool, 
isStreaming, isStopOnException, timeout, onPrepare, isShareUnitOfWork, 
isParallelAggregate, isStopOnAggregateException);
         return answer;
     }
 
@@ -474,4 +491,12 @@ public class MulticastDefinition extends 
OutputDefinition<MulticastDefinition> i
         this.parallelAggregate = parallelAggregate;
     }
 
+    public Boolean getStopOnAggregateException() {
+        return stopOnAggregateException;
+    }
+
+    public void setStopOnAggregateException(Boolean stopOnAggregateException) {
+        this.stopOnAggregateException = stopOnAggregateException;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/43036a4a/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java 
b/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
index 0d02a48..b7b3b85 100644
--- 
a/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
+++ 
b/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
@@ -83,6 +83,8 @@ public class RecipientListDefinition<Type extends 
ProcessorDefinition<Type>> ext
     private Integer cacheSize;
     @XmlAttribute
     private Boolean parallelAggregate;
+    @XmlAttribute
+    private Boolean stopOnAggregateException;
 
     public RecipientListDefinition() {
     }
@@ -115,6 +117,7 @@ public class RecipientListDefinition<Type extends 
ProcessorDefinition<Type>> ext
         boolean isShareUnitOfWork = getShareUnitOfWork() != null && 
getShareUnitOfWork();
         boolean isStopOnException = getStopOnException() != null && 
getStopOnException();
         boolean isIgnoreInvalidEndpoints = getIgnoreInvalidEndpoints() != null 
&& getIgnoreInvalidEndpoints();
+        boolean isStopOnAggregateException = getStopOnAggregateException() != 
null && getStopOnAggregateException();
 
         RecipientList answer;
         if (delimiter != null) {
@@ -129,6 +132,7 @@ public class RecipientListDefinition<Type extends 
ProcessorDefinition<Type>> ext
         answer.setShareUnitOfWork(isShareUnitOfWork);
         answer.setStopOnException(isStopOnException);
         answer.setIgnoreInvalidEndpoints(isIgnoreInvalidEndpoints);
+        answer.setStopOnAggregateException(isStopOnAggregateException);
         if (getCacheSize() != null) {
             answer.setCacheSize(getCacheSize());
         }
@@ -322,6 +326,20 @@ public class RecipientListDefinition<Type extends 
ProcessorDefinition<Type>> ext
     }
 
     /**
+     * If enabled, unwind exceptions occurring at aggregation time to the 
error handler when parallelProcessing is used.
+     * Currently, aggregation time exceptions do not stop the route processing 
when parallelProcessing is used.
+     * Enabling this option allows to work around this behavior.
+     *
+     * The default value is <code>false</code> for the sake of backward 
compatibility.
+     *
+     * @return the builder
+     */
+    public RecipientListDefinition<Type> stopOnAggregateException() {
+        setStopOnAggregateException(true);
+        return this;
+    }
+
+    /**
      * If enabled then Camel will process replies out-of-order, eg in the 
order they come back.
      * If disabled, Camel will process replies in the same order as defined by 
the recipient list.
      *
@@ -599,4 +617,12 @@ public class RecipientListDefinition<Type extends 
ProcessorDefinition<Type>> ext
     public void setParallelAggregate(Boolean parallelAggregate) {
         this.parallelAggregate = parallelAggregate;
     }
+
+    public Boolean getStopOnAggregateException() {
+        return stopOnAggregateException;
+    }
+
+    public void setStopOnAggregateException(Boolean stopOnAggregateException) {
+        this.stopOnAggregateException = stopOnAggregateException;
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/43036a4a/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java 
b/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java
index f98780f..e7305e8 100644
--- a/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java
@@ -72,6 +72,8 @@ public class SplitDefinition extends ExpressionNode 
implements ExecutorServiceAw
     private Boolean shareUnitOfWork;
     @XmlAttribute
     private Boolean parallelAggregate;
+    @XmlAttribute
+    private Boolean stopOnAggregateException;
 
     public SplitDefinition() {
     }
@@ -103,6 +105,7 @@ public class SplitDefinition extends ExpressionNode 
implements ExecutorServiceAw
         boolean isStreaming = getStreaming() != null && getStreaming();
         boolean isShareUnitOfWork = getShareUnitOfWork() != null && 
getShareUnitOfWork();
         boolean isParallelAggregate = getParallelAggregate() != null && 
getParallelAggregate();
+        boolean isStopOnAggregateException = getStopOnAggregateException() != 
null && getStopOnAggregateException();
         boolean shutdownThreadPool = 
ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, 
isParallelProcessing);
         ExecutorService threadPool = 
ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "Split", 
this, isParallelProcessing);
 
@@ -118,7 +121,7 @@ public class SplitDefinition extends ExpressionNode 
implements ExecutorServiceAw
 
         Splitter answer = new Splitter(routeContext.getCamelContext(), exp, 
childProcessor, aggregationStrategy,
                             isParallelProcessing, threadPool, 
shutdownThreadPool, isStreaming, isStopOnException(),
-                            timeout, onPrepare, isShareUnitOfWork, 
isParallelAggregate);
+                            timeout, onPrepare, isShareUnitOfWork, 
isParallelAggregate, isStopOnAggregateException);
         return answer;
     }
 
@@ -231,6 +234,20 @@ public class SplitDefinition extends ExpressionNode 
implements ExecutorServiceAw
         setParallelAggregate(true);
         return this;
     }
+    
+    /**
+     * If enabled, unwind exceptions occurring at aggregation time to the 
error handler when parallelProcessing is used.
+     * Currently, aggregation time exceptions do not stop the route processing 
when parallelProcessing is used.
+     * Enabling this option allows to work around this behavior.
+     *
+     * The default value is <code>false</code> for the sake of backward 
compatibility.
+     *
+     * @return the builder
+     */
+    public SplitDefinition stopOnAggregateException() {
+        setStopOnAggregateException(true);
+        return this;
+    }
 
     /**
      * When in streaming mode, then the splitter splits the original message 
on-demand, and each splitted
@@ -390,6 +407,14 @@ public class SplitDefinition extends ExpressionNode 
implements ExecutorServiceAw
     public void setParallelAggregate(Boolean parallelAggregate) {
         this.parallelAggregate = parallelAggregate;
     }
+    
+    public Boolean getStopOnAggregateException() {
+        return this.stopOnAggregateException;
+    }
+
+    public void setStopOnAggregateException(Boolean stopOnAggregateException) {
+        this.stopOnAggregateException = stopOnAggregateException;
+    }
 
     public Boolean getStopOnException() {
         return stopOnException;

http://git-wip-us.apache.org/repos/asf/camel/blob/43036a4a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java 
b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index e0cd13d..b0def97 100644
--- 
a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ 
b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -152,6 +152,7 @@ public class MulticastProcessor extends ServiceSupport 
implements AsyncProcessor
     private final boolean parallelProcessing;
     private final boolean streaming;
     private final boolean parallelAggregate;
+    private final boolean stopOnAggregateException;
     private final boolean stopOnException;
     private final ExecutorService executorService;
     private final boolean shutdownExecutorService;
@@ -176,10 +177,17 @@ public class MulticastProcessor extends ServiceSupport 
implements AsyncProcessor
                 streaming, stopOnException, timeout, onPrepare, 
shareUnitOfWork, false);
     }
 
+    public MulticastProcessor(CamelContext camelContext, Collection<Processor> 
processors, AggregationStrategy aggregationStrategy, boolean parallelProcessing,
+                              ExecutorService executorService, boolean 
shutdownExecutorService, boolean streaming, boolean stopOnException, long 
timeout, Processor onPrepare,
+                              boolean shareUnitOfWork, boolean 
parallelAggregate) {
+        this(camelContext, processors, aggregationStrategy, 
parallelProcessing, executorService, shutdownExecutorService, streaming, 
stopOnException, timeout, onPrepare,
+             shareUnitOfWork, false, false);
+    }
+    
     public MulticastProcessor(CamelContext camelContext, Collection<Processor> 
processors, AggregationStrategy aggregationStrategy,
                               boolean parallelProcessing, ExecutorService 
executorService, boolean shutdownExecutorService, boolean streaming,
                               boolean stopOnException, long timeout, Processor 
onPrepare, boolean shareUnitOfWork,
-                              boolean parallelAggregate) {
+                              boolean parallelAggregate, boolean 
stopOnAggregateException) {
         notNull(camelContext, "camelContext");
         this.camelContext = camelContext;
         this.processors = processors;
@@ -194,6 +202,7 @@ public class MulticastProcessor extends ServiceSupport 
implements AsyncProcessor
         this.onPrepare = onPrepare;
         this.shareUnitOfWork = shareUnitOfWork;
         this.parallelAggregate = parallelAggregate;
+        this.stopOnAggregateException = stopOnAggregateException;
     }
 
     @Override
@@ -530,10 +539,14 @@ public class MulticastProcessor extends ServiceSupport 
implements AsyncProcessor
                     doAggregate(getAggregationStrategy(subExchange), result, 
subExchange);
                 }
             } catch (Throwable e) {
-                // wrap in exception to explain where it failed
-                CamelExchangeException cex = new 
CamelExchangeException("Parallel processing failed for number " + 
aggregated.get(), subExchange, e);
-                subExchange.setException(cex);
-                LOG.debug(cex.getMessage(), cex);
+                if (isStopOnAggregateException()) {
+                    throw e;
+                } else {
+                    // wrap in exception to explain where it failed
+                    CamelExchangeException cex = new 
CamelExchangeException("Parallel processing failed for number " + 
aggregated.get(), subExchange, e);
+                    subExchange.setException(cex);
+                    LOG.debug(cex.getMessage(), cex);
+                }
             } finally {
                 aggregated.incrementAndGet();
             }
@@ -1294,6 +1307,10 @@ public class MulticastProcessor extends ServiceSupport 
implements AsyncProcessor
         return parallelAggregate;
     }
 
+    public boolean isStopOnAggregateException() {
+        return stopOnAggregateException;
+    }
+
     public boolean isShareUnitOfWork() {
         return shareUnitOfWork;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/43036a4a/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java 
b/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
index ded8ca9..7534e87 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
@@ -61,6 +61,7 @@ public class RecipientList extends ServiceSupport implements 
AsyncProcessor, IdA
     private final String delimiter;
     private boolean parallelProcessing;
     private boolean parallelAggregate;
+    private boolean stopOnAggregateException;
     private boolean stopOnException;
     private boolean ignoreInvalidEndpoints;
     private boolean streaming;
@@ -145,7 +146,8 @@ public class RecipientList extends ServiceSupport 
implements AsyncProcessor, IdA
 
         RecipientListProcessor rlp = new 
RecipientListProcessor(exchange.getContext(), producerCache, iter, 
getAggregationStrategy(),
                 isParallelProcessing(), getExecutorService(), 
isShutdownExecutorService(),
-                isStreaming(), isStopOnException(), getTimeout(), 
getOnPrepare(), isShareUnitOfWork(), isParallelAggregate()) {
+                isStreaming(), isStopOnException(), getTimeout(), 
getOnPrepare(), isShareUnitOfWork(), isParallelAggregate(),
+                isStopOnAggregateException()) {
             @Override
             protected synchronized ExecutorService 
createAggregateExecutorService(String name) {
                 // use a shared executor service to avoid creating new thread 
pools
@@ -250,6 +252,14 @@ public class RecipientList extends ServiceSupport 
implements AsyncProcessor, IdA
         this.parallelAggregate = parallelAggregate;
     }
 
+    public boolean isStopOnAggregateException() {
+        return stopOnAggregateException;
+    }
+
+    public void setStopOnAggregateException(boolean stopOnAggregateException) {
+        this.stopOnAggregateException = stopOnAggregateException;
+    }
+
     public boolean isStopOnException() {
         return stopOnException;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/43036a4a/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
 
b/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
index db6af86..33ef611 100644
--- 
a/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
+++ 
b/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
@@ -163,8 +163,15 @@ public class RecipientListProcessor extends 
MulticastProcessor {
     public RecipientListProcessor(CamelContext camelContext, ProducerCache 
producerCache, Iterator<Object> iter, AggregationStrategy aggregationStrategy,
                                   boolean parallelProcessing, ExecutorService 
executorService, boolean shutdownExecutorService,
                                   boolean streaming, boolean stopOnException, 
long timeout, Processor onPrepare, boolean shareUnitOfWork, boolean 
parallelAggregate) {
-        super(camelContext, null, aggregationStrategy, parallelProcessing, 
executorService, shutdownExecutorService,
-                streaming, stopOnException, timeout, onPrepare, 
shareUnitOfWork, parallelAggregate);
+        this(camelContext, producerCache, iter, aggregationStrategy, 
parallelProcessing, executorService, shutdownExecutorService, streaming, 
stopOnException, timeout, onPrepare,
+             shareUnitOfWork, parallelAggregate, false);
+    }
+
+    public RecipientListProcessor(CamelContext camelContext, ProducerCache 
producerCache, Iterator<Object> iter, AggregationStrategy aggregationStrategy,
+                                  boolean parallelProcessing, ExecutorService 
executorService, boolean shutdownExecutorService, boolean streaming, boolean 
stopOnException,
+                                  long timeout, Processor onPrepare, boolean 
shareUnitOfWork, boolean parallelAggregate, boolean stopOnAggregateException) {
+        super(camelContext, null, aggregationStrategy, parallelProcessing, 
executorService, shutdownExecutorService, streaming, stopOnException, timeout, 
onPrepare,
+              shareUnitOfWork, parallelAggregate, stopOnAggregateException);
         this.producerCache = producerCache;
         this.iter = iter;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/43036a4a/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/Splitter.java 
b/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
index ba3be2e..8a06f79 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
@@ -69,12 +69,18 @@ public class Splitter extends MulticastProcessor implements 
AsyncProcessor, Trac
                 streaming, stopOnException, timeout, onPrepare, 
useSubUnitOfWork, false);
     }
 
-    public Splitter(CamelContext camelContext, Expression expression, 
Processor destination, AggregationStrategy aggregationStrategy,
-                    boolean parallelProcessing, ExecutorService 
executorService, boolean shutdownExecutorService,
-                    boolean streaming, boolean stopOnException, long timeout, 
Processor onPrepare, boolean useSubUnitOfWork,
-                    boolean parallelAggregate) {
-        super(camelContext, Collections.singleton(destination), 
aggregationStrategy, parallelProcessing, executorService,
-                shutdownExecutorService, streaming, stopOnException, timeout, 
onPrepare, useSubUnitOfWork, parallelAggregate);
+    public Splitter(CamelContext camelContext, Expression expression, 
Processor destination, AggregationStrategy aggregationStrategy, boolean 
parallelProcessing,
+                    ExecutorService executorService, boolean 
shutdownExecutorService, boolean streaming, boolean stopOnException, long 
timeout, Processor onPrepare,
+                    boolean useSubUnitOfWork, boolean parallelAggregate) {
+        this(camelContext, expression, destination, aggregationStrategy, 
parallelProcessing, executorService, shutdownExecutorService, streaming, 
stopOnException, timeout,
+             onPrepare, useSubUnitOfWork, false, false);
+    }
+
+    public Splitter(CamelContext camelContext, Expression expression, 
Processor destination, AggregationStrategy aggregationStrategy, boolean 
parallelProcessing,
+                    ExecutorService executorService, boolean 
shutdownExecutorService, boolean streaming, boolean stopOnException, long 
timeout, Processor onPrepare,
+                    boolean useSubUnitOfWork, boolean parallelAggregate, 
boolean stopOnAggregateException) {
+        super(camelContext, Collections.singleton(destination), 
aggregationStrategy, parallelProcessing, executorService, 
shutdownExecutorService, streaming, stopOnException,
+              timeout, onPrepare, useSubUnitOfWork, parallelAggregate, 
stopOnAggregateException);
         this.expression = expression;
         notNull(expression, "expression");
         notNull(destination, "destination");

http://git-wip-us.apache.org/repos/asf/camel/blob/43036a4a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationStrategy.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationStrategy.java
 
b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationStrategy.java
index 802e1b8..c593fa4 100644
--- 
a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationStrategy.java
+++ 
b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationStrategy.java
@@ -29,7 +29,7 @@ import org.apache.camel.Exchange;
  * could be to sum up a total amount etc.
  * <p/>
  * Note that <tt>oldExchange</tt> may be <tt>null</tt> more than once when 
this strategy is throwing a {@link java.lang.RuntimeException}
- * and <tt>parallelProcessing</tt> is used.
+ * and <tt>parallelProcessing</tt> is used. You can work around this behavior 
using the <tt>stopOnAggregateException</tt> option.
  * <p/>
  * It is possible that <tt>newExchange</tt> is <tt>null</tt> which could 
happen if there was no data possible
  * to acquire. Such as when using a {@link 
org.apache.camel.processor.PollEnricher} to poll from a JMS queue which

http://git-wip-us.apache.org/repos/asf/camel/blob/43036a4a/camel-core/src/test/java/org/apache/camel/issues/MulticastParallelWithAggregationStrategyThrowingExceptionTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/issues/MulticastParallelWithAggregationStrategyThrowingExceptionTest.java
 
b/camel-core/src/test/java/org/apache/camel/issues/MulticastParallelWithAggregationStrategyThrowingExceptionTest.java
new file mode 100644
index 0000000..4b35ea8
--- /dev/null
+++ 
b/camel-core/src/test/java/org/apache/camel/issues/MulticastParallelWithAggregationStrategyThrowingExceptionTest.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.issues;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+
+/**
+ * Tests the issue stated in
+ * <a href="https://issues.apache.org/jira/browse/CAMEL-10272";>CAMEL-10272</a>.
+ */
+public class MulticastParallelWithAggregationStrategyThrowingExceptionTest 
extends ContextTestSupport {
+
+    public void testAggregationTimeExceptionWithParallelProcessing() throws 
Exception {
+        getMockEndpoint("mock:a").expectedMessageCount(1);
+        getMockEndpoint("mock:b").expectedMessageCount(1);
+        getMockEndpoint("mock:end").expectedMessageCount(0);
+        getMockEndpoint("mock:dead").expectedMessageCount(1);
+
+        template.sendBody("direct:start", "Hello World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                errorHandler(deadLetterChannel("mock:dead"));
+
+                // must use share UoW if we want the error handler to react on
+                // exceptions
+                // from the aggregation strategy also.
+                from("direct:start").
+                multicast(new 
MyAggregateBean()).parallelProcessing().stopOnAggregateException().shareUnitOfWork()
+                    .to("mock:a")
+                    .to("mock:b")
+               .end()
+                    .to("mock:end");
+            }
+        };
+    }
+
+    public static class MyAggregateBean implements AggregationStrategy {
+
+        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+            throw new RuntimeException("Simulating a runtime exception thrown 
from the aggregation strategy");
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/43036a4a/camel-core/src/test/java/org/apache/camel/issues/RecipientListParallelWithAggregationStrategyThrowingExceptionTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/issues/RecipientListParallelWithAggregationStrategyThrowingExceptionTest.java
 
b/camel-core/src/test/java/org/apache/camel/issues/RecipientListParallelWithAggregationStrategyThrowingExceptionTest.java
new file mode 100644
index 0000000..4509c23
--- /dev/null
+++ 
b/camel-core/src/test/java/org/apache/camel/issues/RecipientListParallelWithAggregationStrategyThrowingExceptionTest.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.issues;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+
+/**
+ * Tests the issue stated in
+ * <a href="https://issues.apache.org/jira/browse/CAMEL-10272";>CAMEL-10272</a>.
+ */
+public class RecipientListParallelWithAggregationStrategyThrowingExceptionTest 
extends ContextTestSupport {
+
+    public void testAggregationTimeExceptionWithParallelProcessing() throws 
Exception {
+        getMockEndpoint("mock:a").expectedMessageCount(1);
+        getMockEndpoint("mock:b").expectedMessageCount(1);
+        getMockEndpoint("mock:end").expectedMessageCount(0);
+        getMockEndpoint("mock:dead").expectedMessageCount(1);
+
+        template.sendBodyAndHeader("direct:start", "Hello World", 
"recipients", "mock:a,mock:b");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                errorHandler(deadLetterChannel("mock:dead"));
+
+                // must use share UoW if we want the error handler to react on
+                // exceptions
+                // from the aggregation strategy also.
+                from("direct:start").
+                recipientList(header("recipients")).aggregationStrategy(new 
MyAggregateBean()).
+                
parallelProcessing().stopOnAggregateException().shareUnitOfWork()
+                .end()
+                    .to("mock:end");
+            }
+        };
+    }
+
+    public static class MyAggregateBean implements AggregationStrategy {
+
+        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+            throw new RuntimeException("Simulating a runtime exception thrown 
from the aggregation strategy");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/43036a4a/camel-core/src/test/java/org/apache/camel/issues/SplitterParallelWithAggregationStrategyThrowingExceptionTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/issues/SplitterParallelWithAggregationStrategyThrowingExceptionTest.java
 
b/camel-core/src/test/java/org/apache/camel/issues/SplitterParallelWithAggregationStrategyThrowingExceptionTest.java
new file mode 100644
index 0000000..66743f4
--- /dev/null
+++ 
b/camel-core/src/test/java/org/apache/camel/issues/SplitterParallelWithAggregationStrategyThrowingExceptionTest.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.issues;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+
+/**
+ * Tests the issue stated in
+ * <a href="https://issues.apache.org/jira/browse/CAMEL-10272";>CAMEL-10272</a>.
+ */
+public class SplitterParallelWithAggregationStrategyThrowingExceptionTest 
extends ContextTestSupport {
+
+    public void testAggregationTimeExceptionWithParallelProcessing() throws 
Exception {
+        getMockEndpoint("mock:a").expectedMessageCount(2);
+        getMockEndpoint("mock:end").expectedMessageCount(0);
+        getMockEndpoint("mock:dead").expectedMessageCount(1);
+
+        template.sendBody("direct:start", "Hello@World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                errorHandler(deadLetterChannel("mock:dead"));
+
+                // must use share UoW if we want the error handler to react on
+                // exceptions
+                // from the aggregation strategy also.
+                from("direct:start").
+                split(body().tokenize("@")).aggregationStrategy(new 
MyAggregateBean()).
+                
parallelProcessing().stopOnAggregateException().shareUnitOfWork()
+                    .to("mock:a")
+               .end()
+                    .to("mock:end");
+            }
+        };
+    }
+
+    public static class MyAggregateBean implements AggregationStrategy {
+
+        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+            throw new RuntimeException("Simulating a runtime exception thrown 
from the aggregation strategy");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/43036a4a/camel-core/src/test/java/org/apache/camel/management/ManagedMulticastTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/management/ManagedMulticastTest.java
 
b/camel-core/src/test/java/org/apache/camel/management/ManagedMulticastTest.java
index 8adac73..486bccf 100644
--- 
a/camel-core/src/test/java/org/apache/camel/management/ManagedMulticastTest.java
+++ 
b/camel-core/src/test/java/org/apache/camel/management/ManagedMulticastTest.java
@@ -67,7 +67,7 @@ public class ManagedMulticastTest extends 
ManagementTestSupport {
 
         data = (TabularData) mbeanServer.invoke(name, "explain", new 
Object[]{true}, new String[]{"boolean"});
         assertNotNull(data);
-        assertEquals(14, data.size());
+        assertEquals(15, data.size());
 
         String json = (String) mbeanServer.invoke(name, "informationJson", 
null, null);
         assertNotNull(json);

http://git-wip-us.apache.org/repos/asf/camel/blob/43036a4a/camel-core/src/test/java/org/apache/camel/management/ManagedRecipientListTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/management/ManagedRecipientListTest.java
 
b/camel-core/src/test/java/org/apache/camel/management/ManagedRecipientListTest.java
index 0e041e3..a37d6ca 100644
--- 
a/camel-core/src/test/java/org/apache/camel/management/ManagedRecipientListTest.java
+++ 
b/camel-core/src/test/java/org/apache/camel/management/ManagedRecipientListTest.java
@@ -91,7 +91,7 @@ public class ManagedRecipientListTest extends 
ManagementTestSupport {
 
         data = (TabularData) mbeanServer.invoke(on, "explain", new 
Object[]{true}, new String[]{"boolean"});
         assertNotNull(data);
-        assertEquals(17, data.size());
+        assertEquals(18, data.size());
 
         String json = (String) mbeanServer.invoke(on, "informationJson", null, 
null);
         assertNotNull(json);

http://git-wip-us.apache.org/repos/asf/camel/blob/43036a4a/camel-core/src/test/java/org/apache/camel/management/ManagedSplitterTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/management/ManagedSplitterTest.java 
b/camel-core/src/test/java/org/apache/camel/management/ManagedSplitterTest.java
index c13b342..3276f9e 100644
--- 
a/camel-core/src/test/java/org/apache/camel/management/ManagedSplitterTest.java
+++ 
b/camel-core/src/test/java/org/apache/camel/management/ManagedSplitterTest.java
@@ -75,7 +75,7 @@ public class ManagedSplitterTest extends 
ManagementTestSupport {
 
         data = (TabularData) mbeanServer.invoke(on, "explain", new 
Object[]{true}, new String[]{"boolean"});
         assertNotNull(data);
-        assertEquals(15, data.size());
+        assertEquals(16, data.size());
 
         String json = (String) mbeanServer.invoke(on, "informationJson", null, 
null);
         assertNotNull(json);

Reply via email to