Author: davsclaus Date: Fri Jun 4 15:16:25 2010 New Revision: 951440 URL: http://svn.apache.org/viewvc?rev=951440&view=rev Log: CAMEL-2537: Added option allowDuplicates on batch resequencer so you can resequence messages with duplicate correlated keys. Only in batch mode.
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BatchResequencerAllowDuplicatesTest.java (with props) camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BatchResequencerWithDuplicateTest.java - copied, changed from r951396, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequencerTest.java camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringBatchResequencerAllowDuplicatesTest.java - copied, changed from r951396, camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringAOPAfterTest.java camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/BatchResequencerAllowDuplicatesTest.xml - copied, changed from r951396, camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aopafter.xml Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/BatchResequencerConfig.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java camel/trunk/camel-core/src/main/java/org/apache/camel/util/Ordered.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java?rev=951440&r1=951439&r2=951440&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java Fri Jun 4 15:16:25 2010 @@ -168,6 +168,18 @@ public class ResequenceDefinition extend } /** + * Enables duplicates for the batch resequencer mode + * @return the builder + */ + public ResequenceDefinition allowDuplicates() { + if (batchConfig == 
null) { + throw new IllegalStateException("allowDuplicates() only supported for batch resequencer"); + } + batchConfig.setAllowDuplicates(true); + return this; + } + + /** * Sets the comparator to use for stream resequencer * * @param comparator the comparator @@ -252,7 +264,7 @@ public class ResequenceDefinition extend BatchResequencerConfig config) throws Exception { Processor processor = this.createChildProcessor(routeContext, true); - Resequencer resequencer = new Resequencer(routeContext.getCamelContext(), processor, resolveExpressionList(routeContext)); + Resequencer resequencer = new Resequencer(routeContext.getCamelContext(), processor, resolveExpressionList(routeContext), config.getAllowDuplicates()); resequencer.setBatchSize(config.getBatchSize()); resequencer.setBatchTimeout(config.getBatchTimeout()); return resequencer; Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/BatchResequencerConfig.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/BatchResequencerConfig.java?rev=951440&r1=951439&r2=951440&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/BatchResequencerConfig.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/BatchResequencerConfig.java Fri Jun 4 15:16:25 2010 @@ -19,6 +19,7 @@ package org.apache.camel.model.config; import javax.xml.bind.annotation.XmlAccessType; import javax.xml.bind.annotation.XmlAccessorType; import javax.xml.bind.annotation.XmlAttribute; +import javax.xml.bind.annotation.XmlEnum; import javax.xml.bind.annotation.XmlRootElement; /** @@ -61,6 +62,9 @@ public class BatchResequencerConfig { @XmlAttribute private Long batchTimeout; // optional XML attribute requires wrapper object + @XmlAttribute + private Boolean allowDuplicates = Boolean.FALSE; + /** * Creates a new {@link BatchResequencerConfig} 
instance using default * values for <code>batchSize</code> (100) and <code>batchTimeout</code> @@ -110,5 +114,12 @@ public class BatchResequencerConfig { public void setBatchTimeout(long batchTimeout) { this.batchTimeout = batchTimeout; } - + + public Boolean getAllowDuplicates() { + return allowDuplicates; + } + + public void setAllowDuplicates(Boolean allowDuplicates) { + this.allowDuplicates = allowDuplicates; + } } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java?rev=951440&r1=951439&r2=951440&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java Fri Jun 4 15:16:25 2010 @@ -39,11 +39,11 @@ public class Resequencer extends BatchPr // TODO: Rework to avoid using BatchProcessor public Resequencer(CamelContext camelContext, Processor processor, Expression expression) { - this(camelContext, processor, createSet(expression)); + this(camelContext, processor, createSet(expression, false)); } - public Resequencer(CamelContext camelContext, Processor processor, List<Expression> expressions) { - this(camelContext, processor, createSet(expressions)); + public Resequencer(CamelContext camelContext, Processor processor, List<Expression> expressions, boolean allowDuplicates) { + this(camelContext, processor, createSet(expressions, allowDuplicates)); } public Resequencer(CamelContext camelContext, Processor processor, Set<Exchange> collection) { @@ -62,18 +62,35 @@ public class Resequencer extends BatchPr // Implementation methods //------------------------------------------------------------------------- - protected static Set<Exchange> createSet(Expression expression) { - return createSet(new 
ExpressionComparator(expression)); + protected static Set<Exchange> createSet(Expression expression, boolean allowDuplicates) { + return createSet(new ExpressionComparator(expression), allowDuplicates); } - protected static Set<Exchange> createSet(List<Expression> expressions) { + protected static Set<Exchange> createSet(List<Expression> expressions, boolean allowDuplicates) { if (expressions.size() == 1) { - return createSet(expressions.get(0)); + return createSet(expressions.get(0), allowDuplicates); } - return createSet(new ExpressionListComparator(expressions)); + return createSet(new ExpressionListComparator(expressions), allowDuplicates); } - protected static Set<Exchange> createSet(Comparator<? super Exchange> comparator) { - return new TreeSet<Exchange>(comparator); + protected static Set<Exchange> createSet(final Comparator<? super Exchange> comparator, boolean allowDuplicates) { + Comparator<? super Exchange> comp = comparator; + + // if we allow duplicates then we need to cater for that in the comparator + if (allowDuplicates) { + comp = new Comparator<Exchange>() { + public int compare(Exchange o1, Exchange o2) { + int answer = comparator.compare(o1, o2); + if (answer == 0) { + // they are equal but we should allow duplicates so say that o2 is higher + // so it will come next + return 1; + } + return answer; + } + }; + } + return new TreeSet<Exchange>(comp); } + } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/Ordered.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/Ordered.java?rev=951440&r1=951439&r2=951440&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/util/Ordered.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/Ordered.java Fri Jun 4 15:16:25 2010 @@ -38,7 +38,7 @@ public interface Ordered { * Gets the order. 
* <p/> * Use low numbers for higher priority. Normally the sorting will start from 0 and move upwards. - * So if you want to be last then use {@link Integer#MAX_VALUE}. + * So if you want to be last then use {@link Integer#MAX_VALUE} or eg {@link #LOWEST}. * * @return the order */ Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BatchResequencerAllowDuplicatesTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BatchResequencerAllowDuplicatesTest.java?rev=951440&view=auto ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BatchResequencerAllowDuplicatesTest.java (added) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BatchResequencerAllowDuplicatesTest.java Fri Jun 4 15:16:25 2010 @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.processor; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; + +/** + * @version $Revision$ + */ +public class BatchResequencerAllowDuplicatesTest extends ContextTestSupport { + + public void testBatchResequencerAllowDuplicate() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedBodiesReceived("1A", "1B", "2C", "2D", "2E", "2F", "3G", "4H"); + + template.sendBodyAndHeader("direct:start", "1A", "id", "1"); + template.sendBodyAndHeader("direct:start", "2C", "id", "2"); + template.sendBodyAndHeader("direct:start", "2D", "id", "2"); + template.sendBodyAndHeader("direct:start", "4H", "id", "4"); + template.sendBodyAndHeader("direct:start", "1B", "id", "1"); + template.sendBodyAndHeader("direct:start", "2E", "id", "2"); + template.sendBodyAndHeader("direct:start", "3G", "id", "3"); + template.sendBodyAndHeader("direct:start", "2F", "id", "2"); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + // START SNIPPET: e1 + from("direct:start") + // allow duplicates which means messages with same id is retained + .resequence(header("id")).allowDuplicates() + .to("mock:result"); + // END SNIPPET: e1 + } + }; + } + +} \ No newline at end of file Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BatchResequencerAllowDuplicatesTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BatchResequencerAllowDuplicatesTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Copied: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BatchResequencerWithDuplicateTest.java (from r951396, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequencerTest.java) URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BatchResequencerWithDuplicateTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BatchResequencerWithDuplicateTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequencerTest.java&r1=951396&r2=951440&rev=951440&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequencerTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BatchResequencerWithDuplicateTest.java Fri Jun 4 15:16:25 2010 @@ -16,70 +16,64 @@ */ package org.apache.camel.processor; -import java.util.List; - import org.apache.camel.ContextTestSupport; -import org.apache.camel.Endpoint; -import org.apache.camel.Route; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; -import org.apache.camel.impl.EventDrivenConsumerRoute; -import org.apache.camel.management.JmxSystemPropertyKeys; -import org.apache.camel.processor.interceptor.StreamCaching; /** * @version $Revision$ */ -public class ResequencerTest extends ContextTestSupport { - protected Endpoint startEndpoint; - protected MockEndpoint resultEndpoint; - - public void testSendMessagesInWrongOrderButReceiveThemInCorrectOrder() throws Exception { - resultEndpoint.expectedBodiesReceived("Guillaume", "Hiram", "James", "Rob"); - sendBodies("direct:start", "Rob", "Hiram", "Guillaume", "James"); - resultEndpoint.assertIsSatisfied(); - } +public class BatchResequencerWithDuplicateTest extends ContextTestSupport { @Override - protected void setUp() throws Exception { - super.setUp(); - resultEndpoint = 
getMockEndpoint("mock:result"); + public boolean isUseRouteBuilder() { + return false; } - @Override - protected void tearDown() throws Exception { - super.tearDown(); - System.clearProperty(JmxSystemPropertyKeys.DISABLED); - } - - protected RouteBuilder createRouteBuilder() { - return new RouteBuilder() { - public void configure() { - // START SNIPPET: example - from("direct:start").resequence(body()).to("mock:result"); - // END SNIPPET: example + public void testBatchResequencerAllowDuplicate() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start").resequence(header("id")).allowDuplicates().to("mock:result"); } - }; - } + }); + context.start(); - public void testBatchResequencerTypeWithJmx() throws Exception { - System.setProperty(JmxSystemPropertyKeys.DISABLED, "true"); - testBatchResequencerTypeWithoutJmx(); - } - - public void testBatchResequencerTypeWithoutJmx() throws Exception { - List<Route> list = getRouteList(createRouteBuilder()); - assertEquals("Number of routes created: " + list, 1, list.size()); + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedBodiesReceived("1A", "1B", "2C", "2D", "2E", "2F", "3G", "4H"); - Route route = list.get(0); - EventDrivenConsumerRoute consumerRoute = assertIsInstanceOf(EventDrivenConsumerRoute.class, route); + template.sendBodyAndHeader("direct:start", "1A", "id", "1"); + template.sendBodyAndHeader("direct:start", "2C", "id", "2"); + template.sendBodyAndHeader("direct:start", "2D", "id", "2"); + template.sendBodyAndHeader("direct:start", "4H", "id", "4"); + template.sendBodyAndHeader("direct:start", "1B", "id", "1"); + template.sendBodyAndHeader("direct:start", "2E", "id", "2"); + template.sendBodyAndHeader("direct:start", "3G", "id", "3"); + template.sendBodyAndHeader("direct:start", "2F", "id", "2"); + + assertMockEndpointsSatisfied(); + } + + public void testBatchResequencerNoDuplicate() throws Exception { + 
context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start").resequence(header("id")).to("mock:result"); + } + }); + context.start(); - DefaultChannel channel = assertIsInstanceOf(DefaultChannel.class, unwrapChannel(consumerRoute.getProcessor())); + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedBodiesReceived("A", "C", "E", "F"); - assertIsInstanceOf(DefaultErrorHandler.class, channel.getErrorHandler()); - assertFalse("Should not have stream caching", channel.hasInterceptorStrategy(StreamCaching.class)); + template.sendBodyAndHeader("direct:start", "A", "id", "1"); + template.sendBodyAndHeader("direct:start", "C", "id", "2"); + template.sendBodyAndHeader("direct:start", "D", "id", "2"); + template.sendBodyAndHeader("direct:start", "F", "id", "4"); + template.sendBodyAndHeader("direct:start", "B", "id", "1"); + template.sendBodyAndHeader("direct:start", "E", "id", "3"); - assertIsInstanceOf(Resequencer.class, channel.getNextProcessor()); + assertMockEndpointsSatisfied(); } -} +} \ No newline at end of file Copied: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringBatchResequencerAllowDuplicatesTest.java (from r951396, camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringAOPAfterTest.java) URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringBatchResequencerAllowDuplicatesTest.java?p2=camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringBatchResequencerAllowDuplicatesTest.java&p1=camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringAOPAfterTest.java&r1=951396&r2=951440&rev=951440&view=diff ============================================================================== --- 
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringAOPAfterTest.java (original) +++ camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringBatchResequencerAllowDuplicatesTest.java Fri Jun 4 15:16:25 2010 @@ -17,15 +17,16 @@ package org.apache.camel.spring.processor; import org.apache.camel.CamelContext; -import org.apache.camel.processor.AOPAfterTest; +import org.apache.camel.processor.BatchResequencerAllowDuplicatesTest; + import static org.apache.camel.spring.processor.SpringTestHelper.createSpringCamelContext; /** * @version $Revision$ */ -public class SpringAOPAfterTest extends AOPAfterTest { +public class SpringBatchResequencerAllowDuplicatesTest extends BatchResequencerAllowDuplicatesTest { protected CamelContext createCamelContext() throws Exception { - return createSpringCamelContext(this, "org/apache/camel/spring/processor/aopafter.xml"); + return createSpringCamelContext(this, "org/apache/camel/spring/processor/BatchResequencerAllowDuplicatesTest.xml"); } } \ No newline at end of file Copied: camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/BatchResequencerAllowDuplicatesTest.xml (from r951396, camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aopafter.xml) URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/BatchResequencerAllowDuplicatesTest.xml?p2=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/BatchResequencerAllowDuplicatesTest.xml&p1=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aopafter.xml&r1=951396&r2=951440&rev=951440&view=diff ============================================================================== --- camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aopafter.xml (original) +++ 
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/BatchResequencerAllowDuplicatesTest.xml Fri Jun 4 15:16:25 2010 @@ -26,10 +26,11 @@ <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="direct:start"/> - <aop afterUri="mock:after"> - <transform><constant>Bye World</constant></transform> + <resequence> + <header>id</header> <to uri="mock:result"/> - </aop> + <batch-config allowDuplicates="true"/> + </resequence> </route> </camelContext> <!-- END SNIPPET: e1 -->