Author: boday
Date: Mon Jul 16 17:51:17 2012
New Revision: 1362163

URL: http://svn.apache.org/viewvc?rev=1362163&view=rev
Log:
CAMEL-4327 added "rejectOld" option to the Resequencer EIP to throw an error if 
older messages are received after the last delivered message

Added:
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/MessageRejectedException.java
    
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamRejectOldExchangesTest.java
    
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringResequenceStreamRejectOldExchangesTest.java
    
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/resequencerRejectOld.xml
Modified:
    
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/StreamResequencerConfig.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java?rev=1362163&r1=1362162&r2=1362163&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java
 Mon Jul 16 17:51:17 2012
@@ -150,6 +150,18 @@ public class ResequenceDefinition extend
     }
 
     /**
+     * Sets the rejectOld flag to throw an error when a message older than the 
last delivered message is processed
+     * @return the builder
+     */
+    public ResequenceDefinition rejectOld() {
+        if (streamConfig == null) {
+            throw new IllegalStateException("rejectOld() only supported for 
stream resequencer");
+        }
+        streamConfig.setRejectOld(true);
+        return this;
+    }
+
+    /**
      * Sets the in batch size for number of exchanges received
      * @param batchSize  the batch size
      * @return the builder
@@ -368,6 +380,7 @@ public class ResequenceDefinition extend
         StreamResequencer resequencer = new 
StreamResequencer(routeContext.getCamelContext(), processor, comparator);
         resequencer.setTimeout(config.getTimeout());
         resequencer.setCapacity(config.getCapacity());
+        resequencer.setRejectOld(config.getRejectOld());
         if (config.getIgnoreInvalidExchanges() != null) {
             
resequencer.setIgnoreInvalidExchanges(config.getIgnoreInvalidExchanges());
         }

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/StreamResequencerConfig.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/StreamResequencerConfig.java?rev=1362163&r1=1362162&r2=1362163&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/StreamResequencerConfig.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/StreamResequencerConfig.java
 Mon Jul 16 17:51:17 2012
@@ -19,6 +19,7 @@ package org.apache.camel.model.config;
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
 import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlElement;
 import javax.xml.bind.annotation.XmlRootElement;
 import javax.xml.bind.annotation.XmlTransient;
 
@@ -41,6 +42,8 @@ public class StreamResequencerConfig ext
     private Boolean ignoreInvalidExchanges;
     @XmlTransient
     private ExpressionResultComparator comparator;
+    @XmlElement
+    private Boolean rejectOld;
 
     /**
      * Creates a new {@link StreamResequencerConfig} instance using default
@@ -123,5 +126,13 @@ public class StreamResequencerConfig ext
     public void setComparator(ExpressionResultComparator comparator) {
         this.comparator = comparator;
     }
-    
+
+    public void setRejectOld(boolean value) {
+        this.rejectOld = value;
+    }
+
+    public Boolean getRejectOld() {
+        return rejectOld;
+    }
+
 }

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java?rev=1362163&r1=1362162&r2=1362163&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java
 Mon Jul 16 17:51:17 2012
@@ -141,6 +141,10 @@ public class StreamResequencer extends S
         return ignoreInvalidExchanges;
     }
 
+    public void setRejectOld(Boolean rejectOld) {
+        engine.setRejectOld(rejectOld);
+    }
+
     /**
      * Sets whether to ignore invalid exchanges which cannot be used by this 
stream resequencer.
      * <p/>

Added: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/MessageRejectedException.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/MessageRejectedException.java?rev=1362163&view=auto
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/MessageRejectedException.java
 (added)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/MessageRejectedException.java
 Mon Jul 16 17:51:17 2012
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.resequencer;
+
+import org.apache.camel.RuntimeCamelException;
+
+/**
+ * An exception thrown if message is rejected by the resequencer
+ *
+ * @version
+ */
+public class MessageRejectedException extends RuntimeCamelException {
+    private static final long serialVersionUID = 5755929795399134568L;
+
+    public MessageRejectedException(String message) {
+        super(message);
+    }
+}
\ No newline at end of file

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java?rev=1362163&r1=1362162&r2=1362163&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ResequencerEngine.java
 Mon Jul 16 17:51:17 2012
@@ -87,6 +87,11 @@ public class ResequencerEngine<E> {
     private SequenceSender<E> sequenceSender;
 
     /**
+     * Indicates whether an error should be thrown if message older (based on 
Comparator) than the last delivered message is received.
+     */
+    private Boolean rejectOld;
+
+    /**
      * Creates a new resequencer instance with a default timeout of 2000
      * milliseconds.
      *
@@ -136,6 +141,14 @@ public class ResequencerEngine<E> {
         this.timeout = timeout;
     }
 
+    public Boolean getRejectOld() {
+        return rejectOld;
+    }
+
+    public void setRejectOld(Boolean rejectOld) {
+        this.rejectOld = rejectOld;
+    }
+
     /**
      * Returns the sequence sender.
      *
@@ -209,6 +222,9 @@ public class ResequencerEngine<E> {
             // nothing to schedule
         } else if (sequence.predecessor(element) != null) {
             // nothing to schedule
+        } else if (rejectOld != null && rejectOld.booleanValue() && 
beforeLastDelivered(element)) {
+            throw new MessageRejectedException("rejecting message [" + 
element.getObject()
+                    + "], it should have been sent before the last delivered 
message [" + lastDelivered.getObject() + "]");
         } else {
             element.schedule(defineTimeout());
         }
@@ -283,6 +299,22 @@ public class ResequencerEngine<E> {
     }
 
     /**
+     * Returns <code>true</code> if the given element is before the last 
delivered element.
+     *
+     * @param element an element.
+     * @return <code>true</code> if the given element is before the last 
delivered element.
+     */
+    private boolean beforeLastDelivered(Element<E> element) {
+        if (lastDelivered == null) {
+            return false;
+        }
+        if (sequence.comparator().compare(element, lastDelivered) < 0) {
+            return true;
+        }
+        return false;
+    }
+
+    /**
      * Creates a timeout task based on the timeout setting of this resequencer.
      *
      * @return a new timeout task.

Added: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamRejectOldExchangesTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamRejectOldExchangesTest.java?rev=1362163&view=auto
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamRejectOldExchangesTest.java
 (added)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamRejectOldExchangesTest.java
 Mon Jul 16 17:51:17 2012
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.resequencer.MessageRejectedException;
+
+/**
+ *
+ */
+public class ResequenceStreamRejectOldExchangesTest extends ContextTestSupport 
{
+
+    public void testInSequenceAfterTimeout() throws Exception {
+        getMockEndpoint("mock:result").expectedBodiesReceived("A", "B", "C", 
"E");
+        getMockEndpoint("mock:error").expectedMessageCount(0);
+
+        template.sendBodyAndHeader("direct:start", "B", "seqno", 2);
+        template.sendBodyAndHeader("direct:start", "C", "seqno", 3);
+        template.sendBodyAndHeader("direct:start", "A", "seqno", 1);
+        Thread.sleep(1100);
+        template.sendBodyAndHeader("direct:start", "E", "seqno", 5);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testDuplicateAfterTimeout() throws Exception {
+        getMockEndpoint("mock:result").expectedBodiesReceived("A", "B", "C");
+        getMockEndpoint("mock:error").expectedMessageCount(0);
+
+        template.sendBodyAndHeader("direct:start", "B", "seqno", 2);
+        template.sendBodyAndHeader("direct:start", "C", "seqno", 3);
+        template.sendBodyAndHeader("direct:start", "A", "seqno", 1);
+        Thread.sleep(1100);
+        template.sendBodyAndHeader("direct:start", "C", "seqno", 3);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testOutOfSequenceAfterTimeout() throws Exception {
+        getMockEndpoint("mock:result").expectedBodiesReceived("A", "C", "D");
+        getMockEndpoint("mock:error").expectedBodiesReceived("B");
+
+        template.sendBodyAndHeader("direct:start", "D", "seqno", 4);
+        template.sendBodyAndHeader("direct:start", "C", "seqno", 3);
+        template.sendBodyAndHeader("direct:start", "A", "seqno", 1);
+        Thread.sleep(1100);
+        template.sendBodyAndHeader("direct:start", "B", "seqno", 2);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testOutOfSequenceAfterTimeout2() throws Exception {
+        getMockEndpoint("mock:result").expectedBodiesReceived("B", "C", "D");
+        getMockEndpoint("mock:error").expectedBodiesReceived("A");
+
+        template.sendBodyAndHeader("direct:start", "D", "seqno", 4);
+        template.sendBodyAndHeader("direct:start", "C", "seqno", 3);
+        template.sendBodyAndHeader("direct:start", "B", "seqno", 2);
+        Thread.sleep(1100);
+        template.sendBodyAndHeader("direct:start", "A", "seqno", 1);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+
+                from("direct:start")
+                        
.onException(MessageRejectedException.class).handled(true).to("mock:error").end()
+                        
.resequence(header("seqno")).stream().timeout(1000).rejectOld()
+                        .to("mock:result");
+            }
+        };
+    }
+}

Added: 
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringResequenceStreamRejectOldExchangesTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringResequenceStreamRejectOldExchangesTest.java?rev=1362163&view=auto
==============================================================================
--- 
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringResequenceStreamRejectOldExchangesTest.java
 (added)
+++ 
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringResequenceStreamRejectOldExchangesTest.java
 Mon Jul 16 17:51:17 2012
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spring.processor;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.processor.ResequenceStreamRejectOldExchangesTest;
+import org.apache.camel.processor.ResequencerTest;
+
+import static 
org.apache.camel.spring.processor.SpringTestHelper.createSpringCamelContext;
+
+/**
+ * @version
+ */
+public class SpringResequenceStreamRejectOldExchangesTest extends 
ResequenceStreamRejectOldExchangesTest {
+    protected CamelContext createCamelContext() throws Exception {
+        return createSpringCamelContext(this, 
"org/apache/camel/spring/processor/resequencerRejectOld.xml");
+    }
+}
\ No newline at end of file

Added: 
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/resequencerRejectOld.xml
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/resequencerRejectOld.xml?rev=1362163&view=auto
==============================================================================
--- 
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/resequencerRejectOld.xml
 (added)
+++ 
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/resequencerRejectOld.xml
 Mon Jul 16 17:51:17 2012
@@ -0,0 +1,42 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="
+       http://www.springframework.org/schema/beans 
http://www.springframework.org/schema/beans/spring-beans.xsd
+       http://camel.apache.org/schema/spring 
http://camel.apache.org/schema/spring/camel-spring.xsd">
+
+    <camelContext xmlns="http://camel.apache.org/schema/spring">
+        <route>
+            <from uri="direct:start"/>
+            <onException>
+                
<exception>org.apache.camel.processor.resequencer.MessageRejectedException</exception>
+                <handled><constant>true</constant></handled>
+                <to uri="mock:error"/>
+            </onException>
+            <resequence>
+                <stream-config capacity="100" timeout="1000">
+                    <rejectOld>true</rejectOld>
+                </stream-config>
+                <header>seqno</header>
+                <to uri="mock:result"/>
+            </resequence>
+        </route>
+    </camelContext>
+
+</beans>


Reply via email to