Author: davsclaus Date: Fri Jun 4 15:37:52 2010 New Revision: 951449 URL: http://svn.apache.org/viewvc?rev=951449&view=rev Log: CAMEL-2537: Added option reverse on batch resequencer so you can resequence JMSPriority messages and have 9..0 ordering. Only in batch mode.
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BatchResequencerJMSPriorityTest.java - copied, changed from r951440, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BatchResequencerAllowDuplicatesTest.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/BatchResequencerConfig.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java?rev=951449&r1=951448&r2=951449&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java Fri Jun 4 15:37:52 2010 @@ -180,6 +180,22 @@ public class ResequenceDefinition extend } /** + * Enables reverse mode for the batch resequencer mode. + * <p/> + * This means the expression for determine the sequence order will be reversed. + * Can be used for Z..A or 9..0 ordering. + * + * @return the builder + */ + public ResequenceDefinition reverse() { + if (batchConfig == null) { + throw new IllegalStateException("reverse() only supported for batch resequencer"); + } + batchConfig.setReverse(true); + return this; + } + + /** * Sets the comparator to use for stream resequencer * * @param comparator the comparator @@ -264,7 +280,8 @@ public class ResequenceDefinition extend BatchResequencerConfig config) throws Exception { Processor processor = this.createChildProcessor(routeContext, true); - Resequencer resequencer = new Resequencer(routeContext.getCamelContext(), processor, resolveExpressionList(routeContext), config.getAllowDuplicates()); + Resequencer resequencer = new Resequencer(routeContext.getCamelContext(), processor, resolveExpressionList(routeContext), + config.getAllowDuplicates(), config.getReverse()); resequencer.setBatchSize(config.getBatchSize()); resequencer.setBatchTimeout(config.getBatchTimeout()); return resequencer; Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/BatchResequencerConfig.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/BatchResequencerConfig.java?rev=951449&r1=951448&r2=951449&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/BatchResequencerConfig.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/BatchResequencerConfig.java Fri Jun 4 15:37:52 2010 @@ -65,6 +65,9 @@ public class BatchResequencerConfig { @XmlAttribute private Boolean allowDuplicates = Boolean.FALSE; + @XmlAttribute + private Boolean reverse = Boolean.FALSE; + /** * Creates a new {...@link BatchResequencerConfig} instance using default * values for <code>batchSize</code> (100) and <code>batchTimeout</code> @@ -122,4 +125,12 @@ public class BatchResequencerConfig { public void setAllowDuplicates(Boolean allowDuplicates) { this.allowDuplicates = allowDuplicates; } + + public Boolean getReverse() { + return reverse; + } + + public void setReverse(Boolean reverse) { + this.reverse = reverse; + } } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java?rev=951449&r1=951448&r2=951449&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java Fri Jun 4 15:37:52 2010 @@ -39,11 +39,12 @@ public class Resequencer extends BatchPr // TODO: Rework to avoid using BatchProcessor public Resequencer(CamelContext camelContext, Processor processor, Expression expression) { - this(camelContext, processor, createSet(expression, false)); + this(camelContext, processor, createSet(expression, false, false)); } - public Resequencer(CamelContext camelContext, Processor processor, List<Expression> expressions, boolean allowDuplicates) { - this(camelContext, processor, createSet(expressions, allowDuplicates)); + public Resequencer(CamelContext camelContext, Processor processor, List<Expression> expressions, + boolean allowDuplicates, boolean reverse) { + this(camelContext, processor, createSet(expressions, allowDuplicates, reverse)); } public Resequencer(CamelContext camelContext, Processor processor, Set<Exchange> collection) { @@ -62,25 +63,36 @@ public class Resequencer extends BatchPr // Implementation methods //------------------------------------------------------------------------- - protected static Set<Exchange> createSet(Expression expression, boolean allowDuplicates) { - return createSet(new ExpressionComparator(expression), allowDuplicates); + protected static Set<Exchange> createSet(Expression expression, boolean allowDuplicates, boolean reverse) { + return createSet(new ExpressionComparator(expression), allowDuplicates, reverse); } - protected static Set<Exchange> createSet(List<Expression> expressions, boolean allowDuplicates) { + protected static Set<Exchange> createSet(List<Expression> expressions, boolean allowDuplicates, boolean reverse) { if (expressions.size() == 1) { - return createSet(expressions.get(0), allowDuplicates); + return createSet(expressions.get(0), allowDuplicates, reverse); } - return createSet(new ExpressionListComparator(expressions), allowDuplicates); + return createSet(new ExpressionListComparator(expressions), allowDuplicates, reverse); } - protected static Set<Exchange> createSet(final Comparator<? super Exchange> comparator, boolean allowDuplicates) { - Comparator<? super Exchange> comp = comparator; + protected static Set<Exchange> createSet(final Comparator<? super Exchange> comparator, boolean allowDuplicates, boolean reverse) { + Comparator<? super Exchange> answer = comparator; + + if (reverse) { + answer = new Comparator<Exchange>() { + public int compare(Exchange o1, Exchange o2) { + int answer = comparator.compare(o1, o2); + // reverse it + return answer * -1; + } + }; + } // if we allow duplicates then we need to cater for that in the comparator + final Comparator<? super Exchange> forAllowDuplicates = answer; if (allowDuplicates) { - comp = new Comparator<Exchange>() { + answer = new Comparator<Exchange>() { public int compare(Exchange o1, Exchange o2) { - int answer = comparator.compare(o1, o2); + int answer = forAllowDuplicates.compare(o1, o2); if (answer == 0) { // they are equal but we should allow duplicates so say that o2 is higher // so it will come next @@ -90,7 +102,8 @@ public class Resequencer extends BatchPr } }; } - return new TreeSet<Exchange>(comp); + + return new TreeSet<Exchange>(answer); } } Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BatchResequencerJMSPriorityTest.java (from r951440, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BatchResequencerAllowDuplicatesTest.java) URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BatchResequencerJMSPriorityTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BatchResequencerJMSPriorityTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BatchResequencerAllowDuplicatesTest.java&r1=951440&r2=951449&rev=951449&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BatchResequencerAllowDuplicatesTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BatchResequencerJMSPriorityTest.java Fri Jun 4 15:37:52 2010 @@ -23,20 +23,20 @@ import org.apache.camel.component.mock.M /** * @version $Revision$ */ -public class BatchResequencerAllowDuplicatesTest extends ContextTestSupport { +public class BatchResequencerJMSPriorityTest extends ContextTestSupport { - public void testBatchResequencerAllowDuplicate() throws Exception { + public void testBatchResequencerJMSPriority() throws Exception { MockEndpoint mock = getMockEndpoint("mock:result"); - mock.expectedBodiesReceived("1A", "1B", "2C", "2D", "2E", "2F", "3G", "4H"); + mock.expectedBodiesReceived("G", "A", "B", "E", "H", "C", "D", "F"); - template.sendBodyAndHeader("direct:start", "1A", "id", "1"); - template.sendBodyAndHeader("direct:start", "2C", "id", "2"); - template.sendBodyAndHeader("direct:start", "2D", "id", "2"); - template.sendBodyAndHeader("direct:start", "4H", "id", "4"); - template.sendBodyAndHeader("direct:start", "1B", "id", "1"); - template.sendBodyAndHeader("direct:start", "2E", "id", "2"); - template.sendBodyAndHeader("direct:start", "3G", "id", "3"); - template.sendBodyAndHeader("direct:start", "2F", "id", "2"); + template.sendBodyAndHeader("direct:start", "A", "JMSPriority", 6); + template.sendBodyAndHeader("direct:start", "B", "JMSPriority", 6); + template.sendBodyAndHeader("direct:start", "C", "JMSPriority", 4); + template.sendBodyAndHeader("direct:start", "D", "JMSPriority", 4); + template.sendBodyAndHeader("direct:start", "E", "JMSPriority", 6); + template.sendBodyAndHeader("direct:start", "F", "JMSPriority", 4); + template.sendBodyAndHeader("direct:start", "G", "JMSPriority", 8); + template.sendBodyAndHeader("direct:start", "H", "JMSPriority", 6); assertMockEndpointsSatisfied(); } @@ -48,8 +48,9 @@ public class BatchResequencerAllowDuplic public void configure() throws Exception { // START SNIPPET: e1 from("direct:start") - // allow duplicates which means messages with same id is retained - .resequence(header("id")).allowDuplicates() + // sort by JMSPriority by allowing duplicates (message can have same JMSPriority) + // and use reverse ordering so 9 is first output (most important), and 0 is last + .resequence(header("JMSPriority")).allowDuplicates().reverse() .to("mock:result"); // END SNIPPET: e1 }