Author: davsclaus Date: Tue Sep 28 06:43:42 2010 New Revision: 1002036 URL: http://svn.apache.org/viewvc?rev=1002036&view=rev Log: CAMEL-3159: discard on timeout in Aggregate EIP. Added test with hawtdb as well.
Added: camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateDiscardOnTimeoutTest.java - copied, changed from r1002017, camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverTest.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=1002036&r1=1002035&r2=1002036&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Tue Sep 28 06:43:42 2010 @@ -365,6 +365,10 @@ public class AggregateProcessor extends if (LOG.isDebugEnabled()) { LOG.debug("Aggregation for correlation key " + key + " discarding aggregated exchange: " + exchange); } + // must confirm the discarded exchange + aggregationRepository.confirm(exchange.getContext(), exchange.getExchangeId()); + // and remove redelivery state as well + redeliveryState.remove(exchange.getExchangeId()); return; } Copied: camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateDiscardOnTimeoutTest.java (from r1002017, camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverTest.java) URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateDiscardOnTimeoutTest.java?p2=camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateDiscardOnTimeoutTest.java&p1=camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverTest.java&r1=1002017&r2=1002036&rev=1002036&view=diff ============================================================================== --- camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverTest.java (original) +++ camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateDiscardOnTimeoutTest.java Tue Sep 28 06:43:42 2010 @@ -20,13 +20,13 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.apache.camel.Exchange; -import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; import org.apache.camel.processor.aggregate.AggregationStrategy; import org.apache.camel.test.junit4.CamelTestSupport; import org.junit.Test; -public class HawtDBAggregateRecoverTest extends CamelTestSupport { +public class HawtDBAggregateDiscardOnTimeoutTest extends CamelTestSupport { private static AtomicInteger counter = new AtomicInteger(0); private HawtDBAggregationRepository repo; @@ -43,22 +43,28 @@ public class HawtDBAggregateRecoverTest } @Test - public void testHawtDBAggregateRecover() throws Exception { - // should fail the first 2 times and then recover - getMockEndpoint("mock:aggregated").expectedMessageCount(3); - getMockEndpoint("mock:result").expectedBodiesReceived("ABCDE"); - // should be marked as redelivered - getMockEndpoint("mock:result").message(0).header(Exchange.REDELIVERED).isEqualTo(Boolean.TRUE); - // on the 2nd redelivery attempt we success - 
getMockEndpoint("mock:result").message(0).header(Exchange.REDELIVERY_COUNTER).isEqualTo(2); + public void testAggregateDiscardOnTimeout() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:aggregated"); + mock.expectedMessageCount(0); + + template.sendBodyAndHeader("direct:start", "A", "id", 123); + template.sendBodyAndHeader("direct:start", "B", "id", 123); + + // wait 3 seconds + Thread.sleep(3000); + + mock.assertIsSatisfied(); + + // now send 3 which does not timeout + mock.reset(); + mock.expectedBodiesReceived("C+D+E"); template.sendBodyAndHeader("direct:start", "A", "id", 123); template.sendBodyAndHeader("direct:start", "B", "id", 123); template.sendBodyAndHeader("direct:start", "C", "id", 123); - template.sendBodyAndHeader("direct:start", "D", "id", 123); - template.sendBodyAndHeader("direct:start", "E", "id", 123); - assertMockEndpointsSatisfied(30, TimeUnit.SECONDS); + // should complete before timeout + mock.await(1500, TimeUnit.MILLISECONDS); } @Override @@ -68,21 +74,12 @@ public class HawtDBAggregateRecoverTest public void configure() throws Exception { from("direct:start") .aggregate(header("id"), new MyAggregationStrategy()) - .completionSize(5).aggregationRepository(repo) - .log("aggregated exchange id ${exchangeId} with ${body}") - .to("mock:aggregated") - .delay(1000) - // simulate errors the first two times - .process(new Processor() { - public void process(Exchange exchange) throws Exception { - int count = counter.incrementAndGet(); - if (count <= 2) { - throw new IllegalArgumentException("Damn"); - } - } - }) - .to("mock:result") - .end(); + .completionSize(3).aggregationRepository(repo) + // use a 2 second timeout + .completionTimeout(2000) + // and if timeout occurred then just discard the aggregated message + .discardOnCompletionTimeout() + .to("mock:aggregated"); } }; }