Author: davsclaus Date: Mon Aug 30 15:22:15 2010 New Revision: 990821 URL: http://svn.apache.org/viewvc?rev=990821&view=rev Log: CAMEL-3023: Passing in original Exchange if timeout occurred an no aggregation has been done yet.
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelAllTimeoutAwareTest.java - copied, changed from r990801, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeoutAwareTest.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/TimeoutAwareAggregationStrategy.java camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BeanRecipientListTimeoutTest.java camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeoutAwareTest.java camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitParallelTimeoutTest.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=990821&r1=990820&r2=990821&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java Mon Aug 30 15:22:15 2010 @@ -294,7 +294,12 @@ public class MulticastProcessor extends AggregationStrategy strategy = getAggregationStrategy(null); if (strategy instanceof TimeoutAwareAggregationStrategy) { // notify the strategy we timed out - ((TimeoutAwareAggregationStrategy) strategy).timeout(result.get(), i, total.intValue(), timeout); + Exchange oldExchange = result.get(); + if (oldExchange == null) { + // if they all timed out the result may not have been set yet, so use the original exchange + oldExchange = original; + } + ((TimeoutAwareAggregationStrategy) strategy).timeout(oldExchange, i, total.intValue(), timeout); } else { // log a WARN we timed out since it will not be aggregated and the Exchange will be lost LOG.warn("Parallel processing timed out after " + timeout + " millis for number " + i + ". This task will be cancelled and will not be aggregated."); Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/TimeoutAwareAggregationStrategy.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/TimeoutAwareAggregationStrategy.java?rev=990821&r1=990820&r2=990821&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/TimeoutAwareAggregationStrategy.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/TimeoutAwareAggregationStrategy.java Mon Aug 30 15:22:15 2010 @@ -30,7 +30,8 @@ public interface TimeoutAwareAggregation /** * A timeout occurred * - * @param oldExchange the oldest exchange (is <tt>null</tt> on first aggregation as we only have the new exchange) + * @param oldExchange the current aggregated exchange, or the original {...@link Exchange} if no aggregation + * has been done before the timeout occurred * @param index the index * @param total the total * @param timeout the timeout value in millis Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BeanRecipientListTimeoutTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BeanRecipientListTimeoutTest.java?rev=990821&r1=990820&r2=990821&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BeanRecipientListTimeoutTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BeanRecipientListTimeoutTest.java Mon Aug 30 15:22:15 2010 @@ -76,6 +76,7 @@ public class BeanRecipientListTimeoutTes assertEquals(2000, timeout); assertEquals(3, total); assertEquals(0, index); + assertNotNull(oldExchange); } public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelAllTimeoutAwareTest.java (from r990801, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeoutAwareTest.java) URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelAllTimeoutAwareTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelAllTimeoutAwareTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeoutAwareTest.java&r1=990801&r2=990821&rev=990821&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeoutAwareTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelAllTimeoutAwareTest.java Mon Aug 30 15:22:15 2010 @@ -25,12 +25,12 @@ import org.apache.camel.processor.aggreg /** * @version $Revision: 777808 $ */ -public class MulticastParallelTimeoutAwareTest extends ContextTestSupport { +public class MulticastParallelAllTimeoutAwareTest extends ContextTestSupport { - public void testMulticastParallelTimeoutAware() throws Exception { + public void testMulticastParallelAllTimeoutAware() throws Exception { MockEndpoint mock = getMockEndpoint("mock:result"); - // A will timeout so we only get B and C - mock.expectedBodiesReceived("BC"); + // ABC will timeout so we only get our canned response + mock.expectedBodiesReceived("AllTimeout"); template.sendBody("direct:start", "Hello"); @@ -44,16 +44,16 @@ public class MulticastParallelTimeoutAwa public void configure() throws Exception { from("direct:start") .multicast(new MyAggregationStrategy()) - .parallelProcessing().timeout(2000).to("direct:a", "direct:b", "direct:c") + .parallelProcessing().timeout(500).to("direct:a", "direct:b", "direct:c") // use end to indicate end of multicast route .end() .to("mock:result"); - from("direct:a").delay(3000).setBody(constant("A")); + from("direct:a").delay(1000).setBody(constant("A")); - from("direct:b").setBody(constant("B")); + from("direct:b").delay(2000).setBody(constant("B")); - from("direct:c").delay(500).setBody(constant("C")); + from("direct:c").delay(1500).setBody(constant("C")); } }; } @@ -61,18 +61,15 @@ public class MulticastParallelTimeoutAwa private class MyAggregationStrategy implements TimeoutAwareAggregationStrategy { public void timeout(Exchange oldExchange, int index, int total, long timeout) { - assertEquals(2000, timeout); + assertEquals(500, timeout); assertEquals(3, total); assertEquals(0, index); + assertNotNull(oldExchange); + oldExchange.getIn().setBody("AllTimeout"); } public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { - if (oldExchange == null) { - return newExchange; - } - - String body = oldExchange.getIn().getBody(String.class); - oldExchange.getIn().setBody(body + newExchange.getIn().getBody(String.class)); + // noop return oldExchange; } } Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeoutAwareTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeoutAwareTest.java?rev=990821&r1=990820&r2=990821&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeoutAwareTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeoutAwareTest.java Mon Aug 30 15:22:15 2010 @@ -64,6 +64,7 @@ public class MulticastParallelTimeoutAwa assertEquals(2000, timeout); assertEquals(3, total); assertEquals(0, index); + assertNotNull(oldExchange); } public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitParallelTimeoutTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitParallelTimeoutTest.java?rev=990821&r1=990820&r2=990821&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitParallelTimeoutTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitParallelTimeoutTest.java Mon Aug 30 15:22:15 2010 @@ -68,6 +68,7 @@ public class SplitParallelTimeoutTest ex assertEquals(2000, timeout); assertEquals(3, total); assertEquals(0, index); + assertNotNull(oldExchange); } public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {