Author: boday Date: Thu Jul 12 18:22:08 2012 New Revision: 1360843 URL: http://svn.apache.org/viewvc?rev=1360843&view=rev Log: CAMEL-3211 removed pollEnrich() pollMultiple feature until a more comprehensive solution is defined
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java camel/trunk/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnricherTest.java camel/trunk/components/camel-scala/src/main/scala/org/apache/camel/scala/dsl/DSL.scala camel/trunk/components/camel-scala/src/main/scala/org/apache/camel/scala/dsl/SAbstractDefinition.scala camel/trunk/components/camel-scala/src/main/scala/org/apache/camel/scala/dsl/builder/RouteBuilder.scala camel/trunk/components/camel-scala/src/test/scala/org/apache/camel/scala/dsl/SPollEnricherTest.scala camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricher.xml camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricherRef.xml Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java?rev=1360843&r1=1360842&r2=1360843&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java Thu Jul 12 18:22:08 2012 @@ -48,17 +48,14 @@ public class PollEnrichDefinition extend private String aggregationStrategyRef; @XmlTransient private AggregationStrategy aggregationStrategy; - @XmlAttribute - private Boolean pollMultiple; public PollEnrichDefinition() { } - public PollEnrichDefinition(AggregationStrategy aggregationStrategy, String resourceUri, long timeout, Boolean pollMultiple) { + public PollEnrichDefinition(AggregationStrategy aggregationStrategy, String resourceUri, long timeout) { this.aggregationStrategy = aggregationStrategy; this.resourceUri = resourceUri; this.timeout = timeout; - this.pollMultiple = pollMultiple; } @Override @@ -96,10 +93,10 @@ public class PollEnrichDefinition extend PollEnricher enricher; if (timeout != null) { - enricher = new PollEnricher(null, endpoint.createPollingConsumer(), timeout, pollMultiple); + enricher = new PollEnricher(null, endpoint.createPollingConsumer(), timeout); } else { // if no timeout then we should block, and there use a negative timeout - enricher = new PollEnricher(null, endpoint.createPollingConsumer(), -1, pollMultiple); + enricher = new PollEnricher(null, endpoint.createPollingConsumer(), -1); } if (aggregationStrategyRef != null) { @@ -153,12 +150,4 @@ public class PollEnrichDefinition extend public void setAggregationStrategy(AggregationStrategy aggregationStrategy) { this.aggregationStrategy = aggregationStrategy; } - - public Boolean isPollMultiple() { - return pollMultiple; - } - - public void setPollMultiple(Boolean pollMultiple) { - this.pollMultiple = pollMultiple; - } } \ No newline at end of file Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java?rev=1360843&r1=1360842&r2=1360843&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java Thu Jul 12 18:22:08 2012 @@ -2823,52 +2823,7 @@ public abstract class ProcessorDefinitio */ @SuppressWarnings("unchecked") public Type pollEnrich(String resourceUri) { - addOutput(new PollEnrichDefinition(null, resourceUri, -1, false)); - return (Type) this; - } - - /** - * The <a href="http://camel.apache.org/content-enricher.html">Content Enricher EIP</a> - * enriches an exchange with additional data obtained from a <code>resourceUri</code> - * using a {@link org.apache.camel.PollingConsumer} to poll the endpoint. - * <p/> - * The difference between this and {@link #enrich(String)} is that this uses a consumer - * to obtain the additional data, where as enrich uses a producer. - * <p/> - * This method will not wait for data to become available, use the method with an explicit timeout - * if you want to wait for data for a period of time from the resourceUri. - * - * @param resourceUri URI of resource endpoint for obtaining additional data. - * @param pollMultiple if enabled will poll for all Exchanges available on the endpoint - * @return the builder - * @see org.apache.camel.processor.PollEnricher - */ - @SuppressWarnings("unchecked") - public Type pollEnrich(String resourceUri, boolean pollMultiple) { - addOutput(new PollEnrichDefinition(null, resourceUri, 0, pollMultiple)); - return (Type) this; - } - - /** - * The <a href="http://camel.apache.org/content-enricher.html">Content Enricher EIP</a> - * enriches an exchange with additional data obtained from a <code>resourceUri</code> - * using a {@link org.apache.camel.PollingConsumer} to poll the endpoint. - * <p/> - * The difference between this and {@link #enrich(String)} is that this uses a consumer - * to obtain the additional data, where as enrich uses a producer. - * <p/> - * This method will <tt>block</tt> until data is available, use the method with timeout if you do not - * want to risk waiting a long time before data is available from the resourceUri. - * - * @param resourceUri URI of resource endpoint for obtaining additional data. - * @param timeout timeout in millis to wait at most for data to be available. - * @param pollMultiple if enabled will poll for all Exchanges available on the endpoint - * @return the builder - * @see org.apache.camel.processor.PollEnricher - */ - @SuppressWarnings("unchecked") - public Type pollEnrich(String resourceUri, long timeout, boolean pollMultiple) { - addOutput(new PollEnrichDefinition(null, resourceUri, timeout, pollMultiple)); + addOutput(new PollEnrichDefinition(null, resourceUri, -1)); return (Type) this; } @@ -2890,7 +2845,7 @@ public abstract class ProcessorDefinitio */ @SuppressWarnings("unchecked") public Type pollEnrich(String resourceUri, AggregationStrategy aggregationStrategy) { - addOutput(new PollEnrichDefinition(aggregationStrategy, resourceUri, -1, false)); + addOutput(new PollEnrichDefinition(aggregationStrategy, resourceUri, -1)); return (Type) this; } @@ -2914,7 +2869,7 @@ public abstract class ProcessorDefinitio */ @SuppressWarnings("unchecked") public Type pollEnrich(String resourceUri, long timeout, AggregationStrategy aggregationStrategy) { - addOutput(new PollEnrichDefinition(aggregationStrategy, resourceUri, timeout, false)); + addOutput(new PollEnrichDefinition(aggregationStrategy, resourceUri, timeout)); return (Type) this; } @@ -2937,7 +2892,7 @@ public abstract class ProcessorDefinitio */ @SuppressWarnings("unchecked") public Type pollEnrich(String resourceUri, long timeout) { - addOutput(new PollEnrichDefinition(null, resourceUri, timeout, false)); + addOutput(new PollEnrichDefinition(null, resourceUri, timeout)); return (Type) this; } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java?rev=1360843&r1=1360842&r2=1360843&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java Thu Jul 12 18:22:08 2012 @@ -16,9 +16,6 @@ */ package org.apache.camel.processor; -import java.util.ArrayList; -import java.util.List; - import org.apache.camel.CamelExchangeException; import org.apache.camel.Exchange; import org.apache.camel.PollingConsumer; @@ -50,7 +47,6 @@ public class PollEnricher extends Servic private AggregationStrategy aggregationStrategy; private PollingConsumer consumer; private long timeout; - private Boolean pollMultiple; /** * Creates a new {@link PollEnricher}. The default aggregation strategy is to @@ -61,7 +57,7 @@ public class PollEnricher extends Servic * @param consumer consumer to resource endpoint. */ public PollEnricher(PollingConsumer consumer) { - this(defaultAggregationStrategy(), consumer, 0, false); + this(defaultAggregationStrategy(), consumer, 0); } /** @@ -78,21 +74,6 @@ public class PollEnricher extends Servic } /** - * Creates a new {@link PollEnricher}. - * - * @param aggregationStrategy aggregation strategy to aggregate input data and additional data. - * @param consumer consumer to resource endpoint. - * @param timeout timeout in millis - * @param pollMultiple enabled building a List of multiple exchanges - */ - public PollEnricher(AggregationStrategy aggregationStrategy, PollingConsumer consumer, long timeout, Boolean pollMultiple) { - this.aggregationStrategy = aggregationStrategy; - this.consumer = consumer; - this.timeout = timeout; - this.pollMultiple = pollMultiple; - } - - /** * Sets the aggregation strategy for this poll enricher. * * @param aggregationStrategy the aggregationStrategy to set @@ -119,10 +100,6 @@ public class PollEnricher extends Servic this.timeout = timeout; } - public void setPollMultiple(Boolean value) { - this.pollMultiple = value; - } - /** * Enriches the input data (<code>exchange</code>) by first obtaining * additional data from an endpoint represented by an endpoint @@ -136,70 +113,47 @@ public class PollEnricher extends Servic * @param exchange input data. */ public void process(Exchange exchange) throws Exception { - preCheckPoll(exchange); - if (pollMultiple != null && pollMultiple) { + Exchange resourceExchange; + if (timeout < 0) { + LOG.debug("Consumer receive: {}", consumer); + resourceExchange = consumer.receive(); + } else if (timeout == 0) { + LOG.debug("Consumer receiveNoWait: {}", consumer); + resourceExchange = consumer.receiveNoWait(); + } else { + LOG.debug("Consumer receive with timeout: {} ms. {}", timeout, consumer); + resourceExchange = consumer.receive(timeout); + } - List<Exchange> exchangeList = new ArrayList<Exchange>(); - Exchange receivedExchange; - while (true) { - if (timeout == 0) { - LOG.debug("Polling Consumer receiveNoWait: {}", consumer); - receivedExchange = consumer.receiveNoWait(); - } else { - LOG.debug("Polling Consumer receive with timeout: {} ms. {}", timeout, consumer); - receivedExchange = consumer.receive(timeout); - } + if (resourceExchange == null) { + LOG.debug("Consumer received no exchange"); + } else { + LOG.debug("Consumer received: {}", resourceExchange); + } - if (receivedExchange == null) { - break; - } - exchangeList.add(receivedExchange); - } - exchange.getIn().setBody(exchangeList); + if (resourceExchange != null && resourceExchange.isFailed()) { + // copy resource exchange onto original exchange (preserving pattern) + copyResultsPreservePattern(exchange, resourceExchange); } else { - - Exchange resourceExchange; - if (timeout < 0) { - LOG.debug("Consumer receive: {}", consumer); - resourceExchange = consumer.receive(); - } else if (timeout == 0) { - LOG.debug("Consumer receiveNoWait: {}", consumer); - resourceExchange = consumer.receiveNoWait(); - } else { - LOG.debug("Consumer receive with timeout: {} ms. {}", timeout, consumer); - resourceExchange = consumer.receive(timeout); - } - - if (resourceExchange == null) { - LOG.debug("Consumer received no exchange"); - } else { - LOG.debug("Consumer received: {}", resourceExchange); - } + prepareResult(exchange); - if (resourceExchange != null && resourceExchange.isFailed()) { - // copy resource exchange onto original exchange (preserving pattern) - copyResultsPreservePattern(exchange, resourceExchange); - } else { - prepareResult(exchange); - - // prepare the exchanges for aggregation - ExchangeHelper.prepareAggregation(exchange, resourceExchange); - // must catch any exception from aggregation - Exchange aggregatedExchange; - try { - aggregatedExchange = aggregationStrategy.aggregate(exchange, resourceExchange); - } catch (Throwable e) { - throw new CamelExchangeException("Error occurred during aggregation", exchange, e); - } - if (aggregatedExchange != null) { - // copy aggregation result onto original exchange (preserving pattern) - copyResultsPreservePattern(exchange, aggregatedExchange); - // handover any synchronization - if (resourceExchange != null) { - resourceExchange.handoverCompletions(exchange); - } + // prepare the exchanges for aggregation + ExchangeHelper.prepareAggregation(exchange, resourceExchange); + // must catch any exception from aggregation + Exchange aggregatedExchange; + try { + aggregatedExchange = aggregationStrategy.aggregate(exchange, resourceExchange); + } catch (Throwable e) { + throw new CamelExchangeException("Error occurred during aggregation", exchange, e); + } + if (aggregatedExchange != null) { + // copy aggregation result onto original exchange (preserving pattern) + copyResultsPreservePattern(exchange, aggregatedExchange); + // handover any synchronization + if (resourceExchange != null) { + resourceExchange.handoverCompletions(exchange); } } } Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnricherTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnricherTest.java?rev=1360843&r1=1360842&r2=1360843&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnricherTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnricherTest.java Thu Jul 12 18:22:08 2012 @@ -118,41 +118,6 @@ public class PollEnricherTest extends Co assertNull(exchange.getException()); } - public void testPollEnrichMultipleDefaultNoWait() throws InterruptedException { - - mock.expectedMessageCount(1); - template.sendBody("seda:foo5", "msg1"); - template.sendBody("seda:foo5", "msg2"); - template.sendBody("seda:enricher-test-5", "test"); - Thread.sleep(100); - template.sendBody("seda:foo5", "msg3"); - template.sendBody("seda:foo5", "msg4"); - - List<?> polledExchanges = mock.getExchanges().get(0).getIn().getBody(List.class); - assertEquals(2, polledExchanges.size()); - - mock.expectedHeaderReceived(Exchange.TO_ENDPOINT, "seda://foo5"); - mock.assertIsSatisfied(0); - } - - public void testPollEnrichMultipleExplicitTimeout() throws InterruptedException { - - mock.expectedMessageCount(1); - template.sendBody("seda:foo6", "msg1"); - template.sendBody("seda:foo6", "msg2"); - template.sendBody("seda:enricher-test-6", "test"); - template.sendBody("seda:foo6", "msg3"); - template.sendBody("seda:foo6", "msg4"); - - Thread.sleep(500); - - List<?> polledExchanges = mock.getExchanges().get(0).getIn().getBody(List.class); - assertEquals(4, polledExchanges.size()); - - mock.expectedHeaderReceived(Exchange.TO_ENDPOINT, "seda://foo6"); - mock.assertIsSatisfied(0); - } - protected RouteBuilder createRouteBuilder() { return new RouteBuilder() { public void configure() { @@ -178,18 +143,6 @@ public class PollEnricherTest extends Co from("direct:enricher-test-4") .pollEnrich("seda:foo4", aggregationStrategy); - - // ------------------------------------------------------------- - // Poll Multiple routes - // ------------------------------------------------------------- - - from("seda:enricher-test-5") - .pollEnrich("seda:foo5", true) - .to("mock:mock"); - - from("seda:enricher-test-6") - .pollEnrich("seda:foo6", 200, true) - .to("mock:mock"); } }; } Modified: camel/trunk/components/camel-scala/src/main/scala/org/apache/camel/scala/dsl/DSL.scala URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-scala/src/main/scala/org/apache/camel/scala/dsl/DSL.scala?rev=1360843&r1=1360842&r2=1360843&view=diff ============================================================================== --- camel/trunk/components/camel-scala/src/main/scala/org/apache/camel/scala/dsl/DSL.scala (original) +++ camel/trunk/components/camel-scala/src/main/scala/org/apache/camel/scala/dsl/DSL.scala Thu Jul 12 18:22:08 2012 @@ -69,8 +69,7 @@ trait DSL { def pipeline : SPipelineDefinition def policy(policy: Policy) : DSL - def pollEnrich(uri: String, strategy: AggregationStrategy = null, timeout: Long = -1) : DSL - def pollEnrich(uri: String, timeout: Long, pollMultiple: Boolean) : DSL + def pollEnrich(uri: String, strategy: AggregationStrategy = null, timeout: Long = 0) : DSL def process(function: Exchange => Unit) : DSL def process(processor: Processor) : DSL Modified: camel/trunk/components/camel-scala/src/main/scala/org/apache/camel/scala/dsl/SAbstractDefinition.scala URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-scala/src/main/scala/org/apache/camel/scala/dsl/SAbstractDefinition.scala?rev=1360843&r1=1360842&r2=1360843&view=diff ============================================================================== --- camel/trunk/components/camel-scala/src/main/scala/org/apache/camel/scala/dsl/SAbstractDefinition.scala (original) +++ camel/trunk/components/camel-scala/src/main/scala/org/apache/camel/scala/dsl/SAbstractDefinition.scala Thu Jul 12 18:22:08 2012 @@ -109,8 +109,6 @@ abstract class SAbstractDefinition[P <: def policy(policy: Policy) = wrap(target.policy(policy)) def pollEnrich(uri: String, strategy: AggregationStrategy = null, timeout: Long = -1) = wrap(target.pollEnrich(uri, timeout, strategy)) - def pollEnrich(uri: String, timeout: Long, pollMultiple: Boolean) = - wrap(target.pollEnrich(uri, timeout, pollMultiple)) def process(function: Exchange => Unit) = wrap(target.process(new ScalaProcessor(function))) def process(processor: Processor) = wrap(target.process(processor)) Modified: camel/trunk/components/camel-scala/src/main/scala/org/apache/camel/scala/dsl/builder/RouteBuilder.scala URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-scala/src/main/scala/org/apache/camel/scala/dsl/builder/RouteBuilder.scala?rev=1360843&r1=1360842&r2=1360843&view=diff ============================================================================== --- camel/trunk/components/camel-scala/src/main/scala/org/apache/camel/scala/dsl/builder/RouteBuilder.scala (original) +++ camel/trunk/components/camel-scala/src/main/scala/org/apache/camel/scala/dsl/builder/RouteBuilder.scala Thu Jul 12 18:22:08 2012 @@ -156,8 +156,7 @@ class RouteBuilder extends Preamble with def otherwise = stack.top.otherwise def pipeline = stack.top.pipeline - def pollEnrich(uri: String, strategy: AggregationStrategy = null, timeout: Long = -1) = stack.top.pollEnrich(uri, strategy, timeout) - def pollEnrich(uri: String, timeout: Long, pollMultiple: Boolean) = stack.top.pollEnrich(uri, timeout, pollMultiple) + def pollEnrich(uri: String, strategy: AggregationStrategy = null, timeout: Long = 0) = stack.top.pollEnrich(uri, strategy, timeout) def policy(policy: Policy) = stack.top.policy(policy) def process(function: Exchange => Unit) = stack.top.process(function) def process(processor: Processor) = stack.top.process(processor) Modified: camel/trunk/components/camel-scala/src/test/scala/org/apache/camel/scala/dsl/SPollEnricherTest.scala URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-scala/src/test/scala/org/apache/camel/scala/dsl/SPollEnricherTest.scala?rev=1360843&r1=1360842&r2=1360843&view=diff ============================================================================== --- camel/trunk/components/camel-scala/src/test/scala/org/apache/camel/scala/dsl/SPollEnricherTest.scala (original) +++ camel/trunk/components/camel-scala/src/test/scala/org/apache/camel/scala/dsl/SPollEnricherTest.scala Thu Jul 12 18:22:08 2012 @@ -40,16 +40,5 @@ class SPollEnricherTest extends PollEnri } "direct:enricher-test-4" pollEnrich("seda:foo4", strategy) - - "seda:enricher-test-5" ==> { - pollEnrich("seda:foo5", -1, pollMultiple = true) - to("mock:mock") - } - - "seda:enricher-test-6" ==> { - pollEnrich("seda:foo6", 200, pollMultiple = true) - to("mock:mock") - } - } } Modified: camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricher.xml URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricher.xml?rev=1360843&r1=1360842&r2=1360843&view=diff ============================================================================== --- camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricher.xml (original) +++ camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricher.xml Thu Jul 12 18:22:08 2012 @@ -48,19 +48,6 @@ <pollEnrich uri="seda:foo4" strategyRef="sampleAggregator"/> <to uri="mock:mock"/> </route> - - <route> - <from uri="seda:enricher-test-5"/> - <pollEnrich uri="seda:foo5" pollMultiple="true"/> - <to uri="mock:mock"/> - </route> - - <route> - <from uri="seda:enricher-test-6"/> - <pollEnrich uri="seda:foo6" timeout="200" pollMultiple="true"/> - <to uri="mock:mock"/> - </route> - </camelContext> <bean id="sampleAggregator" class="org.apache.camel.processor.enricher.SampleAggregator"/> Modified: camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricherRef.xml URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricherRef.xml?rev=1360843&r1=1360842&r2=1360843&view=diff ============================================================================== --- camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricherRef.xml (original) +++ camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricherRef.xml Thu Jul 12 18:22:08 2012 @@ -28,8 +28,6 @@ <endpoint id="foo2" uri="seda:foo2"/> <endpoint id="foo3" uri="seda:foo3"/> <endpoint id="foo4" uri="seda:foo4"/> - <endpoint id="foo5" uri="seda:foo5"/> - <endpoint id="foo6" uri="seda:foo6"/> <!-- START SNIPPET: e1 --> <route> @@ -56,19 +54,6 @@ <pollEnrich ref="foo4" strategyRef="sampleAggregator"/> <to uri="mock:mock"/> </route> - - <route> - <from uri="seda:enricher-test-5"/> - <pollEnrich ref="foo5" pollMultiple="true"/> - <to uri="mock:mock"/> - </route> - - <route> - <from uri="seda:enricher-test-6"/> - <pollEnrich ref="foo6" timeout="200" pollMultiple="true"/> - <to uri="mock:mock"/> - </route> - </camelContext> <bean id="sampleAggregator" class="org.apache.camel.processor.enricher.SampleAggregator"/>