Author: davsclaus
Date: Sun Jan 24 12:59:12 2010
New Revision: 902559
URL: http://svn.apache.org/viewvc?rev=902559&view=rev
Log:
Added unit test based on user forum issue.
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateFromWireTapTest.java
- copied, changed from r902552,
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationStrategyTest.java
Copied:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateFromWireTapTest.java
(from r902552,
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationStrategyTest.java)
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateFromWireTapTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateFromWireTapTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationStrategyTest.java&r1=902552&r2=902559&rev=902559&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationStrategyTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateFromWireTapTest.java Sun Jan 24 12:59:12 2010
@@ -22,60 +22,57 @@
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.processor.aggregate.AggregationStrategy;
-/**
- * Unit test for using our own aggregation strategy.
- */
-public class CustomAggregationStrategyTest extends ContextTestSupport {
-
- public void testCustomAggregationStrategy() throws Exception {
- // START SNIPPET: e2
- MockEndpoint result = getMockEndpoint("mock:result");
-
- // we expect to find the two winners with the highest bid
- result.expectedMessageCount(2);
- result.expectedBodiesReceived("200", "150");
-
- // then we sent all the message at once
- template.sendBodyAndHeader("direct:start", "100", "id", "1");
- template.sendBodyAndHeader("direct:start", "150", "id", "2");
- template.sendBodyAndHeader("direct:start", "130", "id", "2");
- template.sendBodyAndHeader("direct:start", "200", "id", "1");
- template.sendBodyAndHeader("direct:start", "190", "id", "1");
+public class AggregateFromWireTapTest extends ContextTestSupport {
+
+ public void testAggregateFromWireTap() throws Exception {
+ MockEndpoint end = getMockEndpoint("mock:end");
+ end.expectedBodiesReceived("A", "B");
+
+ MockEndpoint aggregated = getMockEndpoint("mock:aggregated");
+ aggregated.expectedBodiesReceived("AB");
+
+ template.sendBody("direct:start", "A");
+ template.sendBody("direct:start", "B");
assertMockEndpointsSatisfied();
- // END SNIPPET: e2
}
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
public void configure() throws Exception {
- // START SNIPPET: e1
- // our route is aggregating from the direct queue and sending the response to the mock
+ context.setTracing(true);
+
from("direct:start")
- // aggregated by header id and use our own strategy how to aggregate
- .aggregate(new MyAggregationStrategy()).header("id")
- // wait for 0.5 seconds to aggregate
- .batchTimeout(500L)
- .to("mock:result");
- // END SNIPPET: e1
+ .wireTap("direct:tap")
+ .to("mock:end");
+
+ from("direct:tap")
+ // just use a constant correlation expression as we want to agg everything
+ // in the same group. set batch size to two which means to fire when we
+ // have received 2 incoming messages, if not the timeout of 5 sec will kick in
+ .aggregate(new MyAggregationStrategy()).constant(true).batchSize(2)
+ .batchTimeout(5000L)
+ .to("direct:aggregated")
+ .end();
+
+ from("direct:aggregated")
+ .to("mock:aggregated");
}
};
}
- // START SNIPPET: e3
private static class MyAggregationStrategy implements AggregationStrategy {
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
if (oldExchange == null) {
- // the first time we only have the new exchange so it wins the first round
return newExchange;
}
- int oldPrice = oldExchange.getIn().getBody(Integer.class);
- int newPrice = newExchange.getIn().getBody(Integer.class);
- // return the "winner" that has the highest price
- return newPrice > oldPrice ? newExchange : oldExchange;
+ String oldBody = oldExchange.getIn().getBody(String.class);
+ String newBody = newExchange.getIn().getBody(String.class);
+ oldExchange.getIn().setBody(oldBody + newBody);
+ return oldExchange;
}
}
- // END SNIPPET: e3
+
}
\ No newline at end of file