Author: davsclaus Date: Tue Feb 22 09:57:22 2011 New Revision: 1073278 URL: http://svn.apache.org/viewvc?rev=1073278&view=rev Log: CAMEL-3696: Removed list of Expression for Resequencer EIP as it was not support and useable anyway. Only 1 expression is needed.
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/BatchResequencerConfig.java camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/StreamResequencerConfig.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/DefaultExchangeComparator.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ExpressionResultComparator.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/SequenceElementComparator.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java?rev=1073278&r1=1073277&r2=1073278&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java Tue Feb 22 09:57:22 2011 @@ -1621,8 +1621,7 @@ public abstract class ProcessorDefinitio ResequenceDefinition answer = new ResequenceDefinition(); addOutput(answer); ExpressionClause<ResequenceDefinition> clause = new ExpressionClause<ResequenceDefinition>(answer); - answer.expression(clause); - return clause; + return ExpressionClause.createAndSetExpression(answer); } /** @@ -1633,36 +1632,13 @@ public abstract class ProcessorDefinitio * @return the builder */ public ResequenceDefinition resequence(Expression expression) { - return resequence(Collections.<Expression>singletonList(expression)); - } - - /** - * <a href="http://camel.apache.org/resequencer.html">Resequencer EIP:</a> - * Creates a resequencer allowing you to reorganize messages based on some comparator. - * - * @param expressions the list of expressions on which to compare messages in order - * @return the builder - */ - public ResequenceDefinition resequence(List<Expression> expressions) { - ResequenceDefinition answer = new ResequenceDefinition(expressions); + ResequenceDefinition answer = new ResequenceDefinition(); + answer.setExpression(new ExpressionDefinition(expression)); addOutput(answer); return answer; } /** - * <a href="http://camel.apache.org/resequencer.html">Resequencer EIP:</a> - * Creates a splitter allowing you to reorganise messages based on some comparator. - * - * @param expressions the list of expressions on which to compare messages in order - * @return the builder - */ - public ResequenceDefinition resequence(Expression... expressions) { - List<Expression> list = new ArrayList<Expression>(); - list.addAll(Arrays.asList(expressions)); - return resequence(list); - } - - /** * <a href="http://camel.apache.org/aggregator.html">Aggregator EIP:</a> * Creates an aggregator allowing you to combine a number of messages together into a single message. * Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java?rev=1073278&r1=1073277&r2=1073278&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ResequenceDefinition.java Tue Feb 22 09:57:22 2011 @@ -16,23 +16,20 @@ */ package org.apache.camel.model; -import java.util.ArrayList; -import java.util.List; - +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; import javax.xml.bind.annotation.XmlElement; -import javax.xml.bind.annotation.XmlElementRef; import javax.xml.bind.annotation.XmlRootElement; -import javax.xml.bind.annotation.XmlTransient; import org.apache.camel.Expression; import org.apache.camel.Processor; import org.apache.camel.model.config.BatchResequencerConfig; import org.apache.camel.model.config.StreamResequencerConfig; -import org.apache.camel.model.language.ExpressionDefinition; import org.apache.camel.processor.Resequencer; import org.apache.camel.processor.StreamResequencer; import org.apache.camel.processor.resequencer.ExpressionResultComparator; import org.apache.camel.spi.RouteContext; +import org.apache.camel.util.ObjectHelper; /** * Represents an XML <resequence/> element @@ -40,32 +37,21 @@ import org.apache.camel.spi.RouteContext * @version */ @XmlRootElement(name = "resequence") -public class ResequenceDefinition extends ProcessorDefinition<ResequenceDefinition> { - @XmlElementRef - private List<ExpressionDefinition> expressions = new ArrayList<ExpressionDefinition>(); - @XmlElementRef - private List<ProcessorDefinition> outputs = new ArrayList<ProcessorDefinition>(); - // Binding annotation at setter +@XmlAccessorType(XmlAccessType.FIELD) +public class ResequenceDefinition extends ExpressionNode { + @XmlElement(name = "batch-config", required = false) private BatchResequencerConfig batchConfig; - // Binding annotation at setter + @XmlElement(name = "stream-config", required = false) private StreamResequencerConfig streamConfig; - @XmlTransient - private List<Expression> expressionList; public ResequenceDefinition() { - this(null); - } - - public ResequenceDefinition(List<Expression> expressions) { - this.expressionList = expressions; - this.batch(); } @Override public String getShortName() { return "resequence"; } - + // Fluent API // ------------------------------------------------------------------------- /** @@ -115,17 +101,6 @@ public class ResequenceDefinition extend } /** - * Sets the expression to use for reordering - * - * @param expression the expression - * @return the builder - */ - public ResequenceDefinition expression(ExpressionDefinition expression) { - expressions.add(expression); - return this; - } - - /** * Sets the timeout * @param timeout timeout in millis * @return the builder @@ -145,9 +120,13 @@ public class ResequenceDefinition extend * @return the builder */ public ResequenceDefinition size(int batchSize) { - if (batchConfig == null) { + if (streamConfig != null) { throw new IllegalStateException("size() only supported for batch resequencer"); } + // initialize batch mode as its default mode + if (batchConfig == null) { + batch(); + } batchConfig.setBatchSize(batchSize); return this; } @@ -172,9 +151,13 @@ public class ResequenceDefinition extend * @return the builder */ public ResequenceDefinition allowDuplicates() { - if (batchConfig == null) { + if (streamConfig != null) { throw new IllegalStateException("allowDuplicates() only supported for batch resequencer"); } + // initialize batch mode as its default mode + if (batchConfig == null) { + batch(); + } batchConfig.setAllowDuplicates(true); return this; } @@ -188,9 +171,13 @@ public class ResequenceDefinition extend * @return the builder */ public ResequenceDefinition reverse() { - if (batchConfig == null) { + if (streamConfig != null) { throw new IllegalStateException("reverse() only supported for batch resequencer"); } + // initialize batch mode as its default mode + if (batchConfig == null) { + batch(); + } batchConfig.setReverse(true); return this; } @@ -211,65 +198,45 @@ public class ResequenceDefinition extend @Override public String toString() { - return "Resequencer[" + getExpressions() + " -> " + getOutputs() + "]"; + return "Resequencer[" + getExpression() + " -> " + getOutputs() + "]"; } @Override public String getLabel() { - return ExpressionDefinition.getLabel(getExpressions()); - } - - public List<ExpressionDefinition> getExpressions() { - return expressions; - } - - public List<Expression> getExpressionList() { - return expressionList; - } - - public List<ProcessorDefinition> getOutputs() { - return outputs; - } - - public void setOutputs(List<ProcessorDefinition> outputs) { - this.outputs = outputs; + return "Resequencer[" + super.getLabel() + "]"; } public BatchResequencerConfig getBatchConfig() { return batchConfig; } - public BatchResequencerConfig getBatchConfig(BatchResequencerConfig defaultConfig) { - return batchConfig; - } - public StreamResequencerConfig getStreamConfig() { return streamConfig; } - @XmlElement(name = "batch-config", required = false) public void setBatchConfig(BatchResequencerConfig batchConfig) { - batch(batchConfig); + this.batchConfig = batchConfig; } - @XmlElement(name = "stream-config", required = false) public void setStreamConfig(StreamResequencerConfig streamConfig) { - stream(streamConfig); + this.streamConfig = streamConfig; } @Override public Processor createProcessor(RouteContext routeContext) throws Exception { - if (batchConfig != null) { - return createBatchResequencer(routeContext, batchConfig); - } else { - // streamConfig should be non-null if batchConfig is null + if (streamConfig != null) { return createStreamResequencer(routeContext, streamConfig); + } else { + if (batchConfig == null) { + // default as batch mode + batch(); + } + return createBatchResequencer(routeContext, batchConfig); } } /** - * Creates a batch {@link Resequencer} instance applying the given - * <code>config</code>. + * Creates a batch {@link Resequencer} instance applying the given <code>config</code>. * * @param routeContext route context. * @param config batch resequencer configuration. @@ -277,10 +244,14 @@ public class ResequenceDefinition extend * @throws Exception can be thrown */ protected Resequencer createBatchResequencer(RouteContext routeContext, - BatchResequencerConfig config) throws Exception { - + BatchResequencerConfig config) throws Exception { Processor processor = this.createChildProcessor(routeContext, true); - Resequencer resequencer = new Resequencer(routeContext.getCamelContext(), processor, resolveExpressionList(routeContext), + Expression expression = getExpression().createExpression(routeContext); + + ObjectHelper.notNull(config, "config", this); + ObjectHelper.notNull(expression, "expression", this); + + Resequencer resequencer = new Resequencer(routeContext.getCamelContext(), processor, expression, config.isAllowDuplicates(), config.isReverse()); resequencer.setBatchSize(config.getBatchSize()); resequencer.setBatchTimeout(config.getBatchTimeout()); @@ -288,36 +259,28 @@ public class ResequenceDefinition extend } /** - * Creates a {@link StreamResequencer} instance applying the given - * <code>config</code>. + * Creates a {@link StreamResequencer} instance applying the given <code>config</code>. * * @param routeContext route context. * @param config stream resequencer configuration. * @return the configured stream resequencer. * @throws Exception can be thrwon */ - protected StreamResequencer createStreamResequencer(RouteContext routeContext, - StreamResequencerConfig config) throws Exception { - - config.getComparator().setExpressions(resolveExpressionList(routeContext)); + protected StreamResequencer createStreamResequencer(RouteContext routeContext, + StreamResequencerConfig config) throws Exception { Processor processor = this.createChildProcessor(routeContext, true); - StreamResequencer resequencer = new StreamResequencer(routeContext.getCamelContext(), processor, config.getComparator()); + Expression expression = getExpression().createExpression(routeContext); + + ObjectHelper.notNull(config, "config", this); + ObjectHelper.notNull(expression, "expression", this); + + ExpressionResultComparator comparator = config.getComparator(); + comparator.setExpression(expression); + + StreamResequencer resequencer = new StreamResequencer(routeContext.getCamelContext(), processor, comparator); resequencer.setTimeout(config.getTimeout()); resequencer.setCapacity(config.getCapacity()); return resequencer; - } - private List<Expression> resolveExpressionList(RouteContext routeContext) { - if (expressionList == null) { - expressionList = new ArrayList<Expression>(); - for (ExpressionDefinition expression : expressions) { - expressionList.add(expression.createExpression(routeContext)); - } - } - if (expressionList.isEmpty()) { - throw new IllegalArgumentException("No expressions configured for: " + this); - } - return expressionList; - } } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/BatchResequencerConfig.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/BatchResequencerConfig.java?rev=1073278&r1=1073277&r2=1073278&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/BatchResequencerConfig.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/BatchResequencerConfig.java Tue Feb 22 09:57:22 2011 @@ -24,46 +24,17 @@ import javax.xml.bind.annotation.XmlRoot /** * Defines the configuration parameters for the batch-processing * {@link org.apache.camel.processor.Resequencer}. Usage example: - * - * <pre> - * from("direct:start").resequence(body()).batch( - * BatchResequencerConfig.getDefault()).to("mock:result") - * </pre> - * is equivalent to - * - * <pre> - * from("direct:start").resequence(body()).batch().to("mock:result") - * </pre> - * - * or - * - * <pre> - * from("direct:start").resequence(body()).to("mock:result") - * </pre> - * - * Custom values for <code>batchSize</code> and <code>batchTimeout</code> - * can be set like in this example: - * - * <pre> - * from("direct:start").resequence(body()).batch( - * new BatchResequencerConfig(300, 400L)).to("mock:result") - * </pre> - * - * @version */ -@XmlRootElement +@XmlRootElement(name = "batch-config") @XmlAccessorType(XmlAccessType.FIELD) public class BatchResequencerConfig { @XmlAttribute - private Integer batchSize; // optional XML attribute requires wrapper object - + private Integer batchSize; @XmlAttribute - private Long batchTimeout; // optional XML attribute requires wrapper object - + private Long batchTimeout; @XmlAttribute private Boolean allowDuplicates; - @XmlAttribute private Boolean reverse; @@ -75,26 +46,24 @@ public class BatchResequencerConfig { public BatchResequencerConfig() { this(100, 1000L); } - + /** * Creates a new {@link BatchResequencerConfig} instance using the given * values for <code>batchSize</code> and <code>batchTimeout</code>. - * - * @param batchSize - * size of the batch to be re-ordered. - * @param batchTimeout - * timeout for collecting elements to be re-ordered. + * + * @param batchSize size of the batch to be re-ordered. + * @param batchTimeout timeout for collecting elements to be re-ordered. */ public BatchResequencerConfig(int batchSize, long batchTimeout) { this.batchSize = batchSize; this.batchTimeout = batchTimeout; } - + /** * Returns a new {@link BatchResequencerConfig} instance using default * values for <code>batchSize</code> (100) and <code>batchTimeout</code> * (1000L). - * + * * @return a default {@link BatchResequencerConfig}. */ public static BatchResequencerConfig getDefault() { Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/StreamResequencerConfig.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/StreamResequencerConfig.java?rev=1073278&r1=1073277&r2=1073278&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/StreamResequencerConfig.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/config/StreamResequencerConfig.java Tue Feb 22 09:57:22 2011 @@ -27,39 +27,17 @@ import org.apache.camel.processor.resequ /** * Defines the configuration parameters for the {@link org.apache.camel.processor.StreamResequencer}. - * Usage example: - * - * <pre> - * from("direct:start").resequencer(header("seqnum")).stream( - * StreamResequencerConfig.getDefault()).to("mock:result") - * </pre> - * - * is equivalent to - * - * <pre> - * from("direct:start").resequencer(header("seqnum")).stream().to("mock:result") - * </pre> - * - * Custom values for <code>capacity</code> and <code>timeout</code> can be - * set like in this example: - * - * <pre> - * from("direct:start").resequencer(header("seqnum")).stream( - * new StreamResequencerConfig(300, 400L)).to("mock:result") - * </pre> - * + * * @version */ -@XmlRootElement +@XmlRootElement(name = "stream-config") @XmlAccessorType(XmlAccessType.FIELD) public class StreamResequencerConfig { @XmlAttribute - private Integer capacity; // optional XML attribute requires wrapper object - + private Integer capacity; @XmlAttribute - private Long timeout; // optional XML attribute requires wrapper object - + private Long timeout; @XmlTransient private ExpressionResultComparator comparator; Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java?rev=1073278&r1=1073277&r2=1073278&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java Tue Feb 22 09:57:22 2011 @@ -42,9 +42,9 @@ public class Resequencer extends BatchPr this(camelContext, processor, createSet(expression, false, false)); } - public Resequencer(CamelContext camelContext, Processor processor, List<Expression> expressions, + public Resequencer(CamelContext camelContext, Processor processor, Expression expression, boolean allowDuplicates, boolean reverse) { - this(camelContext, processor, createSet(expressions, allowDuplicates, reverse)); + this(camelContext, processor, createSet(expression, allowDuplicates, reverse)); } public Resequencer(CamelContext camelContext, Processor processor, Set<Exchange> collection) { Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java?rev=1073278&r1=1073277&r2=1073278&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java Tue Feb 22 09:57:22 2011 @@ -47,7 +47,7 @@ import org.apache.camel.util.ServiceHelp * successor is known. For example a message with the sequence number 3 has a * predecessor message with the sequence number 2 and a successor message with * the sequence number 4. The message sequence 2,3,5 has a gap because the - * sucessor of 3 is missing. The resequencer therefore has to retain message 5 + * successor of 3 is missing. The resequencer therefore has to retain message 5 * until message 4 arrives (or a timeout occurs). * <p> * Instances of this class poll for {@link Exchange}s from a given Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/DefaultExchangeComparator.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/DefaultExchangeComparator.java?rev=1073278&r1=1073277&r2=1073278&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/DefaultExchangeComparator.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/DefaultExchangeComparator.java Tue Feb 22 09:57:22 2011 @@ -16,20 +16,13 @@ */ package org.apache.camel.processor.resequencer; -import java.util.List; - import org.apache.camel.Exchange; import org.apache.camel.Expression; /** * Compares elements of an {@link Exchange} sequence by comparing * <code>long</code> values returned by this comaprator's - * <code>expression</code>. The expression is set during route definition - * e.g. - * - * <pre> - * ...resequencer(header("seqnum")).stream()... - * </pre> + * <code>expression</code>. * * @version */ @@ -37,23 +30,10 @@ public class DefaultExchangeComparator i private Expression expression; - public Expression getExpression() { - return expression; - } - public void setExpression(Expression expression) { this.expression = expression; } - public void setExpressions(List<Expression> expressions) { - if (expressions.isEmpty()) { - throw new IllegalArgumentException("Expression required to resolve sequence number"); - } else if (expressions.size() > 1) { - throw new IllegalArgumentException("More than one expression currently not supported"); - } - expression = expressions.get(0); - } - public boolean predecessor(Exchange o1, Exchange o2) { long n1 = getSequenceNumber(o1); long n2 = getSequenceNumber(o2); @@ -75,5 +55,4 @@ public class DefaultExchangeComparator i private long getSequenceNumber(Exchange exchange) { return expression.evaluate(exchange, Long.class); } - } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ExpressionResultComparator.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ExpressionResultComparator.java?rev=1073278&r1=1073277&r2=1073278&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ExpressionResultComparator.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/ExpressionResultComparator.java Tue Feb 22 09:57:22 2011 @@ -16,8 +16,6 @@ */ package org.apache.camel.processor.resequencer; -import java.util.List; - import org.apache.camel.Exchange; import org.apache.camel.Expression; @@ -30,10 +28,10 @@ import org.apache.camel.Expression; public interface ExpressionResultComparator extends SequenceElementComparator<Exchange> { /** - * Sets the list expressions used for comparing {@link Exchange}s. + * Set the expression sed for comparing {@link Exchange}s. * - * @param expressions a list of {@link Expression} objects. + * @param expression the expression */ - void setExpressions(List<Expression> expressions); + void setExpression(Expression expression); } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/SequenceElementComparator.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/SequenceElementComparator.java?rev=1073278&r1=1073277&r2=1073278&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/SequenceElementComparator.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/resequencer/SequenceElementComparator.java Tue Feb 22 09:57:22 2011 @@ -41,7 +41,7 @@ public interface SequenceElementComparat * * @param o1 a sequence element. * @param o2 a sequence element. - * @return true if its an immediate sucessor + * @return true if its an immediate successor */ boolean successor(E o1, E o2);