Author: davsclaus Date: Mon May 2 17:39:08 2011 New Revision: 1098685 URL: http://svn.apache.org/viewvc?rev=1098685&view=rev Log: CAMEL-3930: Aggregate EIP will default use eager check completion if grouped exchanges has been enabled.
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeCompletionExpressionSizeTest.java camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeCompletionSizeTest.java - copied, changed from r1098631, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeBatchSizeTest.java camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateGroupedExchangeCompletionExpressionSizeTest.java - copied, changed from r1098631, camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregatorTest.java camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateGroupedExchangeCompletionExpressionSizeTest.xml - copied, changed from r1098631, camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateSimpleTimeoutTest.xml Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java?rev=1098685&r1=1098684&r2=1098685&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java Mon May 2 17:39:08 2011 @@ -222,6 +222,11 @@ public class AggregateDefinition extends if (strategy != null || strategyRef != null) { throw new IllegalArgumentException("Options groupExchanges and AggregationStrategy cannot be enabled at the same time"); } + if (eagerCheckCompletion != null && !eagerCheckCompletion) { + throw new IllegalArgumentException("Option eagerCheckCompletion cannot be false when groupExchanges has been enabled"); + } + // set eager check to enabled by default when using grouped exchanges + setEagerCheckCompletion(true); // if grouped exchange is enabled then use special strategy for that strategy = new GroupedExchangeAggregationStrategy(); } @@ -607,6 +612,8 @@ public class AggregateDefinition extends */ public AggregateDefinition groupExchanges() { setGroupExchanges(true); + // must use eager check when using grouped exchanges + setEagerCheckCompletion(true); return this; } Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeCompletionExpressionSizeTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeCompletionExpressionSizeTest.java?rev=1098685&view=auto ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeCompletionExpressionSizeTest.java (added) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeCompletionExpressionSizeTest.java Mon May 2 17:39:08 2011 @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.aggregator; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; + +/** + * Unit test for aggregate grouped exchanges. + */ +public class AggregateGroupedExchangeCompletionExpressionSizeTest extends ContextTestSupport { + + public void testGrouped() throws Exception { + MockEndpoint result = getMockEndpoint("mock:result"); + result.expectedMessageCount(2); + + template.sendBodyAndHeader("direct:start", "A", "size", 3); + template.sendBodyAndHeader("direct:start", "B", "size", 3); + template.sendBodyAndHeader("direct:start", "C", "size", 3); + template.sendBodyAndHeader("direct:start", "D", "size", 3); + template.sendBodyAndHeader("direct:start", "E", "size", 3); + template.sendBodyAndHeader("direct:start", "F", "size", 3); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + public void configure() throws Exception { + from("direct:start") + .aggregate(constant(true)).completionSize(header("size")) + .groupExchanges() + .to("mock:result"); + } + }; + } +} \ No newline at end of file Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeCompletionSizeTest.java (from r1098631, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeBatchSizeTest.java) URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeCompletionSizeTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeCompletionSizeTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeBatchSizeTest.java&r1=1098631&r2=1098685&rev=1098685&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeBatchSizeTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeCompletionSizeTest.java Mon May 2 17:39:08 2011 @@ -16,73 +16,37 @@ */ package org.apache.camel.processor.aggregator; -import java.util.List; - import org.apache.camel.ContextTestSupport; -import org.apache.camel.Exchange; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; /** * Unit test for aggregate grouped exchanges. */ -public class AggregateGroupedExchangeBatchSizeTest extends ContextTestSupport { +public class AggregateGroupedExchangeCompletionSizeTest extends ContextTestSupport { - @SuppressWarnings("unchecked") public void testGrouped() throws Exception { MockEndpoint result = getMockEndpoint("mock:result"); + result.expectedMessageCount(2); - // we expect 1 or 2 messages since we group all we get in using the same correlation key - result.expectedMinimumMessageCount(1); - - // then we sent all the message at once - template.sendBody("direct:start", "100"); - template.sendBody("direct:start", "150"); - template.sendBody("direct:start", "130"); - template.sendBody("direct:start", "200"); + template.sendBody("direct:start", "A"); + template.sendBody("direct:start", "B"); + template.sendBody("direct:start", "C"); + template.sendBody("direct:start", "D"); + template.sendBody("direct:start", "E"); + template.sendBody("direct:start", "F"); assertMockEndpointsSatisfied(); - - Exchange out = result.getExchanges().get(0); - List<Exchange> grouped = out.getProperty(Exchange.GROUPED_EXCHANGE, List.class); - - assertTrue("Should be either 2 or 4, was " + grouped.size(), grouped.size() == 2 || grouped.size() == 4); - - assertEquals("100", grouped.get(0).getIn().getBody(String.class)); - assertEquals("150", grouped.get(1).getIn().getBody(String.class)); - - // wait a bit for the remainder to come in - Thread.sleep(1000); - - if (result.getReceivedCounter() == 2) { - - out = result.getExchanges().get(1); - grouped = out.getProperty(Exchange.GROUPED_EXCHANGE, List.class); - - assertEquals(2, grouped.size()); - - assertEquals("130", grouped.get(0).getIn().getBody(String.class)); - assertEquals("200", grouped.get(1).getIn().getBody(String.class)); - } - // END SNIPPET: e2 } @Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { public void configure() throws Exception { - // START SNIPPET: e1 - // our route is aggregating from the direct queue and sending the response to the mock from("direct:start") - .log("Aggregator received ${body}") - // aggregated all use same expression - .aggregate(constant(true)).completionSize(2) - // wait for 0.5 seconds to aggregate - .completionTimeout(500L) - // group the exchanges so we get one single exchange containing all the others + .aggregate(constant(true)).completionSize(3) .groupExchanges() .to("mock:result"); - // END SNIPPET: e1 } }; } Copied: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateGroupedExchangeCompletionExpressionSizeTest.java (from r1098631, camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregatorTest.java) URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateGroupedExchangeCompletionExpressionSizeTest.java?p2=camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateGroupedExchangeCompletionExpressionSizeTest.java&p1=camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregatorTest.java&r1=1098631&r2=1098685&rev=1098685&view=diff ============================================================================== --- camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregatorTest.java (original) +++ camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateGroupedExchangeCompletionExpressionSizeTest.java Mon May 2 17:39:08 2011 @@ -17,15 +17,16 @@ package org.apache.camel.spring.processor.aggregator; import org.apache.camel.CamelContext; -import org.apache.camel.processor.aggregator.AggregatorTest; +import org.apache.camel.processor.aggregator.AggregateGroupedExchangeCompletionExpressionSizeTest; + import static org.apache.camel.spring.processor.SpringTestHelper.createSpringCamelContext; /** * @version */ -public class SpringAggregatorTest extends AggregatorTest { +public class SpringAggregateGroupedExchangeCompletionExpressionSizeTest extends AggregateGroupedExchangeCompletionExpressionSizeTest { protected CamelContext createCamelContext() throws Exception { - return createSpringCamelContext(this, "org/apache/camel/spring/processor/aggregator.xml"); + return createSpringCamelContext(this, "org/apache/camel/spring/processor/aggregator/SpringAggregateGroupedExchangeCompletionExpressionSizeTest.xml"); } } \ No newline at end of file Copied: camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateGroupedExchangeCompletionExpressionSizeTest.xml (from r1098631, camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateSimpleTimeoutTest.xml) URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateGroupedExchangeCompletionExpressionSizeTest.xml?p2=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateGroupedExchangeCompletionExpressionSizeTest.xml&p1=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateSimpleTimeoutTest.xml&r1=1098631&r2=1098685&rev=1098685&view=diff ============================================================================== --- camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateSimpleTimeoutTest.xml (original) +++ camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateGroupedExchangeCompletionExpressionSizeTest.xml Mon May 2 17:39:08 2011 @@ -22,20 +22,19 @@ http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd "> - <!-- START SNIPPET: e1 --> <camelContext xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="direct:start"/> - <aggregate strategyRef="aggregatorStrategy" completionTimeout="3000"> + <aggregate groupExchanges="true"> <correlationExpression> - <simple>header.id</simple> + <constant>true</constant> </correlationExpression> - <to uri="mock:aggregated"/> + <completionSize> + <header>size</header> + </completionSize> + <to uri="mock:result"/> </aggregate> </route> </camelContext> - <bean id="aggregatorStrategy" class="org.apache.camel.processor.BodyInAggregatingStrategy"/> - <!-- END SNIPPET: e1 --> - </beans>