Author: davsclaus
Date: Mon Aug 30 15:22:15 2010
New Revision: 990821

URL: http://svn.apache.org/viewvc?rev=990821&view=rev
Log:
CAMEL-3023: Passing in original Exchange if timeout occurred an no aggregation 
has been done yet.

Added:
    
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelAllTimeoutAwareTest.java
      - copied, changed from r990801, 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeoutAwareTest.java
Modified:
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/TimeoutAwareAggregationStrategy.java
    
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BeanRecipientListTimeoutTest.java
    
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeoutAwareTest.java
    
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitParallelTimeoutTest.java

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=990821&r1=990820&r2=990821&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
 Mon Aug 30 15:22:15 2010
@@ -294,7 +294,12 @@ public class MulticastProcessor extends 
                 AggregationStrategy strategy = getAggregationStrategy(null);
                 if (strategy instanceof TimeoutAwareAggregationStrategy) {
                     // notify the strategy we timed out
-                    ((TimeoutAwareAggregationStrategy) 
strategy).timeout(result.get(), i, total.intValue(), timeout);
+                    Exchange oldExchange = result.get();
+                    if (oldExchange == null) {
+                        // if they all timed out the result may not have been 
set yet, so use the original exchange
+                        oldExchange = original;
+                    }
+                    ((TimeoutAwareAggregationStrategy) 
strategy).timeout(oldExchange, i, total.intValue(), timeout);
                 } else {
                     // log a WARN we timed out since it will not be aggregated 
and the Exchange will be lost
                     LOG.warn("Parallel processing timed out after " + timeout 
+ " millis for number " + i + ". This task will be cancelled and will not be 
aggregated.");

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/TimeoutAwareAggregationStrategy.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/TimeoutAwareAggregationStrategy.java?rev=990821&r1=990820&r2=990821&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/TimeoutAwareAggregationStrategy.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/TimeoutAwareAggregationStrategy.java
 Mon Aug 30 15:22:15 2010
@@ -30,7 +30,8 @@ public interface TimeoutAwareAggregation
     /**
      * A timeout occurred
      *
-     * @param oldExchange  the oldest exchange (is <tt>null</tt> on first 
aggregation as we only have the new exchange)
+     * @param oldExchange  the current aggregated exchange, or the original 
{...@link Exchange} if no aggregation
+     *                     has been done before the timeout occurred
      * @param index        the index
      * @param total        the total
      * @param timeout      the timeout value in millis

Modified: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BeanRecipientListTimeoutTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BeanRecipientListTimeoutTest.java?rev=990821&r1=990820&r2=990821&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BeanRecipientListTimeoutTest.java
 (original)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BeanRecipientListTimeoutTest.java
 Mon Aug 30 15:22:15 2010
@@ -76,6 +76,7 @@ public class BeanRecipientListTimeoutTes
             assertEquals(2000, timeout);
             assertEquals(3, total);
             assertEquals(0, index);
+            assertNotNull(oldExchange);
         }
 
         public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {

Copied: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelAllTimeoutAwareTest.java
 (from r990801, 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeoutAwareTest.java)
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelAllTimeoutAwareTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelAllTimeoutAwareTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeoutAwareTest.java&r1=990801&r2=990821&rev=990821&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeoutAwareTest.java
 (original)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelAllTimeoutAwareTest.java
 Mon Aug 30 15:22:15 2010
@@ -25,12 +25,12 @@ import org.apache.camel.processor.aggreg
 /**
  * @version $Revision: 777808 $
  */
-public class MulticastParallelTimeoutAwareTest extends ContextTestSupport {
+public class MulticastParallelAllTimeoutAwareTest extends ContextTestSupport {
 
-    public void testMulticastParallelTimeoutAware() throws Exception {
+    public void testMulticastParallelAllTimeoutAware() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:result");
-        // A will timeout so we only get B and C
-        mock.expectedBodiesReceived("BC");
+        // ABC will timeout so we only get our canned response
+        mock.expectedBodiesReceived("AllTimeout");
 
         template.sendBody("direct:start", "Hello");
 
@@ -44,16 +44,16 @@ public class MulticastParallelTimeoutAwa
             public void configure() throws Exception {
                 from("direct:start")
                         .multicast(new MyAggregationStrategy())
-                        .parallelProcessing().timeout(2000).to("direct:a", 
"direct:b", "direct:c")
+                        .parallelProcessing().timeout(500).to("direct:a", 
"direct:b", "direct:c")
                         // use end to indicate end of multicast route
                         .end()
                         .to("mock:result");
 
-                from("direct:a").delay(3000).setBody(constant("A"));
+                from("direct:a").delay(1000).setBody(constant("A"));
 
-                from("direct:b").setBody(constant("B"));
+                from("direct:b").delay(2000).setBody(constant("B"));
 
-                from("direct:c").delay(500).setBody(constant("C"));
+                from("direct:c").delay(1500).setBody(constant("C"));
             }
         };
     }
@@ -61,18 +61,15 @@ public class MulticastParallelTimeoutAwa
     private class MyAggregationStrategy implements 
TimeoutAwareAggregationStrategy {
 
         public void timeout(Exchange oldExchange, int index, int total, long 
timeout) {
-            assertEquals(2000, timeout);
+            assertEquals(500, timeout);
             assertEquals(3, total);
             assertEquals(0, index);
+            assertNotNull(oldExchange);
+            oldExchange.getIn().setBody("AllTimeout");
         }
 
         public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
-            if (oldExchange == null) {
-                return newExchange;
-            }
-
-            String body = oldExchange.getIn().getBody(String.class);
-            oldExchange.getIn().setBody(body + 
newExchange.getIn().getBody(String.class));
+            // noop
             return oldExchange;
         }
     }

Modified: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeoutAwareTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeoutAwareTest.java?rev=990821&r1=990820&r2=990821&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeoutAwareTest.java
 (original)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeoutAwareTest.java
 Mon Aug 30 15:22:15 2010
@@ -64,6 +64,7 @@ public class MulticastParallelTimeoutAwa
             assertEquals(2000, timeout);
             assertEquals(3, total);
             assertEquals(0, index);
+            assertNotNull(oldExchange);
         }
 
         public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {

Modified: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitParallelTimeoutTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitParallelTimeoutTest.java?rev=990821&r1=990820&r2=990821&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitParallelTimeoutTest.java
 (original)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitParallelTimeoutTest.java
 Mon Aug 30 15:22:15 2010
@@ -68,6 +68,7 @@ public class SplitParallelTimeoutTest ex
             assertEquals(2000, timeout);
             assertEquals(3, total);
             assertEquals(0, index);
+            assertNotNull(oldExchange);
         }
 
         public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {


Reply via email to