Author: davsclaus Date: Sun Apr 4 08:58:43 2010 New Revision: 930647 URL: http://svn.apache.org/viewvc?rev=930647&view=rev Log: CAMEL-2568: added more tests.
Added: camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateLoadAndRecoverTest.java - copied, changed from r930635, camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateLoadTest.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/RecoverableAggregationRepository.java camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateLoadConcurrentTest.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/RecoverableAggregationRepository.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/RecoverableAggregationRepository.java?rev=930647&r1=930646&r2=930647&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/RecoverableAggregationRepository.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/RecoverableAggregationRepository.java Sun Apr 4 08:58:43 2010 @@ -56,6 +56,13 @@ public interface RecoverableAggregationR void setCheckInterval(long interval, TimeUnit timeUnit); /** + * Sets the interval between scans + * + * @param interval the interval in millis + */ + void setCheckInterval(long interval); + + /** * Gets the interval between scans in millis. * * @return the interval in millis Modified: camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java?rev=930647&r1=930646&r2=930647&view=diff ============================================================================== --- camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java (original) +++ camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java Sun Apr 4 08:58:43 2010 @@ -51,7 +51,7 @@ public class HawtDBAggregationRepository private boolean returnOldExchange; private HawtDBCamelMarshaller<K> marshaller = new HawtDBCamelMarshaller<K>(); private long interval = 5000; - private boolean useRecovery = false; + private boolean useRecovery = true; /** * Creates an aggregation repository @@ -348,6 +348,10 @@ public class HawtDBAggregationRepository this.interval = timeUnit.toMillis(interval); } + public void setCheckInterval(long interval) { + this.interval = interval; + } + public long getCheckIntervalInMillis() { return interval; } Copied: camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateLoadAndRecoverTest.java (from r930635, camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateLoadTest.java) URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateLoadAndRecoverTest.java?p2=camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateLoadAndRecoverTest.java&p1=camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateLoadTest.java&r1=930635&r2=930647&rev=930647&view=diff ============================================================================== --- camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateLoadTest.java (original) +++ camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateLoadAndRecoverTest.java Sun Apr 4 08:58:43 2010 @@ -16,7 +16,12 @@ */ package org.apache.camel.component.hawtdb; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + import org.apache.camel.Exchange; +import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; import org.apache.camel.processor.aggregate.AggregationStrategy; @@ -24,9 +29,10 @@ import org.apache.camel.test.junit4.Came import org.junit.Before; import org.junit.Test; -public class HawtDBAggregateLoadTest extends CamelTestSupport { +public class HawtDBAggregateLoadAndRecoverTest extends CamelTestSupport { - private static final int SIZE = 5000; + private static final int SIZE = 1000; + private static final AtomicInteger counter = new AtomicInteger(); @Before @Override @@ -36,9 +42,9 @@ public class HawtDBAggregateLoadTest ext } @Test - public void testLoadTestHawtDBAggregate() throws Exception { + public void testLoadAndRecoverHawtDBAggregate() throws Exception { MockEndpoint mock = getMockEndpoint("mock:result"); - mock.expectedMinimumMessageCount(1); + mock.expectedMessageCount(SIZE / 10); mock.setResultWaitTime(30 * 1000); System.out.println("Staring to send " + SIZE + " messages."); @@ -46,12 +52,23 @@ public class HawtDBAggregateLoadTest ext for (int i = 0; i < SIZE; i++) { final int value = 1; char id = 'A'; - template.sendBodyAndHeader("seda:start?size=" + SIZE, value, "id", "" + id); + Map headers = new HashMap(); + headers.put("id", id); + headers.put("seq", i); + template.sendBodyAndHeaders("seda:start?size=" + SIZE, value, headers); } System.out.println("Sending all " + SIZE + " message done. Now waiting for aggregation to complete."); assertMockEndpointsSatisfied(); + + int recovered = 0; + for (Exchange exchange : mock.getReceivedExchanges()) { + if (exchange.getIn().getHeader(Exchange.REDELIVERED) != null) { + recovered++; + } + } + assertEquals("There should be 5 recovered", 5, recovered); } @Override @@ -60,13 +77,23 @@ public class HawtDBAggregateLoadTest ext @Override public void configure() throws Exception { HawtDBAggregationRepository<String> repo = new HawtDBAggregationRepository<String>("repo1", "target/data/hawtdb.dat"); + repo.setUseRecovery(true); from("seda:start?size=" + SIZE) .to("log:input?groupSize=500") .aggregate(header("id"), new MyAggregationStrategy()) .aggregationRepository(repo) - .completionSize(SIZE) + .completionSize(10) .to("log:output?showHeaders=true") + // have every 20th exchange fail which should then be recovered + .process(new Processor() { + public void process(Exchange exchange) throws Exception { + int num = counter.incrementAndGet(); + if (num % 20 == 0) { + throw new IllegalStateException("Failed for num " + num); + } + } + }) .to("mock:result") .end(); } Modified: camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateLoadConcurrentTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateLoadConcurrentTest.java?rev=930647&r1=930646&r2=930647&view=diff ============================================================================== --- camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateLoadConcurrentTest.java (original) +++ camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateLoadConcurrentTest.java Sun Apr 4 08:58:43 2010 @@ -43,7 +43,7 @@ public class HawtDBAggregateLoadConcurre @Test public void testLoadTestHawtDBAggregate() throws Exception { MockEndpoint mock = getMockEndpoint("mock:result"); - mock.expectedMinimumMessageCount(10); + mock.expectedMinimumMessageCount(9); mock.setResultWaitTime(30 * 1000); ExecutorService executor = Executors.newFixedThreadPool(10);