Author: davsclaus Date: Thu Feb 18 08:48:38 2010 New Revision: 911285 URL: http://svn.apache.org/viewvc?rev=911285&view=rev Log: CAMEL-1686: Aggregator examples
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateSimplePredicateEagerTest.java (with props) camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateSimplePredicateTest.java (with props) camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateSimpleSizeTest.java (with props) camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateSimpleTimeoutTest.java (with props) Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateSimplePredicateEagerTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateSimplePredicateEagerTest.java?rev=911285&view=auto ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateSimplePredicateEagerTest.java (added) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateSimplePredicateEagerTest.java Thu Feb 18 08:48:38 2010 @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.processor.aggregator; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.processor.BodyInAggregatingStrategy; + +/** + * @version $Revision$ + */ +public class AggregateSimplePredicateEagerTest extends ContextTestSupport { + + public void testAggregateSimplePredicateEager() throws Exception { + getMockEndpoint("mock:aggregated").expectedBodiesReceived("A+B+C+END"); + + template.sendBodyAndHeader("direct:start", "A", "id", 123); + template.sendBodyAndHeader("direct:start", "B", "id", 123); + template.sendBodyAndHeader("direct:start", "C", "id", 123); + template.sendBodyAndHeader("direct:start", "END", "id", 123); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + // START SNIPPET: e1 + from("direct:start") + // aggregate all exchanges correlated by the id header. 
+ // Aggregate them using the BodyInAggregatingStrategy strategy. + // Do eager checking, which means the completion predicate will use the incoming exchange; + // this allows us to trigger completion when a certain exchange arrives, which here is the + // END message. + .aggregate(header("id"), new BodyInAggregatingStrategy()) + .eagerCheckCompletion().completionPredicate(body().isEqualTo("END")) + .to("mock:aggregated"); + // END SNIPPET: e1 + } + }; + } +} \ No newline at end of file Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateSimplePredicateEagerTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateSimplePredicateEagerTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateSimplePredicateTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateSimplePredicateTest.java?rev=911285&view=auto ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateSimplePredicateTest.java (added) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateSimplePredicateTest.java Thu Feb 18 08:48:38 2010 @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.aggregator; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.processor.BodyInAggregatingStrategy; + +/** + * @version $Revision$ + */ +public class AggregateSimplePredicateTest extends ContextTestSupport { + + public void testAggregateSimplePredicate() throws Exception { + getMockEndpoint("mock:aggregated").expectedBodiesReceived("A+B+C"); + + template.sendBodyAndHeader("direct:start", "A", "id", 123); + template.sendBodyAndHeader("direct:start", "B", "id", 123); + template.sendBodyAndHeader("direct:start", "C", "id", 123); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + // START SNIPPET: e1 + from("direct:start") + // aggregate all exchanges correlated by the id header. 
+ // Aggregate them using the BodyInAggregatingStrategy strategy, + // and when the aggregated body contains A+B+C then complete the aggregation + // and send it to mock:aggregated + .aggregate(header("id"), new BodyInAggregatingStrategy()).completionPredicate(body().contains("A+B+C")) + .to("mock:aggregated"); + // END SNIPPET: e1 + } + }; + } +} \ No newline at end of file Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateSimplePredicateTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateSimplePredicateTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateSimpleSizeTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateSimpleSizeTest.java?rev=911285&view=auto ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateSimpleSizeTest.java (added) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateSimpleSizeTest.java Thu Feb 18 08:48:38 2010 @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.aggregator; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.processor.BodyInAggregatingStrategy; + +/** + * @version $Revision$ + */ +public class AggregateSimpleSizeTest extends ContextTestSupport { + + public void testAggregateSimpleSize() throws Exception { + getMockEndpoint("mock:aggregated").expectedBodiesReceived("A+B+C"); + + template.sendBodyAndHeader("direct:start", "A", "id", 123); + template.sendBodyAndHeader("direct:start", "B", "id", 123); + template.sendBodyAndHeader("direct:start", "C", "id", 123); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + // START SNIPPET: e1 + from("direct:start") + // aggregate all exchanges correlated by the id header. 
+ // Aggregate them using the BodyInAggregatingStrategy strategy, + // and after 3 messages have been aggregated then complete the aggregation + // and send it to mock:aggregated + .aggregate(header("id"), new BodyInAggregatingStrategy()).completionSize(3) + .to("mock:aggregated"); + // END SNIPPET: e1 + } + }; + } +} \ No newline at end of file Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateSimpleSizeTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateSimpleSizeTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateSimpleTimeoutTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateSimpleTimeoutTest.java?rev=911285&view=auto ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateSimpleTimeoutTest.java (added) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateSimpleTimeoutTest.java Thu Feb 18 08:48:38 2010 @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.aggregator; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.processor.BodyInAggregatingStrategy; + +/** + * @version $Revision$ + */ +public class AggregateSimpleTimeoutTest extends ContextTestSupport { + + public void testAggregateSimpleTimeout() throws Exception { + getMockEndpoint("mock:aggregated").expectedBodiesReceived("A+B+C"); + + template.sendBodyAndHeader("direct:start", "A", "id", 123); + template.sendBodyAndHeader("direct:start", "B", "id", 123); + template.sendBodyAndHeader("direct:start", "C", "id", 123); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + // START SNIPPET: e1 + from("direct:start") + // aggregate all exchanges correlated by the id header. 
+ // Aggregate them using the BodyInAggregatingStrategy strategy, + // and after 3 seconds of inactivity then time out and complete the aggregation + // and send it to mock:aggregated + .aggregate(header("id"), new BodyInAggregatingStrategy()).completionTimeout(3000) + .to("mock:aggregated"); + // END SNIPPET: e1 + } + }; + } +} Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateSimpleTimeoutTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateSimpleTimeoutTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date