Author: davsclaus
Date: Fri Dec 18 13:12:21 2009
New Revision: 892244

URL: http://svn.apache.org/viewvc?rev=892244&view=rev
Log:
CAMEL-2303: Fixed groupedExchang now working properly on Aggregator.

Added:
    
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeBatchSizeTest.java
      - copied, changed from r892224, 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java
    
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeMultipleCorrelationTest.java
      - copied, changed from r892224, 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java
Modified:
    
camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
    
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java?rev=892244&r1=892243&r2=892244&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
 Fri Dec 18 13:12:21 2009
@@ -35,6 +35,7 @@
 import org.apache.camel.processor.Aggregator;
 import org.apache.camel.processor.aggregate.AggregationCollection;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.processor.aggregate.GroupedExchangeAggregationStrategy;
 import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
 import org.apache.camel.spi.RouteContext;
 
@@ -190,9 +191,15 @@
         if (strategy == null && strategyRef != null) {
             strategy = routeContext.lookup(strategyRef, 
AggregationStrategy.class);
         }
+        // pick a default strategy
         if (strategy == null) {
-            // fallback to use latest
-            strategy = new UseLatestAggregationStrategy();
+            if (groupExchanges != null && groupExchanges) {
+                // if grouped exchange is enabled then use special strategy 
for that
+                strategy = new GroupedExchangeAggregationStrategy();
+            } else {
+                // fallback to use latest
+                strategy = new UseLatestAggregationStrategy();
+            }
         }
         return strategy;
     }

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java?rev=892244&r1=892243&r2=892244&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
 Fri Dec 18 13:12:21 2009
@@ -30,7 +30,6 @@
 import org.apache.camel.Exchange;
 import org.apache.camel.Navigate;
 import org.apache.camel.Processor;
-import org.apache.camel.impl.DefaultExchange;
 import org.apache.camel.impl.LoggingExceptionHandler;
 import org.apache.camel.impl.ServiceSupport;
 import org.apache.camel.spi.ExceptionHandler;
@@ -350,32 +349,11 @@
 
         @SuppressWarnings("unchecked")
         private void sendExchanges() throws Exception {
-            Exchange grouped = null;
-
             Iterator<Exchange> iter = collection.iterator();
             while (iter.hasNext()) {
                 Exchange exchange = iter.next();
                 iter.remove();
-                if (!groupExchanges) {
-                    // non grouped so process the exchange one at a time
-                    processExchange(exchange);
-                } else {
-                    // grouped so add all exchanges into one group
-                    if (grouped == null) {
-                        grouped = new DefaultExchange(exchange);
-                    }
-                    List<Exchange> list = 
grouped.getProperty(Exchange.GROUPED_EXCHANGE, List.class);
-                    if (list == null) {
-                        list = new ArrayList<Exchange>();
-                        grouped.setProperty(Exchange.GROUPED_EXCHANGE, list);
-                    }
-                    list.add(exchange);
-                }
-            }
-
-            // and after adding process the single grouped exchange
-            if (grouped != null) {
-                processExchange(grouped);
+                processExchange(exchange);
             }
         }
     }

Copied: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeBatchSizeTest.java
 (from r892224, 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java)
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeBatchSizeTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeBatchSizeTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java&r1=892224&r2=892244&rev=892244&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java
 (original)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeBatchSizeTest.java
 Fri Dec 18 13:12:21 2009
@@ -26,10 +26,9 @@
 /**
  * Unit test for aggregate grouped exchanges.
  */
-public class AggregateGroupedExchangeTest extends ContextTestSupport {
+public class AggregateGroupedExchangeBatchSizeTest extends ContextTestSupport {
 
     public void testGrouped() throws Exception {
-        // START SNIPPET: e2
         MockEndpoint result = getMockEndpoint("mock:result");
 
         // we expect 1 messages since we group all we get in using the same 
correlation key
@@ -40,20 +39,27 @@
         template.sendBody("direct:start", "150");
         template.sendBody("direct:start", "130");
         template.sendBody("direct:start", "200");
-        template.sendBody("direct:start", "190");
 
         assertMockEndpointsSatisfied();
 
         Exchange out = result.getExchanges().get(0);
         List<Exchange> grouped = out.getProperty(Exchange.GROUPED_EXCHANGE, 
List.class);
 
-        assertEquals(5, grouped.size());
+        assertEquals(2, grouped.size());
 
         assertEquals("100", grouped.get(0).getIn().getBody(String.class));
         assertEquals("150", grouped.get(1).getIn().getBody(String.class));
-        assertEquals("130", grouped.get(2).getIn().getBody(String.class));
-        assertEquals("200", grouped.get(3).getIn().getBody(String.class));
-        assertEquals("190", grouped.get(4).getIn().getBody(String.class));
+
+        // wait a bit for the remainder to come in
+        Thread.sleep(1000);
+
+        out = result.getExchanges().get(1);
+        grouped = out.getProperty(Exchange.GROUPED_EXCHANGE, List.class);
+
+        assertEquals(2, grouped.size());
+
+        assertEquals("130", grouped.get(0).getIn().getBody(String.class));
+        assertEquals("200", grouped.get(1).getIn().getBody(String.class));
         // END SNIPPET: e2
     }
 
@@ -64,8 +70,8 @@
                 // START SNIPPET: e1
                 // our route is aggregating from the direct queue and sending 
the response to the mock
                 from("direct:start")
-                    // aggregated using id as correlation so each is unqiue 
and thus we batch everything
-                    .aggregate().simple("id")
+                    // aggregated all use same expression
+                    .aggregate().constant(true).batchSize(2)
                     // wait for 0.5 seconds to aggregate
                     .batchTimeout(500L)
                     // group the exchanges so we get one single exchange 
containing all the others
@@ -75,4 +81,4 @@
             }
         };
     }
-}
+}
\ No newline at end of file

Copied: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeMultipleCorrelationTest.java
 (from r892224, 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java)
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeMultipleCorrelationTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeMultipleCorrelationTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java&r1=892224&r2=892244&rev=892244&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java
 (original)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeMultipleCorrelationTest.java
 Fri Dec 18 13:12:21 2009
@@ -26,34 +26,42 @@
 /**
  * Unit test for aggregate grouped exchanges.
  */
-public class AggregateGroupedExchangeTest extends ContextTestSupport {
+public class AggregateGroupedExchangeMultipleCorrelationTest extends 
ContextTestSupport {
 
     public void testGrouped() throws Exception {
         // START SNIPPET: e2
         MockEndpoint result = getMockEndpoint("mock:result");
 
-        // we expect 1 messages since we group all we get in using the same 
correlation key
-        result.expectedMessageCount(1);
+        // we expect 2 messages since we group using A or B keys
+        result.expectedMessageCount(2);
 
         // then we sent all the message at once
-        template.sendBody("direct:start", "100");
-        template.sendBody("direct:start", "150");
-        template.sendBody("direct:start", "130");
-        template.sendBody("direct:start", "200");
-        template.sendBody("direct:start", "190");
+        template.sendBodyAndHeader("direct:start", "100", "foo", "A");
+        template.sendBodyAndHeader("direct:start", "130", "foo", "B");
+        template.sendBodyAndHeader("direct:start", "150", "foo", "A");
+        template.sendBodyAndHeader("direct:start", "200", "foo", "B");
+        template.sendBodyAndHeader("direct:start", "180", "foo", "B");
+        template.sendBodyAndHeader("direct:start", "120", "foo", "A");
 
         assertMockEndpointsSatisfied();
 
         Exchange out = result.getExchanges().get(0);
         List<Exchange> grouped = out.getProperty(Exchange.GROUPED_EXCHANGE, 
List.class);
 
-        assertEquals(5, grouped.size());
+        assertEquals(3, grouped.size());
 
         assertEquals("100", grouped.get(0).getIn().getBody(String.class));
         assertEquals("150", grouped.get(1).getIn().getBody(String.class));
-        assertEquals("130", grouped.get(2).getIn().getBody(String.class));
-        assertEquals("200", grouped.get(3).getIn().getBody(String.class));
-        assertEquals("190", grouped.get(4).getIn().getBody(String.class));
+        assertEquals("120", grouped.get(2).getIn().getBody(String.class));
+
+        out = result.getExchanges().get(1);
+        grouped = out.getProperty(Exchange.GROUPED_EXCHANGE, List.class);
+
+        assertEquals(3, grouped.size());
+
+        assertEquals("130", grouped.get(0).getIn().getBody(String.class));
+        assertEquals("200", grouped.get(1).getIn().getBody(String.class));
+        assertEquals("180", grouped.get(2).getIn().getBody(String.class));
         // END SNIPPET: e2
     }
 
@@ -64,10 +72,10 @@
                 // START SNIPPET: e1
                 // our route is aggregating from the direct queue and sending 
the response to the mock
                 from("direct:start")
-                    // aggregated using id as correlation so each is unqiue 
and thus we batch everything
-                    .aggregate().simple("id")
-                    // wait for 0.5 seconds to aggregate
-                    .batchTimeout(500L)
+                    // aggregate all using the foo header
+                    .aggregate().header("foo")
+                    // wait for 1 seconds to aggregate
+                    .batchTimeout(1000L)
                     // group the exchanges so we get one single exchange 
containing all the others
                     .groupExchanges()
                     .to("mock:result");
@@ -75,4 +83,4 @@
             }
         };
     }
-}
+}
\ No newline at end of file

Modified: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java?rev=892244&r1=892243&r2=892244&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java
 (original)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java
 Fri Dec 18 13:12:21 2009
@@ -64,8 +64,8 @@
                 // START SNIPPET: e1
                 // our route is aggregating from the direct queue and sending 
the response to the mock
                 from("direct:start")
-                    // aggregated using id as correlation so each is unqiue 
and thus we batch everything
-                    .aggregate().simple("id")
+                    // aggregate all using same expression
+                    .aggregate().constant(true)
                     // wait for 0.5 seconds to aggregate
                     .batchTimeout(500L)
                     // group the exchanges so we get one single exchange 
containing all the others


Reply via email to