Author: davsclaus
Date: Fri Jan  7 15:39:12 2011
New Revision: 1056357

URL: http://svn.apache.org/viewvc?rev=1056357&view=rev
Log:
CAMEL-3497: Fixed logging using correct index number for exchange when using 
parallel

Modified:
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=1056357&r1=1056356&r2=1056357&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
 Fri Jan  7 15:39:12 2011
@@ -232,7 +232,7 @@ public class MulticastProcessor extends 
             final Exchange subExchange = pair.getExchange();
             updateNewExchange(subExchange, total.intValue(), pairs, it);
 
-            Future<Exchange> task = completion.submit(new Callable<Exchange>() 
{
+            completion.submit(new Callable<Exchange>() {
                 public Exchange call() throws Exception {
                     if (!running.get()) {
                         // do not start processing the task if we are not 
running
@@ -246,14 +246,16 @@ public class MulticastProcessor extends 
                     }
 
                     // Decide whether to continue with the multicast or not; 
similar logic to the Pipeline
-                    boolean continueProcessing = 
PipelineHelper.continueProcessing(subExchange, "Parallel processing failed for 
number " + total.get(), LOG);
+                    Integer number = getExchangeIndex(subExchange);
+                    boolean continueProcessing = 
PipelineHelper.continueProcessing(subExchange, "Parallel processing failed for 
number " + number, LOG);
                     if (stopOnException && !continueProcessing) {
+                        // signal to stop running
+                        running.set(false);
+                        // throw caused exception
                         if (subExchange.getException() != null) {
                             // wrap in exception to explain where it failed
-                            throw new CamelExchangeException("Parallel 
processing failed for number " + total.get(), subExchange, 
subExchange.getException());
+                            throw new CamelExchangeException("Parallel 
processing failed for number " + number, subExchange, 
subExchange.getException());
                         }
-                        // signal to stop running
-                        running.set(false);
                     }
 
                     if (LOG.isTraceEnabled()) {
@@ -318,7 +320,8 @@ public class MulticastProcessor extends 
                 Exchange subExchange = future.get();
 
                 // Decide whether to continue with the multicast or not; 
similar logic to the Pipeline
-                boolean continueProcessing = 
PipelineHelper.continueProcessing(subExchange, "Parallel processing failed for 
number " + total.get(), LOG);
+                Integer number = getExchangeIndex(subExchange);
+                boolean continueProcessing = 
PipelineHelper.continueProcessing(subExchange, "Parallel processing failed for 
number " + number, LOG);
                 if (stopOnException && !continueProcessing) {
                     // we want to stop on exception and an exception or 
failure occurred
                     // this is similar to what the pipeline does, so we should 
do the same to not surprise end users
@@ -635,6 +638,10 @@ public class MulticastProcessor extends 
         }
     }
 
+    protected Integer getExchangeIndex(Exchange exchange) {
+        return exchange.getProperty(Exchange.MULTICAST_INDEX, Integer.class);
+    }
+
     protected Iterable<ProcessorExchangePair> 
createProcessorExchangePairs(Exchange exchange) throws Exception {
         List<ProcessorExchangePair> result = new 
ArrayList<ProcessorExchangePair>(processors.size());
 
@@ -665,6 +672,7 @@ public class MulticastProcessor extends 
         // set property which endpoint we send to
         setToEndpoint(copy, prepared);
 
+        // TODO: optimize to reuse error handlers instead of re-building for 
each exchange pair
         // rework error handling to support fine grained error handling
         if (exchange.getUnitOfWork() != null && 
exchange.getUnitOfWork().getRouteContext() != null) {
             // wrap the producer in error handler so we have fine grained 
error handling on

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java?rev=1056357&r1=1056356&r2=1056357&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java 
(original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java 
Fri Jan  7 15:39:12 2011
@@ -189,6 +189,11 @@ public class Splitter extends MulticastP
         }
     }
 
+    @Override
+    protected Integer getExchangeIndex(Exchange exchange) {
+        return exchange.getProperty(Exchange.SPLIT_INDEX, Integer.class);
+    }
+
     public Expression getExpression() {
         return expression;
     }


Reply via email to