Author: davsclaus Date: Sun Apr 4 11:10:13 2010 New Revision: 930663 URL: http://svn.apache.org/viewvc?rev=930663&view=rev Log: CAMEL-2568: Added redelivery and redeliveryCounter as headers to recovered aggregated exchanges.
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Delayer.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java camel/trunk/camel-core/src/main/java/org/apache/camel/spi/RecoverableAggregationRepository.java camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateConcurrentDifferentGroupsTest.java camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateConcurrentSameGroupTest.java camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverTest.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java?rev=930663&r1=930662&r2=930663&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java Sun Apr 4 11:10:13 2010 @@ -27,7 +27,9 @@ import org.apache.commons.logging.LogFac /** * A useful base class for any processor which provides some kind of throttling - * or delayed processing + * or delayed processing. + * <p/> + * This implementation will block while waiting. * * @version $Revision$ */ Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Delayer.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Delayer.java?rev=930663&r1=930662&r2=930663&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Delayer.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Delayer.java Sun Apr 4 11:10:13 2010 @@ -24,7 +24,9 @@ import org.apache.camel.Processor; * A <a href="http://camel.apache.org/delayer.html">Delayer</a> which * delays processing the exchange until the correct amount of time has elapsed * using an expression to determine the delivery time. - * + * <p/> + * This implementation will block while waiting. + * * @version $Revision$ */ public class Delayer extends DelayProcessorSupport implements Traceable { Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=930663&r1=930662&r2=930663&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Sun Apr 4 11:10:13 2010 @@ -22,6 +22,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -36,6 +37,7 @@ import org.apache.camel.Predicate; import org.apache.camel.Processor; import org.apache.camel.impl.LoggingExceptionHandler; import org.apache.camel.impl.ServiceSupport; +import org.apache.camel.impl.SynchronizationAdapter; import org.apache.camel.processor.Traceable; import org.apache.camel.spi.AggregationRepository; import org.apache.camel.spi.ExceptionHandler; @@ -80,7 +82,15 @@ public class AggregateProcessor extends private ExceptionHandler exceptionHandler; private AggregationRepository<Object> aggregationRepository = new MemoryAggregationRepository(); private Map<Object, Object> closedCorrelationKeys; + private final AggregateOnCompletion aggregateOnCompletion = new AggregateOnCompletion(); private final Set<String> inProgressCompleteExchanges = new HashSet<String>(); + private final Map<String, RedeliveryData> redeliveryState = new ConcurrentHashMap<String, RedeliveryData>(); + + // keep booking about redelivery + private class RedeliveryData { + int redeliveryCounter; + long redeliveryDelay; + } // options private boolean ignoreBadCorrelationKeys; @@ -324,6 +334,10 @@ public class AggregateProcessor extends // send this exchange executorService.submit(new Runnable() { public void run() { + + // add on completion task so we remember to update the inProgressCompleteExchanges + exchange.addOnCompletion(aggregateOnCompletion); + try { processor.process(exchange); } catch (Exception e) { @@ -333,18 +347,13 @@ public class AggregateProcessor extends exchange.setException(new CamelExchangeException("Error processing aggregated exchange", exchange, t)); } - try { - // was it good or bad? - if (exchange.getException() == null) { - // only confirm if we processed without a problem - aggregationRepository.confirm(exchange.getContext(), exchange.getExchangeId()); - } else { - // if there was an exception then let the exception handler handle it - getExceptionHandler().handleException("Error processing aggregated exchange", exchange, exchange.getException()); - } - } finally { - // must remember to remove when we are done - inProgressCompleteExchanges.remove(exchange.getExchangeId()); + // was it good or bad? + if (exchange.getException() == null) { + // only confirm if we processed without a problem + aggregationRepository.confirm(exchange.getContext(), exchange.getExchangeId()); + } else { + // if there was an exception then let the exception handler handle it + getExceptionHandler().handleException("Error processing aggregated exchange", exchange, exchange.getException()); } } }); @@ -450,6 +459,29 @@ public class AggregateProcessor extends } /** + * On completion task which keeps the booking of the in progress up to date + */ + private class AggregateOnCompletion extends SynchronizationAdapter { + + @Override + public void onDone(Exchange exchange) { + // must remember to remove when we are done (done = success or failure) + inProgressCompleteExchanges.remove(exchange.getExchangeId()); + } + + @Override + public void onComplete(Exchange exchange) { + // remove redelivery state when it was processed successfully + redeliveryState.remove(exchange.getExchangeId()); + } + + @Override + public String toString() { + return "AggregateOnCompletion"; + } + } + + /** * Background task that looks for aggregated exchanges which is triggered by completion timeouts. */ private final class AggregationTimeoutMap extends DefaultTimeoutMap<Object, Exchange> { @@ -501,8 +533,8 @@ public class AggregateProcessor extends boolean inProgress = inProgressCompleteExchanges.contains(exchangeId); if (inProgress) { - if (LOG.isDebugEnabled()) { - LOG.debug("Aggregated exchange with id " + exchangeId + " is already in progress."); + if (LOG.isTraceEnabled()) { + LOG.trace("Aggregated exchange with id " + exchangeId + " is already in progress."); } } else { if (LOG.isDebugEnabled()) { @@ -514,6 +546,22 @@ public class AggregateProcessor extends String key = exchange.getProperty(Exchange.AGGREGATED_CORRELATION_KEY, String.class); // and mark it as redelivered exchange.getIn().setHeader(Exchange.REDELIVERED, Boolean.TRUE); + + // update current redelivery state + RedeliveryData data = redeliveryState.get(exchange.getExchangeId()); + if (data == null) { + // create new data + data = new RedeliveryData(); + redeliveryState.put(exchange.getExchangeId(), data); + } + data.redeliveryCounter++; + + // TODO: support delay and have a DelayQueue to avoid blocking + // if so we need to pre add in progress so we wont add again to delay queue + + // set redelivery counter + exchange.getIn().setHeader(Exchange.REDELIVERY_COUNTER, data.redeliveryCounter); + // resubmit the recovered exchange onSubmitCompletion(key, exchange); } @@ -550,7 +598,7 @@ public class AggregateProcessor extends if (aggregationRepository instanceof RecoverableAggregationRepository) { RecoverableAggregationRepository<Object> recoverable = (RecoverableAggregationRepository<Object>) aggregationRepository; if (recoverable.isUseRecovery()) { - long interval = recoverable.getCheckIntervalInMillis(); + long interval = recoverable.getRecoveryIntervalInMillis(); if (interval <= 0) { throw new IllegalArgumentException("AggregationRepository has recovery enabled and the CheckInterval option must be a positive number, was: " + interval); } @@ -574,14 +622,18 @@ public class AggregateProcessor extends @Override protected void doStop() throws Exception { - ServiceHelper.stopService(timeoutMap); - ServiceHelper.stopService(recoverService); - - ServiceHelper.stopService(aggregationRepository); + ServiceHelper.stopServices(timeoutMap, recoverService, aggregationRepository); if (closedCorrelationKeys != null) { closedCorrelationKeys.clear(); } + redeliveryState.clear(); + } + + @Override + protected void doShutdown() throws Exception { + // cleanup when shutting down + inProgressCompleteExchanges.clear(); } } \ No newline at end of file Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/RecoverableAggregationRepository.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/RecoverableAggregationRepository.java?rev=930663&r1=930662&r2=930663&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/RecoverableAggregationRepository.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/RecoverableAggregationRepository.java Sun Apr 4 11:10:13 2010 @@ -48,26 +48,26 @@ public interface RecoverableAggregationR Exchange recover(CamelContext camelContext, String exchangeId); /** - * Sets the interval between scans + * Sets the interval between recovery scans * * @param interval the interval * @param timeUnit the time unit */ - void setCheckInterval(long interval, TimeUnit timeUnit); + void setRecoveryInterval(long interval, TimeUnit timeUnit); /** - * Sets the interval between scans + * Sets the interval between recovery scans * * @param interval the interval in millis */ - void setCheckInterval(long interval); + void setRecoveryInterval(long interval); /** - * Gets the interval between scans in millis. + * Gets the interval between recovery scans in millis. * * @return the interval in millis */ - long getCheckIntervalInMillis(); + long getRecoveryIntervalInMillis(); /** * Whether or not recovery is enabled or not Modified: camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java?rev=930663&r1=930662&r2=930663&view=diff ============================================================================== --- camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java (original) +++ camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java Sun Apr 4 11:10:13 2010 @@ -50,7 +50,7 @@ public class HawtDBAggregationRepository private boolean sync; private boolean returnOldExchange; private HawtDBCamelMarshaller<K> marshaller = new HawtDBCamelMarshaller<K>(); - private long interval = 5000; + private long recoveryInterval = 5000; private boolean useRecovery = true; /** @@ -344,16 +344,16 @@ public class HawtDBAggregationRepository this.returnOldExchange = returnOldExchange; } - public void setCheckInterval(long interval, TimeUnit timeUnit) { - this.interval = timeUnit.toMillis(interval); + public void setRecoveryInterval(long interval, TimeUnit timeUnit) { + this.recoveryInterval = timeUnit.toMillis(interval); } - public void setCheckInterval(long interval) { - this.interval = interval; + public void setRecoveryInterval(long interval) { + this.recoveryInterval = interval; } - public long getCheckIntervalInMillis() { - return interval; + public long getRecoveryIntervalInMillis() { + return recoveryInterval; } public boolean isUseRecovery() { Modified: camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateConcurrentDifferentGroupsTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateConcurrentDifferentGroupsTest.java?rev=930663&r1=930662&r2=930663&view=diff ============================================================================== --- camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateConcurrentDifferentGroupsTest.java (original) +++ camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateConcurrentDifferentGroupsTest.java Sun Apr 4 11:10:13 2010 @@ -70,7 +70,7 @@ public class HawtDBAggregateConcurrentDi private void doSendMessages(int files, int poolSize) throws Exception { MockEndpoint mock = getMockEndpoint("mock:aggregated"); mock.expectedMessageCount(2); - mock.setResultWaitTime(20 * 1000L); + mock.setResultWaitTime(30 * 1000L); ExecutorService executor = Executors.newFixedThreadPool(poolSize); for (int i = 0; i < files; i++) { Modified: camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateConcurrentSameGroupTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateConcurrentSameGroupTest.java?rev=930663&r1=930662&r2=930663&view=diff ============================================================================== --- camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateConcurrentSameGroupTest.java (original) +++ camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateConcurrentSameGroupTest.java Sun Apr 4 11:10:13 2010 @@ -69,7 +69,7 @@ public class HawtDBAggregateConcurrentSa private void doSendMessages(int files, int poolSize) throws Exception { MockEndpoint mock = getMockEndpoint("mock:aggregated"); - mock.setResultWaitTime(20 * 1000L); + mock.setResultWaitTime(30 * 1000L); mock.expectedMessageCount(1); // match number of expected numbers mock.message(0).body(String.class).regex("[0-9]{" + files + "}"); Modified: camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverTest.java?rev=930663&r1=930662&r2=930663&view=diff ============================================================================== --- camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverTest.java (original) +++ camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverTest.java Sun Apr 4 11:10:13 2010 @@ -38,7 +38,7 @@ public class HawtDBAggregateRecoverTest // enable recovery repo.setUseRecovery(true); // check faster - repo.setCheckInterval(1, TimeUnit.SECONDS); + repo.setRecoveryInterval(1, TimeUnit.SECONDS); super.setUp(); } @@ -47,7 +47,10 @@ public class HawtDBAggregateRecoverTest // should fail the first 2 times and then recover getMockEndpoint("mock:aggregated").expectedMessageCount(3); getMockEndpoint("mock:result").expectedBodiesReceived("ABCDE"); + // should be marked as redelivered getMockEndpoint("mock:result").message(0).header(Exchange.REDELIVERED).isEqualTo(Boolean.TRUE); + // on the 2nd redelivery attempt we success + getMockEndpoint("mock:result").message(0).header(Exchange.REDELIVERY_COUNTER).isEqualTo(2); template.sendBodyAndHeader("direct:start", "A", "id", 123); template.sendBodyAndHeader("direct:start", "B", "id", 123);