Author: davsclaus
Date: Sun Apr  4 13:47:42 2010
New Revision: 930692

URL: http://svn.apache.org/viewvc?rev=930692&view=rev
Log:
CAMEL-2568: Preparing for redelivery policy for aggregator.

Added:
    
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverWithRedeliveryPolicyTest.java
      - copied, changed from r930663, 
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverTest.java
    
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBSpringAggregateRecoverWithRedeliveryPolicyTest.java
   (with props)
    
camel/trunk/components/camel-hawtdb/src/test/resources/org/apache/camel/component/hawtdb/HawtDBSpringAggregateRecoverWithRedeliveryPolicyTest.xml
      - copied, changed from r930635, 
camel/trunk/components/camel-hawtdb/src/test/resources/org/apache/camel/component/hawtdb/HawtDBSpringAggregateTest.xml
Modified:
    
camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DefaultErrorHandlerBuilder.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnExceptionDefinition.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/model/RedeliveryPolicyDefinition.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryPolicy.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
    
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/exceptionpolicy/DefaultExceptionPolicyStrategyUsingOnlyWhenTest.java
    
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateNotLostRemovedWhenConfirmedTest.java
    
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateNotLostTest.java

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DefaultErrorHandlerBuilder.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DefaultErrorHandlerBuilder.java?rev=930692&r1=930691&r2=930692&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DefaultErrorHandlerBuilder.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DefaultErrorHandlerBuilder.java
 Sun Apr  4 13:47:42 2010
@@ -387,7 +387,7 @@ public class DefaultErrorHandlerBuilder 
     protected RedeliveryPolicy createRedeliveryPolicy() {
         RedeliveryPolicy policy = new RedeliveryPolicy();
         policy.disableRedelivery();
-        policy.setRedeliverDelay(0);
+        policy.setRedeliveryDelay(0);
         return policy;
     }
 

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnExceptionDefinition.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnExceptionDefinition.java?rev=930692&r1=930691&r2=930692&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnExceptionDefinition.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnExceptionDefinition.java
 Sun Apr  4 13:47:42 2010
@@ -102,7 +102,7 @@ public class OnExceptionDefinition exten
     public String toString() {
         return "OnException[" + getExceptionClasses() + (onWhen != null ? " " 
+ onWhen : "") + " -> " + getOutputs() + "]";
     }
-    
+
     @Override
     public boolean isAbstract() {
         return true;
@@ -110,10 +110,11 @@ public class OnExceptionDefinition exten
 
     /**
      * Allows an exception handler to create a new redelivery policy for this 
exception type
-     * @param context the camel context
+     *
+     * @param context      the camel context
      * @param parentPolicy the current redelivery policy
      * @return a newly created redelivery policy, or return the original 
policy if no customization is required
-     * for this exception handler.
+     *         for this exception handler.
      */
     public RedeliveryPolicy createRedeliveryPolicy(CamelContext context, 
RedeliveryPolicy parentPolicy) {
         if (redeliveryPolicy != null) {
@@ -171,29 +172,29 @@ public class OnExceptionDefinition exten
     /**
      * Sets whether the exchange should be marked as handled or not.
      *
-     * @param handled  handled or not
+     * @param handled handled or not
      * @return the builder
      */
     public OnExceptionDefinition handled(boolean handled) {
         Expression expression = 
ExpressionBuilder.constantExpression(Boolean.toString(handled));
         return handled(expression);
     }
-    
+
     /**
      * Sets whether the exchange should be marked as handled or not.
      *
-     * @param handled  predicate that determines true or false
+     * @param handled predicate that determines true or false
      * @return the builder
      */
     public OnExceptionDefinition handled(Predicate handled) {
         setHandledPolicy(handled);
         return this;
     }
-    
+
     /**
      * Sets whether the exchange should be marked as handled or not.
      *
-     * @param handled  expression that determines true or false
+     * @param handled expression that determines true or false
      * @return the builder
      */
     public OnExceptionDefinition handled(Expression handled) {
@@ -207,7 +208,7 @@ public class OnExceptionDefinition exten
      * To be used for fine grained controlling whether a thrown exception 
should be intercepted
      * by this exception type or not.
      *
-     * @param predicate  predicate that determines true or false
+     * @param predicate predicate that determines true or false
      * @return the builder
      */
     public OnExceptionDefinition onWhen(Predicate predicate) {
@@ -254,11 +255,13 @@ public class OnExceptionDefinition exten
     }
 
     /**
-     * Sets the delay
+     * Sets the initial redelivery delay
      *
-     * @param delay  the redeliver delay
+     * @param delay the initial redelivery delay
      * @return the builder
+     * @deprecated
      */
+    @Deprecated
     public OnExceptionDefinition redeliverDelay(long delay) {
         getOrCreateRedeliveryPolicy().redeliveryDelay(delay);
         return this;
@@ -267,7 +270,7 @@ public class OnExceptionDefinition exten
     /**
      * Sets the back off multiplier
      *
-     * @param backOffMultiplier  the back off multiplier
+     * @param backOffMultiplier the back off multiplier
      * @return the builder
      */
     public OnExceptionDefinition backOffMultiplier(double backOffMultiplier) {
@@ -278,7 +281,7 @@ public class OnExceptionDefinition exten
     /**
      * Sets the collision avoidance factor
      *
-     * @param collisionAvoidanceFactor  the factor
+     * @param collisionAvoidanceFactor the factor
      * @return the builder
      */
     public OnExceptionDefinition collisionAvoidanceFactor(double 
collisionAvoidanceFactor) {
@@ -289,7 +292,7 @@ public class OnExceptionDefinition exten
     /**
      * Sets the collision avoidance percentage
      *
-     * @param collisionAvoidancePercent  the percentage
+     * @param collisionAvoidancePercent the percentage
      * @return the builder
      */
     public OnExceptionDefinition collisionAvoidancePercent(double 
collisionAvoidancePercent) {
@@ -298,9 +301,9 @@ public class OnExceptionDefinition exten
     }
 
     /**
-     * Sets the fixed delay between redeliveries
+     * Sets the initial redelivery delay
      *
-     * @param delay  delay in millis
+     * @param delay delay in millis
      * @return the builder
      */
     public OnExceptionDefinition redeliveryDelay(long delay) {
@@ -311,7 +314,7 @@ public class OnExceptionDefinition exten
     /**
      * Sets the logging level to use when retries has exhausted
      *
-     * @param retriesExhaustedLogLevel  the logging level
+     * @param retriesExhaustedLogLevel the logging level
      * @return the builder
      */
     public OnExceptionDefinition retriesExhaustedLogLevel(LoggingLevel 
retriesExhaustedLogLevel) {
@@ -322,7 +325,7 @@ public class OnExceptionDefinition exten
     /**
      * Sets the logging level to use for logging retry attempts
      *
-     * @param retryAttemptedLogLevel  the logging level
+     * @param retryAttemptedLogLevel the logging level
      * @return the builder
      */
     public OnExceptionDefinition retryAttemptedLogLevel(LoggingLevel 
retryAttemptedLogLevel) {
@@ -373,12 +376,12 @@ public class OnExceptionDefinition exten
     /**
      * Sets the maximum redeliveries
      * <ul>
-     *   <li>5 = default value</li>
-     *   <li>0 = no redeliveries</li>
-     *   <li>-1 = redeliver forever</li>
+     * <li>5 = default value</li>
+     * <li>0 = no redeliveries</li>
+     * <li>-1 = redeliver forever</li>
      * </ul>
      *
-     * @param maximumRedeliveries  the value
+     * @param maximumRedeliveries the value
      * @return the builder
      */
     public OnExceptionDefinition maximumRedeliveries(int maximumRedeliveries) {
@@ -409,7 +412,7 @@ public class OnExceptionDefinition exten
     /**
      * Sets the maximum delay between redelivery
      *
-     * @param maximumRedeliveryDelay  the delay in millis
+     * @param maximumRedeliveryDelay the delay in millis
      * @return the builder
      */
     public OnExceptionDefinition maximumRedeliveryDelay(long 
maximumRedeliveryDelay) {
@@ -418,6 +421,28 @@ public class OnExceptionDefinition exten
     }
 
     /**
+     * Sets a reference to a {@link RedeliveryPolicy} to lookup in the 
{@link org.apache.camel.spi.Registry} to be used.
+     *
+     * @param redeliveryPolicyRef reference to use for lookup
+     * @return the builder
+     */
+    public OnExceptionDefinition redeliveryPolicyRef(String 
redeliveryPolicyRef) {
+        getOrCreateRedeliveryPolicy().setRef(redeliveryPolicyRef);
+        return this;
+    }
+
+    /**
+     * Sets the delay pattern with delay intervals.
+     *
+     * @param delayPattern the delay pattern
+     * @return the builder
+     */
+    public OnExceptionDefinition delayPattern(String delayPattern) {
+        getOrCreateRedeliveryPolicy().setDelayPattern(delayPattern);
+        return this;
+    }
+
+    /**
      * Will use the original input body when an {@link 
org.apache.camel.Exchange} is moved to the dead letter queue.
      * <p/>
      * <b>Notice:</b> this only applies when all redeliveries attempt have 
failed and the {@link org.apache.camel.Exchange} is doomed for failure.
@@ -438,7 +463,7 @@ public class OnExceptionDefinition exten
     }
 
     /**
-     * Sets a processor that should be processed <b>before</b> a redelivey 
attempt.
+     * Sets a processor that should be processed <b>before</b> a redelivery 
attempt.
      * <p/>
      * Can be used to change the {@link org.apache.camel.Exchange} 
<b>before</b> its being redelivered.
      */
@@ -449,6 +474,7 @@ public class OnExceptionDefinition exten
 
     // Properties
     //-------------------------------------------------------------------------
+
     public List<ProcessorDefinition> getOutputs() {
         return outputs;
     }
@@ -498,7 +524,7 @@ public class OnExceptionDefinition exten
 
     public ExpressionSubElementDefinition getHandled() {
         return handled;
-    }    
+    }
 
     public void setHandledPolicy(Predicate handledPolicy) {
         this.handledPolicy = handledPolicy;
@@ -554,6 +580,7 @@ public class OnExceptionDefinition exten
 
     // Implementation methods
     //-------------------------------------------------------------------------
+
     protected RedeliveryPolicyDefinition getOrCreateRedeliveryPolicy() {
         if (redeliveryPolicy == null) {
             redeliveryPolicy = new RedeliveryPolicyDefinition();

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/RedeliveryPolicyDefinition.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/RedeliveryPolicyDefinition.java?rev=930692&r1=930691&r2=930692&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/RedeliveryPolicyDefinition.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/RedeliveryPolicyDefinition.java
 Sun Apr  4 13:47:42 2010
@@ -66,6 +66,8 @@ public class RedeliveryPolicyDefinition 
     private Boolean logExhausted;
     @XmlAttribute
     private Boolean disableRedelivery;
+    @XmlAttribute
+    private String delayPattern;
 
     public RedeliveryPolicy createRedeliveryPolicy(CamelContext context, 
RedeliveryPolicy parentPolicy) {
         if (ref != null) {
@@ -73,14 +75,19 @@ public class RedeliveryPolicyDefinition 
             return CamelContextHelper.mandatoryLookup(context, ref, 
RedeliveryPolicy.class);
         }
 
-        RedeliveryPolicy answer = parentPolicy.copy();
+        RedeliveryPolicy answer;
+        if (parentPolicy != null) {
+            answer = parentPolicy.copy();
+        } else {
+            answer = new RedeliveryPolicy();
+        }
 
         // copy across the properties - if they are set
         if (maximumRedeliveries != null) {
             answer.setMaximumRedeliveries(maximumRedeliveries);
         }
         if (redeliveryDelay != null) {
-            answer.setRedeliverDelay(redeliveryDelay);
+            answer.setRedeliveryDelay(redeliveryDelay);
         }
         if (retriesExhaustedLogLevel != null) {
             answer.setRetriesExhaustedLogLevel(retriesExhaustedLogLevel);
@@ -121,6 +128,9 @@ public class RedeliveryPolicyDefinition 
         if (disableRedelivery != null && disableRedelivery) {
             answer.setMaximumRedeliveries(0);
         }
+        if (delayPattern != null) {
+            answer.setDelayPattern(delayPattern);
+        }
 
         return answer;
     }
@@ -260,7 +270,7 @@ public class RedeliveryPolicyDefinition 
     /**
      * Sets the maximum redeliveries
      * <ul>
-     *   <li>5 = default value</li>
+     *   <li>x = redeliver at most x times</li>
      *   <li>0 = no redeliveries</li>
      *   <li>-1 = redeliver forever</li>
      * </ul>
@@ -315,6 +325,17 @@ public class RedeliveryPolicyDefinition 
         return this;
     }
 
+    /**
+     * Sets the delay pattern with delay intervals.
+     *
+     * @param delayPattern the delay pattern
+     * @return the builder
+     */
+    public RedeliveryPolicyDefinition delayPattern(String delayPattern) {
+        setDelayPattern(delayPattern);
+        return this;
+    }
+
     // Properties
     //-------------------------------------------------------------------------
 
@@ -445,4 +466,12 @@ public class RedeliveryPolicyDefinition 
     public void setLogExhausted(Boolean logExhausted) {
         this.logExhausted = logExhausted;
     }
+
+    public String getDelayPattern() {
+        return delayPattern;
+    }
+
+    public void setDelayPattern(String delayPattern) {
+        this.delayPattern = delayPattern;
+    }
 }

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryPolicy.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryPolicy.java?rev=930692&r1=930691&r2=930692&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryPolicy.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryPolicy.java
 Sun Apr  4 13:47:42 2010
@@ -149,7 +149,9 @@ public class RedeliveryPolicy implements
 
 
     /**
-     * Calculates the new redelivery delay based on the last one then sleeps 
for the necessary amount of time
+     * Calculates the new redelivery delay based on the last one and then 
<b>sleeps</b> for the necessary amount of time.
+     * <p/>
+     * This implementation will block while sleeping.
      *
      * @param redeliveryDelay  previous redelivery delay
      * @param redeliveryCounter  number of previous redelivery attempts
@@ -168,7 +170,14 @@ public class RedeliveryPolicy implements
         return redeliveryDelay;
     }
 
-    protected long calculateRedeliveryDelay(long previousDelay, int 
redeliveryCounter) {
+    /**
+     * Calculates the new redelivery delay based on the last one
+     *
+     * @param previousDelay  previous redelivery delay
+     * @param redeliveryCounter  number of previous redelivery attempts
+     * @return the calculated delay
+     */
+    public long calculateRedeliveryDelay(long previousDelay, int 
redeliveryCounter) {
         if (ObjectHelper.isNotEmpty(delayPattern)) {
             // calculate delay using the pattern
             return calculateRedeliverDelayUsingPattern(delayPattern, 
redeliveryCounter);

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=930692&r1=930691&r2=930692&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
 Sun Apr  4 13:47:42 2010
@@ -37,11 +37,12 @@ import org.apache.camel.Predicate;
 import org.apache.camel.Processor;
 import org.apache.camel.impl.LoggingExceptionHandler;
 import org.apache.camel.impl.ServiceSupport;
-import org.apache.camel.impl.SynchronizationAdapter;
+import org.apache.camel.processor.RedeliveryPolicy;
 import org.apache.camel.processor.Traceable;
 import org.apache.camel.spi.AggregationRepository;
 import org.apache.camel.spi.ExceptionHandler;
 import org.apache.camel.spi.RecoverableAggregationRepository;
+import org.apache.camel.spi.Synchronization;
 import org.apache.camel.util.DefaultTimeoutMap;
 import org.apache.camel.util.ExchangeHelper;
 import org.apache.camel.util.LRUCache;
@@ -328,7 +329,7 @@ public class AggregateProcessor extends 
             LOG.debug("Aggregation complete for correlation key " + key + " 
sending aggregated exchange: " + exchange);
         }
 
-        // add this as in progress
+        // add this as in progress before we submit the task
         inProgressCompleteExchanges.add(exchange.getExchangeId());
 
         // send this exchange
@@ -461,17 +462,18 @@ public class AggregateProcessor extends 
     /**
      * On completion task which keeps the booking of the in progress up to date
      */
-    private class AggregateOnCompletion extends SynchronizationAdapter {
+    private class AggregateOnCompletion implements Synchronization {
 
-        @Override
-        public void onDone(Exchange exchange) {
-            // must remember to remove when we are done (done = success or 
failure)
+        public void onFailure(Exchange exchange) {
+            // must remember to remove in progress when we failed
             inProgressCompleteExchanges.remove(exchange.getExchangeId());
+            // do not remove redelivery state as we need it when we redeliver 
again later
         }
 
-        @Override
         public void onComplete(Exchange exchange) {
-            // remove redelivery state when it was processed successfully
+            // must remember to remove in progress when we are complete
+            inProgressCompleteExchanges.remove(exchange.getExchangeId());
+            // and remove redelivery state as well
             redeliveryState.remove(exchange.getExchangeId());
         }
 
@@ -556,9 +558,6 @@ public class AggregateProcessor extends 
                         }
                         data.redeliveryCounter++;
 
-                        // TODO: support delay and have a DelayQueue to avoid 
blocking
-                        // if so we need to pre add in progress so we wont add 
again to delay queue
-
                         // set redelivery counter
                         
exchange.getIn().setHeader(Exchange.REDELIVERY_COUNTER, data.redeliveryCounter);
 

Modified: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/exceptionpolicy/DefaultExceptionPolicyStrategyUsingOnlyWhenTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/exceptionpolicy/DefaultExceptionPolicyStrategyUsingOnlyWhenTest.java?rev=930692&r1=930691&r2=930692&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/exceptionpolicy/DefaultExceptionPolicyStrategyUsingOnlyWhenTest.java
 (original)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/exceptionpolicy/DefaultExceptionPolicyStrategyUsingOnlyWhenTest.java
 Sun Apr  4 13:47:42 2010
@@ -68,7 +68,7 @@ public class DefaultExceptionPolicyStrat
                 
errorHandler(deadLetterChannel(ERROR_QUEUE).maximumRedeliveries(0).redeliveryDelay(100));
 
                 
onException(MyUserException.class).onWhen(header("user").isNotNull())
-                    
.maximumRedeliveries(1).backOffMultiplier(2).redeliverDelay(0)
+                    
.maximumRedeliveries(1).backOffMultiplier(2).redeliveryDelay(0)
                     .to(ERROR_USER_QUEUE);
 
                 from("direct:a").process(new Processor() {

Modified: 
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateNotLostRemovedWhenConfirmedTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateNotLostRemovedWhenConfirmedTest.java?rev=930692&r1=930691&r2=930692&view=diff
==============================================================================
--- 
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateNotLostRemovedWhenConfirmedTest.java
 (original)
+++ 
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateNotLostRemovedWhenConfirmedTest.java
 Sun Apr  4 13:47:42 2010
@@ -48,6 +48,9 @@ public class HawtDBAggregateNotLostRemov
 
         assertMockEndpointsSatisfied();
 
+        // sleep a bit since the completed signal is done async
+        Thread.sleep(2000);
+
         String exchangeId = 
getMockEndpoint("mock:result").getReceivedExchanges().get(0).getExchangeId();
 
         // the exchange should NOT be in the completed repo as it was confirmed

Modified: 
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateNotLostTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateNotLostTest.java?rev=930692&r1=930691&r2=930692&view=diff
==============================================================================
--- 
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateNotLostTest.java
 (original)
+++ 
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateNotLostTest.java
 Sun Apr  4 13:47:42 2010
@@ -49,6 +49,9 @@ public class HawtDBAggregateNotLostTest 
 
         assertMockEndpointsSatisfied();
 
+        // sleep a bit since the completed signal is done async
+        Thread.sleep(2000);
+
         String exchangeId = 
getMockEndpoint("mock:aggregated").getReceivedExchanges().get(0).getExchangeId();
 
         // the exchange should be in the completed repo where we should be 
able to find it

Copied: 
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverWithRedeliveryPolicyTest.java
 (from r930663, 
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverTest.java)
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverWithRedeliveryPolicyTest.java?p2=camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverWithRedeliveryPolicyTest.java&p1=camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverTest.java&r1=930663&r2=930692&rev=930692&view=diff
==============================================================================
--- 
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverTest.java
 (original)
+++ 
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateRecoverWithRedeliveryPolicyTest.java
 Sun Apr  4 13:47:42 2010
@@ -26,7 +26,7 @@ import org.apache.camel.processor.aggreg
 import org.apache.camel.test.junit4.CamelTestSupport;
 import org.junit.Test;
 
-public class HawtDBAggregateRecoverTest extends CamelTestSupport {
+public class HawtDBAggregateRecoverWithRedeliveryPolicyTest extends 
CamelTestSupport {
 
     private HawtDBAggregationRepository<String> repo;
     private static AtomicInteger counter = new AtomicInteger(0);
@@ -44,13 +44,16 @@ public class HawtDBAggregateRecoverTest 
 
     @Test
     public void testHawtDBAggregateRecover() throws Exception {
-        // should fail the first 2 times and then recover
-        getMockEndpoint("mock:aggregated").expectedMessageCount(3);
+        getMockEndpoint("mock:aggregated").setResultWaitTime(20000);
+        getMockEndpoint("mock:result").setResultWaitTime(20000);
+        
+        // should fail the first 3 times and then recover
+        getMockEndpoint("mock:aggregated").expectedMessageCount(4);
         getMockEndpoint("mock:result").expectedBodiesReceived("ABCDE");
         // should be marked as redelivered
         
getMockEndpoint("mock:result").message(0).header(Exchange.REDELIVERED).isEqualTo(Boolean.TRUE);
         // on the 2nd redelivery attempt we success
-        
getMockEndpoint("mock:result").message(0).header(Exchange.REDELIVERY_COUNTER).isEqualTo(2);
+        
getMockEndpoint("mock:result").message(0).header(Exchange.REDELIVERY_COUNTER).isEqualTo(3);
 
         template.sendBodyAndHeader("direct:start", "A", "id", 123);
         template.sendBodyAndHeader("direct:start", "B", "id", 123);
@@ -69,14 +72,14 @@ public class HawtDBAggregateRecoverTest 
                 from("direct:start")
                     .aggregate(header("id"), new MyAggregationStrategy())
                         .completionSize(5).aggregationRepository(repo)
+                        // this is the output from the aggregator
                         .log("aggregated exchange id ${exchangeId} with 
${body}")
                         .to("mock:aggregated")
-                        .delay(2000)
-                        // simulate errors the first two times
+                        // simulate errors the first three times
                         .process(new Processor() {
                             public void process(Exchange exchange) throws 
Exception {
                                 int count = counter.incrementAndGet();
-                                if (count <= 2) {
+                                if (count <= 3) {
                                     throw new IllegalArgumentException("Damn");
                                 }
                             }

Added: 
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBSpringAggregateRecoverWithRedeliveryPolicyTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBSpringAggregateRecoverWithRedeliveryPolicyTest.java?rev=930692&view=auto
==============================================================================
--- 
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBSpringAggregateRecoverWithRedeliveryPolicyTest.java
 (added)
+++ 
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBSpringAggregateRecoverWithRedeliveryPolicyTest.java
 Sun Apr  4 13:47:42 2010
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.hawtdb;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.test.junit4.CamelSpringTestSupport;
+import org.junit.Test;
+import org.springframework.context.support.AbstractXmlApplicationContext;
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+
+public class HawtDBSpringAggregateRecoverWithRedeliveryPolicyTest extends 
CamelSpringTestSupport {
+
+    private static AtomicInteger counter = new AtomicInteger(0);
+
+    @Override
+    protected AbstractXmlApplicationContext createApplicationContext() {
+        return new 
ClassPathXmlApplicationContext("org/apache/camel/component/hawtdb/HawtDBSpringAggregateRecoverWithRedeliveryPolicyTest.xml");
+    }
+
+    @Override
+    public void setUp() throws Exception {
+        deleteDirectory("target/data");
+        super.setUp();
+    }
+
+    @Test
+    public void testHawtDBAggregateRecover() throws Exception {
+        getMockEndpoint("mock:aggregated").setResultWaitTime(20000);
+        getMockEndpoint("mock:result").setResultWaitTime(20000);
+
+        // should fail the first 3 times and then recover
+        getMockEndpoint("mock:aggregated").expectedMessageCount(4);
+        getMockEndpoint("mock:result").expectedBodiesReceived("ABCDE");
+        // should be marked as redelivered
+        
getMockEndpoint("mock:result").message(0).header(Exchange.REDELIVERED).isEqualTo(Boolean.TRUE);
+        // on the 3rd redelivery attempt we succeed
+        
getMockEndpoint("mock:result").message(0).header(Exchange.REDELIVERY_COUNTER).isEqualTo(3);
+
+        template.sendBodyAndHeader("direct:start", "A", "id", 123);
+        template.sendBodyAndHeader("direct:start", "B", "id", 123);
+        template.sendBodyAndHeader("direct:start", "C", "id", 123);
+        template.sendBodyAndHeader("direct:start", "D", "id", 123);
+        template.sendBodyAndHeader("direct:start", "E", "id", 123);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public static class MyFailProcessor implements Processor {
+
+        public void process(Exchange exchange) throws Exception {
+            int count = counter.incrementAndGet();
+            if (count <= 3) {
+                throw new IllegalArgumentException("Damn");
+            }
+        }
+    }
+
+    public static class MyAggregationStrategy implements AggregationStrategy {
+
+        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+            if (oldExchange == null) {
+                return newExchange;
+            }
+            String body1 = oldExchange.getIn().getBody(String.class);
+            String body2 = newExchange.getIn().getBody(String.class);
+
+            oldExchange.getIn().setBody(body1 + body2);
+            return oldExchange;
+        }
+    }
+}
\ No newline at end of file

Propchange: 
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBSpringAggregateRecoverWithRedeliveryPolicyTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBSpringAggregateRecoverWithRedeliveryPolicyTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Copied: 
camel/trunk/components/camel-hawtdb/src/test/resources/org/apache/camel/component/hawtdb/HawtDBSpringAggregateRecoverWithRedeliveryPolicyTest.xml
 (from r930635, 
camel/trunk/components/camel-hawtdb/src/test/resources/org/apache/camel/component/hawtdb/HawtDBSpringAggregateTest.xml)
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/resources/org/apache/camel/component/hawtdb/HawtDBSpringAggregateRecoverWithRedeliveryPolicyTest.xml?p2=camel/trunk/components/camel-hawtdb/src/test/resources/org/apache/camel/component/hawtdb/HawtDBSpringAggregateRecoverWithRedeliveryPolicyTest.xml&p1=camel/trunk/components/camel-hawtdb/src/test/resources/org/apache/camel/component/hawtdb/HawtDBSpringAggregateTest.xml&r1=930635&r2=930692&rev=930692&view=diff
==============================================================================
--- 
camel/trunk/components/camel-hawtdb/src/test/resources/org/apache/camel/component/hawtdb/HawtDBSpringAggregateTest.xml
 (original)
+++ 
camel/trunk/components/camel-hawtdb/src/test/resources/org/apache/camel/component/hawtdb/HawtDBSpringAggregateRecoverWithRedeliveryPolicyTest.xml
 Sun Apr  4 13:47:42 2010
@@ -30,10 +30,17 @@
         <property name="persistentFileName" value="target/data/hawtdb.dat"/>
         <!-- and use repo2 as the repository name -->
         <property name="repositoryName" value="repo2"/>
+        <!-- scan every second -->
+        <property name="recoveryInterval" value="1000"/>
+        <!-- enable recovery -->
+        <property name="useRecovery" value="true"/>
     </bean>
 
     <!-- aggregate the messages using this strategy -->
-    <bean id="myAggregatorStrategy" 
class="org.apache.camel.component.hawtdb.HawtDBSpringAggregateTest$MyAggregationStrategy"/>
+    <bean id="myAggregatorStrategy" 
class="org.apache.camel.component.hawtdb.HawtDBSpringAggregateRecoverWithRedeliveryPolicyTest$MyAggregationStrategy"/>
+
+    <!-- and use this processor to simulate errors -->
+    <bean id="myFailProcessor" 
class="org.apache.camel.component.hawtdb.HawtDBSpringAggregateRecoverWithRedeliveryPolicyTest$MyFailProcessor"/>
 
     <!-- this is the camel routes -->
     <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
@@ -44,8 +51,12 @@
             <aggregate strategyRef="myAggregatorStrategy" 
aggregationRepositoryRef="repo" completionSize="5">
                 <!-- correlate by header with the key id -->
                 
<correlationExpression><header>id</header></correlationExpression>
-                <!-- send aggregated messages to the mock endpoint -->
+                <!-- send aggregated messages out here -->
                 <to uri="mock:aggregated"/>
+                <!-- this processor will fail the first 3 attempts -->
+                <process ref="myFailProcessor"/>
+                <!-- at the 4th attempt we should be able to send to this mock 
result endpoint -->
+                <to uri="mock:result"/>
             </aggregate>
         </route>
 


Reply via email to