Author: davsclaus Date: Fri Jan 7 15:39:12 2011 New Revision: 1056357 URL: http://svn.apache.org/viewvc?rev=1056357&view=rev Log: CAMEL-3497: Fixed logging using correct index number for exchange when using parallel
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=1056357&r1=1056356&r2=1056357&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java Fri Jan 7 15:39:12 2011 @@ -232,7 +232,7 @@ public class MulticastProcessor extends final Exchange subExchange = pair.getExchange(); updateNewExchange(subExchange, total.intValue(), pairs, it); - Future<Exchange> task = completion.submit(new Callable<Exchange>() { + completion.submit(new Callable<Exchange>() { public Exchange call() throws Exception { if (!running.get()) { // do not start processing the task if we are not running @@ -246,14 +246,16 @@ public class MulticastProcessor extends } // Decide whether to continue with the multicast or not; similar logic to the Pipeline - boolean continueProcessing = PipelineHelper.continueProcessing(subExchange, "Parallel processing failed for number " + total.get(), LOG); + Integer number = getExchangeIndex(subExchange); + boolean continueProcessing = PipelineHelper.continueProcessing(subExchange, "Parallel processing failed for number " + number, LOG); if (stopOnException && !continueProcessing) { + // signal to stop running + running.set(false); + // throw caused exception if (subExchange.getException() != null) { // wrap in exception to explain where it failed - throw new CamelExchangeException("Parallel processing failed for number " + total.get(), subExchange, subExchange.getException()); + throw new CamelExchangeException("Parallel processing failed for number " + number, subExchange, subExchange.getException()); } - // signal to stop running - running.set(false); } if (LOG.isTraceEnabled()) { @@ -318,7 +320,8 @@ public class MulticastProcessor extends Exchange subExchange = future.get(); // Decide whether to continue with the multicast or not; similar logic to the Pipeline - boolean continueProcessing = PipelineHelper.continueProcessing(subExchange, "Parallel processing failed for number " + total.get(), LOG); + Integer number = getExchangeIndex(subExchange); + boolean continueProcessing = PipelineHelper.continueProcessing(subExchange, "Parallel processing failed for number " + number, LOG); if (stopOnException && !continueProcessing) { // we want to stop on exception and an exception or failure occurred // this is similar to what the pipeline does, so we should do the same to not surprise end users @@ -635,6 +638,10 @@ public class MulticastProcessor extends } } + protected Integer getExchangeIndex(Exchange exchange) { + return exchange.getProperty(Exchange.MULTICAST_INDEX, Integer.class); + } + protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange exchange) throws Exception { List<ProcessorExchangePair> result = new ArrayList<ProcessorExchangePair>(processors.size()); @@ -665,6 +672,7 @@ public class MulticastProcessor extends // set property which endpoint we send to setToEndpoint(copy, prepared); + // TODO: optimize to reuse error handlers instead of re-building for each exchange pair // rework error handling to support fine grained error handling if (exchange.getUnitOfWork() != null && exchange.getUnitOfWork().getRouteContext() != null) { // wrap the producer in error handler so we have fine grained error handling on Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java?rev=1056357&r1=1056356&r2=1056357&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java Fri Jan 7 15:39:12 2011 @@ -189,6 +189,11 @@ public class Splitter extends MulticastP } } + @Override + protected Integer getExchangeIndex(Exchange exchange) { + return exchange.getProperty(Exchange.SPLIT_INDEX, Integer.class); + } + public Expression getExpression() { return expression; }