Author: davsclaus Date: Sun Apr 4 08:10:59 2010 New Revision: 930642 URL: http://svn.apache.org/viewvc?rev=930642&view=rev Log: CAMEL-2568: Polished RecoverableAggregationRepository a bit and added more tests.
Added: camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepositoryRecoverExistingTest.java - copied, changed from r930635, camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepositoryLoadExistingTest.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java camel/trunk/camel-core/src/main/java/org/apache/camel/spi/RecoverableAggregationRepository.java camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverTest.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=930642&r1=930641&r2=930642&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Sun Apr 4 08:10:59 2010 @@ -480,41 +480,48 @@ public class AggregateProcessor extends } public void run() { - AggregateProcessor.this.doRecover(recoverable); - } - - } - - private void doRecover(RecoverableAggregationRepository<Object> recoverable) { - LOG.trace("Starting recover check"); - - Set<String> exchangeIds = recoverable.scan(camelContext); - for (String exchangeId : exchangeIds) { - - // we may shutdown while doing recovery - if (!isRunAllowed()) { - LOG.info("We are shutting down so stop recovering"); + // only run if CamelContext has been fully started + if (!camelContext.getStatus().isStarted()) { + if (LOG.isTraceEnabled()) { + LOG.trace("Recover check cannot start due CamelContext(" + camelContext.getName() + ") has not been started yet"); + } return; } - boolean inProgress = inProgressCompleteExchanges.contains(exchangeId); - if (inProgress) { - LOG.debug("Aggregated exchange with id " + exchangeId + " is already in progress"); - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Recovering aggregated exchange with id " + exchangeId); + LOG.trace("Starting recover check"); + + Set<String> exchangeIds = recoverable.scan(camelContext); + for (String exchangeId : exchangeIds) { + + // we may shutdown while doing recovery + if (!isRunAllowed()) { + LOG.info("We are shutting down so stop recovering"); + return; } - Exchange exchange = recoverable.recover(camelContext, exchangeId); - if (exchange != null) { - // get the correlation key - String key = exchange.getProperty(Exchange.AGGREGATED_CORRELATION_KEY, String.class); - // resubmit the recovered exchange - onSubmitCompletion(key, exchange); + + boolean inProgress = inProgressCompleteExchanges.contains(exchangeId); + if (inProgress) { + if (LOG.isDebugEnabled()) { + LOG.debug("Aggregated exchange with id " + exchangeId + " is already in progress."); + } + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Recovering aggregated exchange with id " + exchangeId); + } + Exchange exchange = recoverable.recover(camelContext, exchangeId); + if (exchange != null) { + // get the correlation key + String key = exchange.getProperty(Exchange.AGGREGATED_CORRELATION_KEY, String.class); + // and mark it as redelivered + exchange.getIn().setHeader(Exchange.REDELIVERED, Boolean.TRUE); + // resubmit the recovered exchange + onSubmitCompletion(key, exchange); + } } } - } - LOG.trace("Recover check complete"); + LOG.trace("Recover check complete"); + } } @Override @@ -544,17 +551,15 @@ public class AggregateProcessor extends RecoverableAggregationRepository<Object> recoverable = (RecoverableAggregationRepository<Object>) aggregationRepository; if (recoverable.isUseRecovery()) { long interval = recoverable.getCheckIntervalInMillis(); - if (interval > 0) { - // create a background recover thread to check once ev - recoverService = camelContext.getExecutorServiceStrategy().newScheduledThreadPool(this, "AggregateRecoverChecker", 1); - Runnable recoverTask = new RecoverTask(recoverable); - LOG.info("Scheduling recover checker to run every " + interval + " millis."); - recoverService.scheduleAtFixedRate(recoverTask, 1000L, interval, TimeUnit.MILLISECONDS); - } else { - // its a one shot recover during startup - LOG.info("Running recover checker once at startup to recover existing aggregated exchanges"); - doRecover(recoverable); + if (interval <= 0) { + throw new IllegalArgumentException("AggregationRepository has recovery enabled and the CheckInterval option must be a positive number, was: " + interval); } + + // create a background recover thread to check every interval + recoverService = camelContext.getExecutorServiceStrategy().newScheduledThreadPool(this, "AggregateRecoverChecker", 1); + Runnable recoverTask = new RecoverTask(recoverable); + LOG.info("Using RecoverableAggregationRepository by scheduling recover checker to run every " + interval + " millis."); + recoverService.scheduleAtFixedRate(recoverTask, 1000L, interval, TimeUnit.MILLISECONDS); } } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/RecoverableAggregationRepository.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/RecoverableAggregationRepository.java?rev=930642&r1=930641&r2=930642&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/RecoverableAggregationRepository.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/RecoverableAggregationRepository.java Sun Apr 4 08:10:59 2010 @@ -31,7 +31,7 @@ import org.apache.camel.Exchange; public interface RecoverableAggregationRepository<K> extends AggregationRepository<K> { /** - * Scans the repository for exchanges to be recovered + * Scans the repository for {...@link Exchange}s to be recovered * * @param camelContext the current CamelContext * @return the exchange ids for to be recovered Modified: camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java?rev=930642&r1=930641&r2=930642&view=diff ============================================================================== --- camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java (original) +++ camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java Sun Apr 4 08:10:59 2010 @@ -252,8 +252,12 @@ public class HawtDBAggregationRepository } }); - if (LOG.isDebugEnabled()) { - LOG.debug("Scanned and found " + answer.size() + " exchanges to recover."); + if (answer.size() == 0) { + LOG.trace("Scanned and found no exchange to recover."); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Scanned and found " + answer.size() + " exchange(s) to recover (note some of them may already be in progress)."); + } } return answer; @@ -282,7 +286,7 @@ public class HawtDBAggregationRepository } if (LOG.isDebugEnabled()) { - LOG.debug("Recovering exchangeId [" + exchangeId + "] -> " + answer); + LOG.debug("Recovering exchangeId [" + exchangeId + "] -> " + answer); } return answer; } Modified: camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverTest.java?rev=930642&r1=930641&r2=930642&view=diff ============================================================================== --- camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverTest.java (original) +++ camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverTest.java Sun Apr 4 08:10:59 2010 @@ -47,6 +47,7 @@ public class HawtDBAggregateRecoverTest // should fail the first 2 times and then recover getMockEndpoint("mock:aggregated").expectedMessageCount(3); getMockEndpoint("mock:result").expectedBodiesReceived("ABCDE"); + getMockEndpoint("mock:result").message(0).header(Exchange.REDELIVERED).isEqualTo(Boolean.TRUE); template.sendBodyAndHeader("direct:start", "A", "id", 123); template.sendBodyAndHeader("direct:start", "B", "id", 123); Copied: camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepositoryRecoverExistingTest.java (from r930635, camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepositoryLoadExistingTest.java) URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepositoryRecoverExistingTest.java?p2=camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepositoryRecoverExistingTest.java&p1=camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepositoryLoadExistingTest.java&r1=930635&r2=930642&rev=930642&view=diff ============================================================================== --- camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepositoryLoadExistingTest.java (original) +++ camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepositoryRecoverExistingTest.java Sun Apr 4 08:10:59 2010 @@ -23,7 +23,7 @@ import org.apache.camel.impl.DefaultExch import org.apache.camel.test.junit4.CamelTestSupport; import org.junit.Test; -public class HawtDBAggregationRepositoryLoadExistingTest extends CamelTestSupport { +public class HawtDBAggregationRepositoryRecoverExistingTest extends CamelTestSupport { private HawtDBFile hawtDBFile; @@ -49,6 +49,7 @@ public class HawtDBAggregationRepository repo.setHawtDBFile(hawtDBFile); repo.setRepositoryName("repo1"); repo.setReturnOldExchange(true); + repo.setUseRecovery(true); // Store it.. Exchange exchange1 = new DefaultExchange(context); @@ -56,6 +57,11 @@ public class HawtDBAggregationRepository Exchange actual = repo.add(context, "foo", exchange1); assertEquals(null, actual); + // Remove it, which makes it in the pre confirm stage + repo.remove(context, "foo", exchange1); + + String id = exchange1.getExchangeId(); + // stop the repo hawtDBFile.stop(); @@ -66,18 +72,12 @@ public class HawtDBAggregationRepository // Get it back.. actual = repo.get(context, "foo"); - assertEquals("counter:1", actual.getIn().getBody()); + assertNull(actual); - // Change it.. - Exchange exchange2 = new DefaultExchange(context); - exchange2.getIn().setBody("counter:2"); - actual = repo.add(context, "foo", exchange2); - // the old one + // Recover it + actual = repo.recover(context, id); + assertNotNull(actual); assertEquals("counter:1", actual.getIn().getBody()); - - // Get it back.. - actual = repo.get(context, "foo"); - assertEquals("counter:2", actual.getIn().getBody()); } } \ No newline at end of file