Author: davsclaus
Date: Sun Apr  4 08:10:59 2010
New Revision: 930642

URL: http://svn.apache.org/viewvc?rev=930642&view=rev
Log:
CAMEL-2568: Polished RecoverableAggregationRepository a bit and added more 
tests.

Added:
    
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepositoryRecoverExistingTest.java
      - copied, changed from r930635, 
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepositoryLoadExistingTest.java
Modified:
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/RecoverableAggregationRepository.java
    
camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java
    
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverTest.java

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=930642&r1=930641&r2=930642&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
 Sun Apr  4 08:10:59 2010
@@ -480,41 +480,48 @@ public class AggregateProcessor extends 
         }
 
         public void run() {
-            AggregateProcessor.this.doRecover(recoverable);
-        }
-
-    }
-
-    private void doRecover(RecoverableAggregationRepository<Object> 
recoverable) {
-        LOG.trace("Starting recover check");
-
-        Set<String> exchangeIds = recoverable.scan(camelContext);
-        for (String exchangeId : exchangeIds) {
-
-            // we may shutdown while doing recovery
-            if (!isRunAllowed()) {
-                LOG.info("We are shutting down so stop recovering");
+            // only run if CamelContext has been fully started
+            if (!camelContext.getStatus().isStarted()) {
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Recover check cannot start due CamelContext(" + 
camelContext.getName() + ") has not been started yet");
+                }
                 return;
             }
 
-            boolean inProgress = 
inProgressCompleteExchanges.contains(exchangeId);
-            if (inProgress) {
-                LOG.debug("Aggregated exchange with id " + exchangeId + " is 
already in progress");
-            } else {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Recovering aggregated exchange with id " + 
exchangeId);
+            LOG.trace("Starting recover check");
+
+            Set<String> exchangeIds = recoverable.scan(camelContext);
+            for (String exchangeId : exchangeIds) {
+
+                // we may shutdown while doing recovery
+                if (!isRunAllowed()) {
+                    LOG.info("We are shutting down so stop recovering");
+                    return;
                 }
-                Exchange exchange = recoverable.recover(camelContext, 
exchangeId);
-                if (exchange != null) {
-                    // get the correlation key
-                    String key = 
exchange.getProperty(Exchange.AGGREGATED_CORRELATION_KEY, String.class);
-                    // resubmit the recovered exchange
-                    onSubmitCompletion(key, exchange);
+
+                boolean inProgress = 
inProgressCompleteExchanges.contains(exchangeId);
+                if (inProgress) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Aggregated exchange with id " + exchangeId 
+ " is already in progress.");
+                    }
+                } else {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Recovering aggregated exchange with id " + 
exchangeId);
+                    }
+                    Exchange exchange = recoverable.recover(camelContext, 
exchangeId);
+                    if (exchange != null) {
+                        // get the correlation key
+                        String key = 
exchange.getProperty(Exchange.AGGREGATED_CORRELATION_KEY, String.class);
+                        // and mark it as redelivered
+                        exchange.getIn().setHeader(Exchange.REDELIVERED, 
Boolean.TRUE);
+                        // resubmit the recovered exchange
+                        onSubmitCompletion(key, exchange);
+                    }
                 }
             }
-        }
 
-        LOG.trace("Recover check complete");
+            LOG.trace("Recover check complete");
+        }
     }
 
     @Override
@@ -544,17 +551,15 @@ public class AggregateProcessor extends 
             RecoverableAggregationRepository<Object> recoverable = 
(RecoverableAggregationRepository<Object>) aggregationRepository;
             if (recoverable.isUseRecovery()) {
                 long interval = recoverable.getCheckIntervalInMillis();
-                if (interval > 0) {
-                    // create a background recover thread to check once ev
-                    recoverService = 
camelContext.getExecutorServiceStrategy().newScheduledThreadPool(this, 
"AggregateRecoverChecker", 1);
-                    Runnable recoverTask = new RecoverTask(recoverable);
-                    LOG.info("Scheduling recover checker to run every " + 
interval + " millis.");
-                    recoverService.scheduleAtFixedRate(recoverTask, 1000L, 
interval, TimeUnit.MILLISECONDS);
-                } else {
-                    // its a one shot recover during startup
-                    LOG.info("Running recover checker once at startup to 
recover existing aggregated exchanges");
-                    doRecover(recoverable);
+                if (interval <= 0) {
+                    throw new IllegalArgumentException("AggregationRepository 
has recovery enabled and the CheckInterval option must be a positive number, 
was: " + interval);
                 }
+
+                // create a background recover thread to check every interval
+                recoverService = 
camelContext.getExecutorServiceStrategy().newScheduledThreadPool(this, 
"AggregateRecoverChecker", 1);
+                Runnable recoverTask = new RecoverTask(recoverable);
+                LOG.info("Using RecoverableAggregationRepository by scheduling 
recover checker to run every " + interval + " millis.");
+                recoverService.scheduleAtFixedRate(recoverTask, 1000L, 
interval, TimeUnit.MILLISECONDS);
             }
         }
 

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/RecoverableAggregationRepository.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/RecoverableAggregationRepository.java?rev=930642&r1=930641&r2=930642&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/RecoverableAggregationRepository.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/RecoverableAggregationRepository.java
 Sun Apr  4 08:10:59 2010
@@ -31,7 +31,7 @@ import org.apache.camel.Exchange;
 public interface RecoverableAggregationRepository<K> extends 
AggregationRepository<K> {
 
     /**
-     * Scans the repository for exchanges to be recovered
+     * Scans the repository for {...@link Exchange}s to be recovered
      * 
      * @param camelContext   the current CamelContext
      * @return the exchange ids for to be recovered

Modified: 
camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java?rev=930642&r1=930641&r2=930642&view=diff
==============================================================================
--- 
camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java
 (original)
+++ 
camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java
 Sun Apr  4 08:10:59 2010
@@ -252,8 +252,12 @@ public class HawtDBAggregationRepository
             }
         });
 
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Scanned and found " + answer.size() + " exchanges to 
recover.");
+        if (answer.size() == 0) {
+            LOG.trace("Scanned and found no exchange to recover.");
+        } else {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Scanned and found " + answer.size() + " exchange(s) 
to recover (note some of them may already be in progress).");
+            }
         }
         return answer;
 
@@ -282,7 +286,7 @@ public class HawtDBAggregationRepository
         }
 
         if (LOG.isDebugEnabled()) {
-            LOG.debug("Recovering exchangeId  [" + exchangeId + "] -> " + 
answer);
+            LOG.debug("Recovering exchangeId [" + exchangeId + "] -> " + 
answer);
         }
         return answer;
     }

Modified: 
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverTest.java?rev=930642&r1=930641&r2=930642&view=diff
==============================================================================
--- 
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverTest.java
 (original)
+++ 
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverTest.java
 Sun Apr  4 08:10:59 2010
@@ -47,6 +47,7 @@ public class HawtDBAggregateRecoverTest 
         // should fail the first 2 times and then recover
         getMockEndpoint("mock:aggregated").expectedMessageCount(3);
         getMockEndpoint("mock:result").expectedBodiesReceived("ABCDE");
+        
getMockEndpoint("mock:result").message(0).header(Exchange.REDELIVERED).isEqualTo(Boolean.TRUE);
 
         template.sendBodyAndHeader("direct:start", "A", "id", 123);
         template.sendBodyAndHeader("direct:start", "B", "id", 123);

Copied: 
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepositoryRecoverExistingTest.java
 (from r930635, 
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepositoryLoadExistingTest.java)
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepositoryRecoverExistingTest.java?p2=camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepositoryRecoverExistingTest.java&p1=camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepositoryLoadExistingTest.java&r1=930635&r2=930642&rev=930642&view=diff
==============================================================================
--- 
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepositoryLoadExistingTest.java
 (original)
+++ 
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepositoryRecoverExistingTest.java
 Sun Apr  4 08:10:59 2010
@@ -23,7 +23,7 @@ import org.apache.camel.impl.DefaultExch
 import org.apache.camel.test.junit4.CamelTestSupport;
 import org.junit.Test;
 
-public class HawtDBAggregationRepositoryLoadExistingTest extends 
CamelTestSupport {
+public class HawtDBAggregationRepositoryRecoverExistingTest extends 
CamelTestSupport {
 
     private HawtDBFile hawtDBFile;
 
@@ -49,6 +49,7 @@ public class HawtDBAggregationRepository
         repo.setHawtDBFile(hawtDBFile);
         repo.setRepositoryName("repo1");
         repo.setReturnOldExchange(true);
+        repo.setUseRecovery(true);
 
         // Store it..
         Exchange exchange1 = new DefaultExchange(context);
@@ -56,6 +57,11 @@ public class HawtDBAggregationRepository
         Exchange actual = repo.add(context, "foo", exchange1);
         assertEquals(null, actual);
 
+        // Remove it, which makes it in the pre confirm stage
+        repo.remove(context, "foo", exchange1);
+
+        String id = exchange1.getExchangeId();
+
         // stop the repo
         hawtDBFile.stop();
 
@@ -66,18 +72,12 @@ public class HawtDBAggregationRepository
 
         // Get it back..
         actual = repo.get(context, "foo");
-        assertEquals("counter:1", actual.getIn().getBody());
+        assertNull(actual);
 
-        // Change it..
-        Exchange exchange2 = new DefaultExchange(context);
-        exchange2.getIn().setBody("counter:2");
-        actual = repo.add(context, "foo", exchange2);
-        // the old one
+        // Recover it
+        actual = repo.recover(context, id);
+        assertNotNull(actual);
         assertEquals("counter:1", actual.getIn().getBody());
-
-        // Get it back..
-        actual = repo.get(context, "foo");
-        assertEquals("counter:2", actual.getIn().getBody());
     }
 
 }
\ No newline at end of file


Reply via email to