Author: boday
Date: Tue Jul 17 17:47:19 2012
New Revision: 1362585

URL: http://svn.apache.org/viewvc?rev=1362585&view=rev
Log:
CAMEL-4327 fixed randomly failing resequencer unit tests by using capacity 
instead of timeout, tweaked the rejectOld validation logic to short-circuit 
earlier

Modified:
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java
    
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamRejectOldExchangesTest.java
    
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/resequencerRejectOld.xml

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java?rev=1362585&r1=1362584&r2=1362585&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java
 Tue Jul 17 17:47:19 2012
@@ -206,6 +206,12 @@ public class ResequencerEngine<E> {
             throw new IllegalArgumentException("Element cannot be used in 
comparator: " + sequence.comparator());
         }
 
+        // validate the exchange shouldn't be 'rejected' (if applicable)
+        if (rejectOld != null && rejectOld.booleanValue() && 
beforeLastDelivered(element)) {
+            throw new MessageRejectedException("rejecting message [" + 
element.getObject()
+                    + "], it should have been sent before the last delivered 
message [" + lastDelivered.getObject() + "]");
+        }
+
         // add element to sequence in proper order
         sequence.add(element);
 
@@ -222,10 +228,6 @@ public class ResequencerEngine<E> {
             // nothing to schedule
         } else if (sequence.predecessor(element) != null) {
             // nothing to schedule
-        } else if (rejectOld != null && rejectOld.booleanValue() && 
beforeLastDelivered(element)) {
-            sequence.remove(element);
-            throw new MessageRejectedException("rejecting message [" + 
element.getObject()
-                    + "], it should have been sent before the last delivered 
message [" + lastDelivered.getObject() + "]");
         } else {
             element.schedule(defineTimeout());
         }

Modified: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamRejectOldExchangesTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamRejectOldExchangesTest.java?rev=1362585&r1=1362584&r2=1362585&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamRejectOldExchangesTest.java
 (original)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamRejectOldExchangesTest.java
 Tue Jul 17 17:47:19 2012
@@ -25,54 +25,52 @@ import org.apache.camel.processor.resequ
  */
 public class ResequenceStreamRejectOldExchangesTest extends ContextTestSupport 
{
 
-    public void testInSequenceAfterTimeout() throws Exception {
+    public void testInSequenceAfterCapacityReached() throws Exception {
         getMockEndpoint("mock:result").expectedBodiesReceived("A", "B", "C", 
"E");
         getMockEndpoint("mock:error").expectedMessageCount(0);
 
         template.sendBodyAndHeader("direct:start", "B", "seqno", 2);
         template.sendBodyAndHeader("direct:start", "C", "seqno", 3);
         template.sendBodyAndHeader("direct:start", "A", "seqno", 1);
-        Thread.sleep(1100);
         template.sendBodyAndHeader("direct:start", "E", "seqno", 5);
 
         assertMockEndpointsSatisfied();
     }
 
-    public void testDuplicateAfterTimeout() throws Exception {
+    public void testDuplicateAfterCapacityReached() throws Exception {
         getMockEndpoint("mock:result").expectedBodiesReceived("A", "B", "C");
         getMockEndpoint("mock:error").expectedMessageCount(0);
 
         template.sendBodyAndHeader("direct:start", "B", "seqno", 2);
         template.sendBodyAndHeader("direct:start", "C", "seqno", 3);
         template.sendBodyAndHeader("direct:start", "A", "seqno", 1);
-        Thread.sleep(1100);
         template.sendBodyAndHeader("direct:start", "C", "seqno", 3);
 
         assertMockEndpointsSatisfied();
     }
 
-    public void testOutOfSequenceAfterTimeoutSimple() throws Exception {
+    public void testOutOfSequenceAfterCapacityReachedSimple() throws Exception 
{
         getMockEndpoint("mock:result").expectedBodiesReceived("B", "C", "D");
         getMockEndpoint("mock:error").expectedBodiesReceived("A");
 
         template.sendBodyAndHeader("direct:start", "D", "seqno", 4);
         template.sendBodyAndHeader("direct:start", "C", "seqno", 3);
         template.sendBodyAndHeader("direct:start", "B", "seqno", 2);
-        Thread.sleep(1100);
         template.sendBodyAndHeader("direct:start", "A", "seqno", 1);
 
         assertMockEndpointsSatisfied();
     }
 
-    public void testOutOfSequenceAfterTimeoutComplex() throws Exception {
+    
+    public void testOutOfSequenceAfterCapacityReachedComplex() throws 
Exception {
         getMockEndpoint("mock:result").expectedBodiesReceived("A", "D", "E", 
"F");
         getMockEndpoint("mock:error").expectedBodiesReceived("B", "C");
 
+        template.sendBodyAndHeader("direct:start", "E", "seqno", 5);
         template.sendBodyAndHeader("direct:start", "D", "seqno", 4);
         template.sendBodyAndHeader("direct:start", "A", "seqno", 1);
-        Thread.sleep(1100);
+
         template.sendBodyAndHeader("direct:start", "B", "seqno", 2);
-        template.sendBodyAndHeader("direct:start", "E", "seqno", 5);
         template.sendBodyAndHeader("direct:start", "C", "seqno", 3);
         template.sendBodyAndHeader("direct:start", "F", "seqno", 6);
 
@@ -86,9 +84,9 @@ public class ResequenceStreamRejectOldEx
             public void configure() throws Exception {
 
                 from("direct:start")
-                        
.onException(MessageRejectedException.class).handled(true).to("mock:error").end()
-                        
.resequence(header("seqno")).stream().timeout(1000).rejectOld()
-                        .to("mock:result");
+                    
.onException(MessageRejectedException.class).maximumRedeliveries(0).handled(true).to("mock:error").end()
+                    
.resequence(header("seqno")).stream().capacity(3).rejectOld()
+                    .to("mock:result");
             }
         };
     }

Modified: 
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/resequencerRejectOld.xml
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/resequencerRejectOld.xml?rev=1362585&r1=1362584&r2=1362585&view=diff
==============================================================================
--- 
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/resequencerRejectOld.xml
 (original)
+++ 
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/resequencerRejectOld.xml
 Tue Jul 17 17:47:19 2012
@@ -30,7 +30,7 @@
                 <to uri="mock:error"/>
             </onException>
             <resequence>
-                <stream-config capacity="100" timeout="1000">
+                <stream-config capacity="3" timeout="1000">
                     <rejectOld>true</rejectOld>
                 </stream-config>
                 <header>seqno</header>


Reply via email to