Author: boday Date: Tue Jul 17 17:47:19 2012 New Revision: 1362585 URL: http://svn.apache.org/viewvc?rev=1362585&view=rev Log: CAMEL-4327 fixed randomly failing resequencer unit tests by using capacity instead of timeout, tweaked the rejectOld validation logic to short-circuit earlier
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamRejectOldExchangesTest.java camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/resequencerRejectOld.xml Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java?rev=1362585&r1=1362584&r2=1362585&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java Tue Jul 17 17:47:19 2012 @@ -206,6 +206,12 @@ public class ResequencerEngine<E> { throw new IllegalArgumentException("Element cannot be used in comparator: " + sequence.comparator()); } + // validate the exchange shouldn't be 'rejected' (if applicable) + if (rejectOld != null && rejectOld.booleanValue() && beforeLastDelivered(element)) { + throw new MessageRejectedException("rejecting message [" + element.getObject() + + "], it should have been sent before the last delivered message [" + lastDelivered.getObject() + "]"); + } + // add element to sequence in proper order sequence.add(element); @@ -222,10 +228,6 @@ public class ResequencerEngine<E> { // nothing to schedule } else if (sequence.predecessor(element) != null) { // nothing to schedule - } else if (rejectOld != null && rejectOld.booleanValue() && beforeLastDelivered(element)) { - sequence.remove(element); - throw new MessageRejectedException("rejecting message [" + element.getObject() - + "], it should have been sent before the last delivered message [" + lastDelivered.getObject() + "]"); } else { element.schedule(defineTimeout()); } Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamRejectOldExchangesTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamRejectOldExchangesTest.java?rev=1362585&r1=1362584&r2=1362585&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamRejectOldExchangesTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamRejectOldExchangesTest.java Tue Jul 17 17:47:19 2012 @@ -25,54 +25,52 @@ import org.apache.camel.processor.resequ */ public class ResequenceStreamRejectOldExchangesTest extends ContextTestSupport { - public void testInSequenceAfterTimeout() throws Exception { + public void testInSequenceAfterCapacityReached() throws Exception { getMockEndpoint("mock:result").expectedBodiesReceived("A", "B", "C", "E"); getMockEndpoint("mock:error").expectedMessageCount(0); template.sendBodyAndHeader("direct:start", "B", "seqno", 2); template.sendBodyAndHeader("direct:start", "C", "seqno", 3); template.sendBodyAndHeader("direct:start", "A", "seqno", 1); - Thread.sleep(1100); template.sendBodyAndHeader("direct:start", "E", "seqno", 5); assertMockEndpointsSatisfied(); } - public void testDuplicateAfterTimeout() throws Exception { + public void testDuplicateAfterCapacityReached() throws Exception { getMockEndpoint("mock:result").expectedBodiesReceived("A", "B", "C"); getMockEndpoint("mock:error").expectedMessageCount(0); template.sendBodyAndHeader("direct:start", "B", "seqno", 2); template.sendBodyAndHeader("direct:start", "C", "seqno", 3); template.sendBodyAndHeader("direct:start", "A", "seqno", 1); - Thread.sleep(1100); template.sendBodyAndHeader("direct:start", "C", "seqno", 3); assertMockEndpointsSatisfied(); } - public void testOutOfSequenceAfterTimeoutSimple() throws Exception { + public void testOutOfSequenceAfterCapacityReachedSimple() throws Exception { getMockEndpoint("mock:result").expectedBodiesReceived("B", "C", "D"); getMockEndpoint("mock:error").expectedBodiesReceived("A"); template.sendBodyAndHeader("direct:start", "D", "seqno", 4); template.sendBodyAndHeader("direct:start", "C", "seqno", 3); template.sendBodyAndHeader("direct:start", "B", "seqno", 2); - Thread.sleep(1100); template.sendBodyAndHeader("direct:start", "A", "seqno", 1); assertMockEndpointsSatisfied(); } - public void testOutOfSequenceAfterTimeoutComplex() throws Exception { + + public void testOutOfSequenceAfterCapacityReachedComplex() throws Exception { getMockEndpoint("mock:result").expectedBodiesReceived("A", "D", "E", "F"); getMockEndpoint("mock:error").expectedBodiesReceived("B", "C"); + template.sendBodyAndHeader("direct:start", "E", "seqno", 5); template.sendBodyAndHeader("direct:start", "D", "seqno", 4); template.sendBodyAndHeader("direct:start", "A", "seqno", 1); - Thread.sleep(1100); + template.sendBodyAndHeader("direct:start", "B", "seqno", 2); - template.sendBodyAndHeader("direct:start", "E", "seqno", 5); template.sendBodyAndHeader("direct:start", "C", "seqno", 3); template.sendBodyAndHeader("direct:start", "F", "seqno", 6); @@ -86,9 +84,9 @@ public class ResequenceStreamRejectOldEx public void configure() throws Exception { from("direct:start") - .onException(MessageRejectedException.class).handled(true).to("mock:error").end() - .resequence(header("seqno")).stream().timeout(1000).rejectOld() - .to("mock:result"); + .onException(MessageRejectedException.class).maximumRedeliveries(0).handled(true).to("mock:error").end() + .resequence(header("seqno")).stream().capacity(3).rejectOld() + .to("mock:result"); } }; } Modified: camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/resequencerRejectOld.xml URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/resequencerRejectOld.xml?rev=1362585&r1=1362584&r2=1362585&view=diff ============================================================================== --- camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/resequencerRejectOld.xml (original) +++ camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/resequencerRejectOld.xml Tue Jul 17 17:47:19 2012 @@ -30,7 +30,7 @@ <to uri="mock:error"/> </onException> <resequence> - <stream-config capacity="100" timeout="1000"> + <stream-config capacity="3" timeout="1000"> <rejectOld>true</rejectOld> </stream-config> <header>seqno</header>