Author: davsclaus
Date: Sun Apr  4 08:58:43 2010
New Revision: 930647

URL: http://svn.apache.org/viewvc?rev=930647&view=rev
Log:
CAMEL-2568: added more tests.

Added:
    
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateLoadAndRecoverTest.java
      - copied, changed from r930635, 
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateLoadTest.java
Modified:
    
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/RecoverableAggregationRepository.java
    
camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java
    
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateLoadConcurrentTest.java

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/RecoverableAggregationRepository.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/RecoverableAggregationRepository.java?rev=930647&r1=930646&r2=930647&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/RecoverableAggregationRepository.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/RecoverableAggregationRepository.java
 Sun Apr  4 08:58:43 2010
@@ -56,6 +56,13 @@ public interface RecoverableAggregationR
     void setCheckInterval(long interval, TimeUnit timeUnit);
 
     /**
+     * Sets the interval between scans
+     *
+     * @param interval  the interval in millis
+     */
+    void setCheckInterval(long interval);
+
+    /**
      * Gets the interval between scans in millis.
      *
      * @return the interval in millis

Modified: 
camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java?rev=930647&r1=930646&r2=930647&view=diff
==============================================================================
--- 
camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java
 (original)
+++ 
camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java
 Sun Apr  4 08:58:43 2010
@@ -51,7 +51,7 @@ public class HawtDBAggregationRepository
     private boolean returnOldExchange;
     private HawtDBCamelMarshaller<K> marshaller = new 
HawtDBCamelMarshaller<K>();
     private long interval = 5000;
-    private boolean useRecovery = false;
+    private boolean useRecovery = true;
 
     /**
      * Creates an aggregation repository
@@ -348,6 +348,10 @@ public class HawtDBAggregationRepository
         this.interval = timeUnit.toMillis(interval);
     }
 
+    public void setCheckInterval(long interval) {
+        this.interval = interval;
+    }
+
     public long getCheckIntervalInMillis() {
         return interval;
     }

Copied: 
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateLoadAndRecoverTest.java
 (from r930635, 
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateLoadTest.java)
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateLoadAndRecoverTest.java?p2=camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateLoadAndRecoverTest.java&p1=camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateLoadTest.java&r1=930635&r2=930647&rev=930647&view=diff
==============================================================================
--- 
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateLoadTest.java
 (original)
+++ 
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateLoadAndRecoverTest.java
 Sun Apr  4 08:58:43 2010
@@ -16,7 +16,12 @@
  */
 package org.apache.camel.component.hawtdb;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
 import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
@@ -24,9 +29,10 @@ import org.apache.camel.test.junit4.Came
 import org.junit.Before;
 import org.junit.Test;
 
-public class HawtDBAggregateLoadTest extends CamelTestSupport {
+public class HawtDBAggregateLoadAndRecoverTest extends CamelTestSupport {
 
-    private static final int SIZE = 5000;
+    private static final int SIZE = 1000;
+    private static final AtomicInteger counter = new AtomicInteger();
 
     @Before
     @Override
@@ -36,9 +42,9 @@ public class HawtDBAggregateLoadTest ext
     }
 
     @Test
-    public void testLoadTestHawtDBAggregate() throws Exception {
+    public void testLoadAndRecoverHawtDBAggregate() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:result");
-        mock.expectedMinimumMessageCount(1);
+        mock.expectedMessageCount(SIZE / 10);
         mock.setResultWaitTime(30 * 1000);
 
         System.out.println("Staring to send " + SIZE + " messages.");
@@ -46,12 +52,23 @@ public class HawtDBAggregateLoadTest ext
         for (int i = 0; i < SIZE; i++) {
             final int value = 1;
             char id = 'A';
-            template.sendBodyAndHeader("seda:start?size=" + SIZE, value, "id", 
"" + id);
+            Map headers = new HashMap();
+            headers.put("id", id);
+            headers.put("seq", i);
+            template.sendBodyAndHeaders("seda:start?size=" + SIZE, value, 
headers);
         }
 
         System.out.println("Sending all " + SIZE + " message done. Now waiting 
for aggregation to complete.");
 
         assertMockEndpointsSatisfied();
+
+        int recovered = 0;
+        for (Exchange exchange : mock.getReceivedExchanges()) {
+            if (exchange.getIn().getHeader(Exchange.REDELIVERED) != null) {
+                recovered++;
+            }
+        }
+        assertEquals("There should be 5 recovered", 5, recovered);
     }
 
     @Override
@@ -60,13 +77,23 @@ public class HawtDBAggregateLoadTest ext
             @Override
             public void configure() throws Exception {
                 HawtDBAggregationRepository<String> repo = new 
HawtDBAggregationRepository<String>("repo1", "target/data/hawtdb.dat");
+                repo.setUseRecovery(true);
 
                 from("seda:start?size=" + SIZE)
                     .to("log:input?groupSize=500")
                     .aggregate(header("id"), new MyAggregationStrategy())
                         .aggregationRepository(repo)
-                        .completionSize(SIZE)
+                        .completionSize(10)
                         .to("log:output?showHeaders=true")
+                        // have every 20th exchange fail which should then be 
recovered
+                        .process(new Processor() {
+                            public void process(Exchange exchange) throws 
Exception {
+                                int num = counter.incrementAndGet();
+                                if (num % 20 == 0) {
+                                    throw new IllegalStateException("Failed 
for num " + num);
+                                }
+                            }
+                        })
                         .to("mock:result")
                     .end();
             }

Modified: 
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateLoadConcurrentTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateLoadConcurrentTest.java?rev=930647&r1=930646&r2=930647&view=diff
==============================================================================
--- 
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateLoadConcurrentTest.java
 (original)
+++ 
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateLoadConcurrentTest.java
 Sun Apr  4 08:58:43 2010
@@ -43,7 +43,7 @@ public class HawtDBAggregateLoadConcurre
     @Test
     public void testLoadTestHawtDBAggregate() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:result");
-        mock.expectedMinimumMessageCount(10);
+        mock.expectedMinimumMessageCount(9);
         mock.setResultWaitTime(30 * 1000);
 
         ExecutorService executor = Executors.newFixedThreadPool(10);


Reply via email to