Author: davsclaus
Date: Fri Jun  4 15:37:52 2010
New Revision: 951449

URL: http://svn.apache.org/viewvc?rev=951449&view=rev
Log:
CAMEL-2537: Added option reverse on batch resequencer so you can resequence 
JMSPriority messages and have 9..0 ordering. Only in batch mode.

Added:
    
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BatchResequencerJMSPriorityTest.java
      - copied, changed from r951440, 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BatchResequencerAllowDuplicatesTest.java
Modified:
    
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/BatchResequencerConfig.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java?rev=951449&r1=951448&r2=951449&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java
 Fri Jun  4 15:37:52 2010
@@ -180,6 +180,22 @@ public class ResequenceDefinition extend
     }
 
     /**
+     * Enables reverse mode for the batch resequencer mode.
+     * <p/>
+     * This means the expression for determine the sequence order will be 
reversed.
+     * Can be used for Z..A or 9..0 ordering.
+     *
+     * @return the builder
+     */
+    public ResequenceDefinition reverse() {
+        if (batchConfig == null) {
+            throw new IllegalStateException("reverse() only supported for 
batch resequencer");
+        }
+        batchConfig.setReverse(true);
+        return this;
+    }
+
+    /**
      * Sets the comparator to use for stream resequencer
      *
      * @param comparator  the comparator
@@ -264,7 +280,8 @@ public class ResequenceDefinition extend
             BatchResequencerConfig config) throws Exception {
 
         Processor processor = this.createChildProcessor(routeContext, true);
-        Resequencer resequencer = new 
Resequencer(routeContext.getCamelContext(), processor, 
resolveExpressionList(routeContext), config.getAllowDuplicates());
+        Resequencer resequencer = new 
Resequencer(routeContext.getCamelContext(), processor, 
resolveExpressionList(routeContext),
+                config.getAllowDuplicates(), config.getReverse());
         resequencer.setBatchSize(config.getBatchSize());
         resequencer.setBatchTimeout(config.getBatchTimeout());
         return resequencer;

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/BatchResequencerConfig.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/BatchResequencerConfig.java?rev=951449&r1=951448&r2=951449&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/BatchResequencerConfig.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/BatchResequencerConfig.java
 Fri Jun  4 15:37:52 2010
@@ -65,6 +65,9 @@ public class BatchResequencerConfig {
     @XmlAttribute
     private Boolean allowDuplicates = Boolean.FALSE;
 
+    @XmlAttribute
+    private Boolean reverse = Boolean.FALSE;
+
     /**
      * Creates a new {...@link BatchResequencerConfig} instance using default
      * values for <code>batchSize</code> (100) and <code>batchTimeout</code>
@@ -122,4 +125,12 @@ public class BatchResequencerConfig {
     public void setAllowDuplicates(Boolean allowDuplicates) {
         this.allowDuplicates = allowDuplicates;
     }
+
+    public Boolean getReverse() {
+        return reverse;
+    }
+
+    public void setReverse(Boolean reverse) {
+        this.reverse = reverse;
+    }
 }

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java?rev=951449&r1=951448&r2=951449&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java
 Fri Jun  4 15:37:52 2010
@@ -39,11 +39,12 @@ public class Resequencer extends BatchPr
     // TODO: Rework to avoid using BatchProcessor
 
     public Resequencer(CamelContext camelContext, Processor processor, 
Expression expression) {
-        this(camelContext, processor, createSet(expression, false));
+        this(camelContext, processor, createSet(expression, false, false));
     }
 
-    public Resequencer(CamelContext camelContext, Processor processor, 
List<Expression> expressions, boolean allowDuplicates) {
-        this(camelContext, processor, createSet(expressions, allowDuplicates));
+    public Resequencer(CamelContext camelContext, Processor processor, 
List<Expression> expressions,
+                       boolean allowDuplicates, boolean reverse) {
+        this(camelContext, processor, createSet(expressions, allowDuplicates, 
reverse));
     }
 
     public Resequencer(CamelContext camelContext, Processor processor, 
Set<Exchange> collection) {
@@ -62,25 +63,36 @@ public class Resequencer extends BatchPr
     // Implementation methods
     //-------------------------------------------------------------------------
 
-    protected static Set<Exchange> createSet(Expression expression, boolean 
allowDuplicates) {
-        return createSet(new ExpressionComparator(expression), 
allowDuplicates);
+    protected static Set<Exchange> createSet(Expression expression, boolean 
allowDuplicates, boolean reverse) {
+        return createSet(new ExpressionComparator(expression), 
allowDuplicates, reverse);
     }
 
-    protected static Set<Exchange> createSet(List<Expression> expressions, 
boolean allowDuplicates) {
+    protected static Set<Exchange> createSet(List<Expression> expressions, 
boolean allowDuplicates, boolean reverse) {
         if (expressions.size() == 1) {
-            return createSet(expressions.get(0), allowDuplicates);
+            return createSet(expressions.get(0), allowDuplicates, reverse);
         }
-        return createSet(new ExpressionListComparator(expressions), 
allowDuplicates);
+        return createSet(new ExpressionListComparator(expressions), 
allowDuplicates, reverse);
     }
 
-    protected static Set<Exchange> createSet(final Comparator<? super 
Exchange> comparator, boolean allowDuplicates) {
-        Comparator<? super Exchange> comp = comparator;
+    protected static Set<Exchange> createSet(final Comparator<? super 
Exchange> comparator, boolean allowDuplicates, boolean reverse) {
+        Comparator<? super Exchange> answer = comparator;
+
+        if (reverse) {
+            answer = new Comparator<Exchange>() {
+                public int compare(Exchange o1, Exchange o2) {
+                    int answer = comparator.compare(o1, o2);
+                    // reverse it
+                    return answer * -1;
+                }
+            };
+        }
 
         // if we allow duplicates then we need to cater for that in the 
comparator
+        final Comparator<? super Exchange> forAllowDuplicates = answer;
         if (allowDuplicates) {
-            comp = new Comparator<Exchange>() {
+            answer = new Comparator<Exchange>() {
                 public int compare(Exchange o1, Exchange o2) {
-                    int answer = comparator.compare(o1, o2);
+                    int answer = forAllowDuplicates.compare(o1, o2);
                     if (answer == 0) {
                         // they are equal but we should allow duplicates so 
say that o2 is higher
                         // so it will come next
@@ -90,7 +102,8 @@ public class Resequencer extends BatchPr
                 }
             };
         }
-        return new TreeSet<Exchange>(comp);
+
+        return new TreeSet<Exchange>(answer);
     }
 
 }

Copied: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BatchResequencerJMSPriorityTest.java
 (from r951440, 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BatchResequencerAllowDuplicatesTest.java)
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BatchResequencerJMSPriorityTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BatchResequencerJMSPriorityTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BatchResequencerAllowDuplicatesTest.java&r1=951440&r2=951449&rev=951449&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BatchResequencerAllowDuplicatesTest.java
 (original)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BatchResequencerJMSPriorityTest.java
 Fri Jun  4 15:37:52 2010
@@ -23,20 +23,20 @@ import org.apache.camel.component.mock.M
 /**
  * @version $Revision$
  */
-public class BatchResequencerAllowDuplicatesTest extends ContextTestSupport {
+public class BatchResequencerJMSPriorityTest extends ContextTestSupport {
 
-    public void testBatchResequencerAllowDuplicate() throws Exception {
+    public void testBatchResequencerJMSPriority() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:result");
-        mock.expectedBodiesReceived("1A", "1B", "2C", "2D", "2E", "2F", "3G", 
"4H");
+        mock.expectedBodiesReceived("G", "A", "B", "E", "H", "C", "D", "F");
 
-        template.sendBodyAndHeader("direct:start", "1A", "id", "1");
-        template.sendBodyAndHeader("direct:start", "2C", "id", "2");
-        template.sendBodyAndHeader("direct:start", "2D", "id", "2");
-        template.sendBodyAndHeader("direct:start", "4H", "id", "4");
-        template.sendBodyAndHeader("direct:start", "1B", "id", "1");
-        template.sendBodyAndHeader("direct:start", "2E", "id", "2");
-        template.sendBodyAndHeader("direct:start", "3G", "id", "3");
-        template.sendBodyAndHeader("direct:start", "2F", "id", "2");
+        template.sendBodyAndHeader("direct:start", "A", "JMSPriority", 6);
+        template.sendBodyAndHeader("direct:start", "B", "JMSPriority", 6);
+        template.sendBodyAndHeader("direct:start", "C", "JMSPriority", 4);
+        template.sendBodyAndHeader("direct:start", "D", "JMSPriority", 4);
+        template.sendBodyAndHeader("direct:start", "E", "JMSPriority", 6);
+        template.sendBodyAndHeader("direct:start", "F", "JMSPriority", 4);
+        template.sendBodyAndHeader("direct:start", "G", "JMSPriority", 8);
+        template.sendBodyAndHeader("direct:start", "H", "JMSPriority", 6);
 
         assertMockEndpointsSatisfied();
     }
@@ -48,8 +48,9 @@ public class BatchResequencerAllowDuplic
             public void configure() throws Exception {
                 // START SNIPPET: e1
                 from("direct:start")
-                    // allow duplicates which means messages with same id is 
retained
-                    .resequence(header("id")).allowDuplicates()
+                    // sort by JMSPriority by allowing duplicates (message can 
have same JMSPriority)
+                    // and use reverse ordering so 9 is first output (most 
important), and 0 is last
+                    
.resequence(header("JMSPriority")).allowDuplicates().reverse()
                     .to("mock:result");
                 // END SNIPPET: e1
             }


Reply via email to