Author: davsclaus
Date: Sun Jan 24 12:59:12 2010
New Revision: 902559

URL: http://svn.apache.org/viewvc?rev=902559&view=rev
Log:
Added unit test based on user forum issue.

Added:
    
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateFromWireTapTest.java
      - copied, changed from r902552, 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationStrategyTest.java

Copied: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateFromWireTapTest.java
 (from r902552, 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationStrategyTest.java)
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateFromWireTapTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateFromWireTapTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationStrategyTest.java&r1=902552&r2=902559&rev=902559&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationStrategyTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateFromWireTapTest.java Sun Jan 24 12:59:12 2010
@@ -22,60 +22,57 @@
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
 
-/**
- * Unit test for using our own aggregation strategy.
- */
-public class CustomAggregationStrategyTest extends ContextTestSupport {
-
-    public void testCustomAggregationStrategy() throws Exception {
-        // START SNIPPET: e2
-        MockEndpoint result = getMockEndpoint("mock:result");
-
-        // we expect to find the two winners with the highest bid
-        result.expectedMessageCount(2);
-        result.expectedBodiesReceived("200", "150");
-
-        // then we sent all the message at once
-        template.sendBodyAndHeader("direct:start", "100", "id", "1");
-        template.sendBodyAndHeader("direct:start", "150", "id", "2");
-        template.sendBodyAndHeader("direct:start", "130", "id", "2");
-        template.sendBodyAndHeader("direct:start", "200", "id", "1");
-        template.sendBodyAndHeader("direct:start", "190", "id", "1");
+public class AggregateFromWireTapTest extends ContextTestSupport {
+
+    public void testAggregateFromWireTap() throws Exception {
+        MockEndpoint end = getMockEndpoint("mock:end");
+        end.expectedBodiesReceived("A", "B");
+
+        MockEndpoint aggregated = getMockEndpoint("mock:aggregated");
+        aggregated.expectedBodiesReceived("AB");
+
+        template.sendBody("direct:start", "A");
+        template.sendBody("direct:start", "B");
 
         assertMockEndpointsSatisfied();
-        // END SNIPPET: e2
     }
 
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
             public void configure() throws Exception {
-                // START SNIPPET: e1
-                // our route is aggregating from the direct queue and sending the response to the mock
+                context.setTracing(true);
+
                 from("direct:start")
-                    // aggregated by header id and use our own strategy how to aggregate
-                    .aggregate(new MyAggregationStrategy()).header("id")
-                    // wait for 0.5 seconds to aggregate
-                    .batchTimeout(500L)
-                    .to("mock:result");
-                // END SNIPPET: e1
+                    .wireTap("direct:tap")
+                    .to("mock:end");
+
+                from("direct:tap")
+                    // just use a constant correlation expression as we want to agg everything
+                    // in the same group. set batch size to two which means to fire when we
+                    // have received 2 incoming messages, if not the timeout of 5 sec will kick in
+                    .aggregate(new MyAggregationStrategy()).constant(true).batchSize(2)
+                       .batchTimeout(5000L)
+                        .to("direct:aggregated")
+                    .end();
+
+                from("direct:aggregated")
+                    .to("mock:aggregated");
             }
         };
     }
 
-    // START SNIPPET: e3
     private static class MyAggregationStrategy implements AggregationStrategy {
 
         public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
             if (oldExchange == null) {
-                // the first time we only have the new exchange so it wins the first round
                 return newExchange;
             }
-            int oldPrice = oldExchange.getIn().getBody(Integer.class);
-            int newPrice = newExchange.getIn().getBody(Integer.class);
-            // return the "winner" that has the highest price
-            return newPrice > oldPrice ? newExchange : oldExchange;
+            String oldBody = oldExchange.getIn().getBody(String.class);
+            String newBody = newExchange.getIn().getBody(String.class);
+            oldExchange.getIn().setBody(oldBody + newBody);
+            return oldExchange;
         }
     }
-    // END SNIPPET: e3
+
 }
\ No newline at end of file


Reply via email to