Author: boday Date: Fri Jul 6 22:13:51 2012 New Revision: 1358452 URL: http://svn.apache.org/viewvc?rev=1358452&view=rev Log: CAMEL-3211 added basic "pollMultiple" support to the pollEnrich EIP
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java camel/trunk/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnricherTest.java camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricher.xml camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricherRef.xml Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java?rev=1358452&r1=1358451&r2=1358452&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java Fri Jul 6 22:13:51 2012 @@ -48,14 +48,17 @@ public class PollEnrichDefinition extend private String aggregationStrategyRef; @XmlTransient private AggregationStrategy aggregationStrategy; + @XmlAttribute + private Boolean pollMultiple; public PollEnrichDefinition() { } - public PollEnrichDefinition(AggregationStrategy aggregationStrategy, String resourceUri, long timeout) { + public PollEnrichDefinition(AggregationStrategy aggregationStrategy, String resourceUri, long timeout, Boolean pollMultiple) { this.aggregationStrategy = aggregationStrategy; this.resourceUri = resourceUri; this.timeout = timeout; + this.pollMultiple = pollMultiple; } @Override @@ -93,10 +96,10 @@ public class PollEnrichDefinition extend PollEnricher enricher; if (timeout != null) { - enricher = new PollEnricher(null, endpoint.createPollingConsumer(), timeout); + enricher = new PollEnricher(null, endpoint.createPollingConsumer(), timeout, pollMultiple); } else { // if no timeout then we should block, and there use a negative timeout - enricher = new PollEnricher(null, endpoint.createPollingConsumer(), -1); + enricher = new PollEnricher(null, endpoint.createPollingConsumer(), -1, pollMultiple); } if (aggregationStrategyRef != null) { @@ -150,4 +153,12 @@ public class PollEnrichDefinition extend public void setAggregationStrategy(AggregationStrategy aggregationStrategy) { this.aggregationStrategy = aggregationStrategy; } + + public Boolean isPollMultiple() { + return pollMultiple; + } + + public void setPollMultiple(Boolean pollMultiple) { + this.pollMultiple = pollMultiple; + } } \ No newline at end of file Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java?rev=1358452&r1=1358451&r2=1358452&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java Fri Jul 6 22:13:51 2012 @@ -2823,7 +2823,52 @@ public abstract class ProcessorDefinitio */ @SuppressWarnings("unchecked") public Type pollEnrich(String resourceUri) { - addOutput(new PollEnrichDefinition(null, resourceUri, -1)); + addOutput(new PollEnrichDefinition(null, resourceUri, -1, false)); + return (Type) this; + } + + /** + * The <a href="http://camel.apache.org/content-enricher.html">Content Enricher EIP</a> + * enriches an exchange with additional data obtained from a <code>resourceUri</code> + * using a {@link org.apache.camel.PollingConsumer} to poll the endpoint. + * <p/> + * The difference between this and {@link #enrich(String)} is that this uses a consumer + * to obtain the additional data, where as enrich uses a producer. + * <p/> + * This method will not wait for data to become available, use the method with an explicit timeout + * if you want to wait for data for a period of time from the resourceUri. + * + * @param resourceUri URI of resource endpoint for obtaining additional data. + * @param pollMultiple if enabled will poll for all Exchanges available on the endpoint + * @return the builder + * @see org.apache.camel.processor.PollEnricher + */ + @SuppressWarnings("unchecked") + public Type pollEnrich(String resourceUri, boolean pollMultiple) { + addOutput(new PollEnrichDefinition(null, resourceUri, 0, pollMultiple)); + return (Type) this; + } + + /** + * The <a href="http://camel.apache.org/content-enricher.html">Content Enricher EIP</a> + * enriches an exchange with additional data obtained from a <code>resourceUri</code> + * using a {@link org.apache.camel.PollingConsumer} to poll the endpoint. + * <p/> + * The difference between this and {@link #enrich(String)} is that this uses a consumer + * to obtain the additional data, where as enrich uses a producer. + * <p/> + * This method will <tt>block</tt> until data is available, use the method with timeout if you do not + * want to risk waiting a long time before data is available from the resourceUri. + * + * @param resourceUri URI of resource endpoint for obtaining additional data. + * @param timeout timeout in millis to wait at most for data to be available. + * @param pollMultiple if enabled will poll for all Exchanges available on the endpoint + * @return the builder + * @see org.apache.camel.processor.PollEnricher + */ + @SuppressWarnings("unchecked") + public Type pollEnrich(String resourceUri, long timeout, boolean pollMultiple) { + addOutput(new PollEnrichDefinition(null, resourceUri, timeout, pollMultiple)); return (Type) this; } @@ -2845,7 +2890,7 @@ public abstract class ProcessorDefinitio */ @SuppressWarnings("unchecked") public Type pollEnrich(String resourceUri, AggregationStrategy aggregationStrategy) { - addOutput(new PollEnrichDefinition(aggregationStrategy, resourceUri, -1)); + addOutput(new PollEnrichDefinition(aggregationStrategy, resourceUri, -1, false)); return (Type) this; } @@ -2869,7 +2914,7 @@ public abstract class ProcessorDefinitio */ @SuppressWarnings("unchecked") public Type pollEnrich(String resourceUri, long timeout, AggregationStrategy aggregationStrategy) { - addOutput(new PollEnrichDefinition(aggregationStrategy, resourceUri, timeout)); + addOutput(new PollEnrichDefinition(aggregationStrategy, resourceUri, timeout, false)); return (Type) this; } @@ -2892,7 +2937,7 @@ public abstract class ProcessorDefinitio */ @SuppressWarnings("unchecked") public Type pollEnrich(String resourceUri, long timeout) { - addOutput(new PollEnrichDefinition(null, resourceUri, timeout)); + addOutput(new PollEnrichDefinition(null, resourceUri, timeout, false)); return (Type) this; } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java?rev=1358452&r1=1358451&r2=1358452&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java Fri Jul 6 22:13:51 2012 @@ -16,6 +16,9 @@ */ package org.apache.camel.processor; +import java.util.ArrayList; +import java.util.List; + import org.apache.camel.CamelExchangeException; import org.apache.camel.Exchange; import org.apache.camel.PollingConsumer; @@ -47,6 +50,7 @@ public class PollEnricher extends Servic private AggregationStrategy aggregationStrategy; private PollingConsumer consumer; private long timeout; + private Boolean pollMultiple; /** * Creates a new {@link PollEnricher}. The default aggregation strategy is to @@ -57,7 +61,7 @@ public class PollEnricher extends Servic * @param consumer consumer to resource endpoint. */ public PollEnricher(PollingConsumer consumer) { - this(defaultAggregationStrategy(), consumer, 0); + this(defaultAggregationStrategy(), consumer, 0, false); } /** @@ -74,6 +78,21 @@ public class PollEnricher extends Servic } /** + * Creates a new {@link PollEnricher}. + * + * @param aggregationStrategy aggregation strategy to aggregate input data and additional data. + * @param consumer consumer to resource endpoint. + * @param timeout timeout in millis + * @param pollMultiple enabled building a List of multiple exchanges + */ + public PollEnricher(AggregationStrategy aggregationStrategy, PollingConsumer consumer, long timeout, Boolean pollMultiple) { + this.aggregationStrategy = aggregationStrategy; + this.consumer = consumer; + this.timeout = timeout; + this.pollMultiple = pollMultiple; + } + + /** * Sets the aggregation strategy for this poll enricher. * * @param aggregationStrategy the aggregationStrategy to set @@ -100,6 +119,10 @@ public class PollEnricher extends Servic this.timeout = timeout; } + public void setPollMultiple(Boolean value) { + this.pollMultiple = value; + } + /** * Enriches the input data (<code>exchange</code>) by first obtaining * additional data from an endpoint represented by an endpoint @@ -113,47 +136,70 @@ public class PollEnricher extends Servic * @param exchange input data. */ public void process(Exchange exchange) throws Exception { + preCheckPoll(exchange); - Exchange resourceExchange; - if (timeout < 0) { - LOG.debug("Consumer receive: {}", consumer); - resourceExchange = consumer.receive(); - } else if (timeout == 0) { - LOG.debug("Consumer receiveNoWait: {}", consumer); - resourceExchange = consumer.receiveNoWait(); - } else { - LOG.debug("Consumer receive with timeout: {} ms. {}", timeout, consumer); - resourceExchange = consumer.receive(timeout); - } + if (pollMultiple != null && pollMultiple) { - if (resourceExchange == null) { - LOG.debug("Consumer received no exchange"); - } else { - LOG.debug("Consumer received: {}", resourceExchange); - } + List<Exchange> exchangeList = new ArrayList<Exchange>(); + Exchange receivedExchange; + while (true) { + if (timeout == 0) { + LOG.debug("Polling Consumer receiveNoWait: {}", consumer); + receivedExchange = consumer.receiveNoWait(); + } else { + LOG.debug("Polling Consumer receive with timeout: {} ms. {}", timeout, consumer); + receivedExchange = consumer.receive(timeout); + } - if (resourceExchange != null && resourceExchange.isFailed()) { - // copy resource exchange onto original exchange (preserving pattern) - copyResultsPreservePattern(exchange, resourceExchange); + if (receivedExchange == null) { + break; + } + exchangeList.add(receivedExchange); + } + exchange.getIn().setBody(exchangeList); } else { - prepareResult(exchange); - - // prepare the exchanges for aggregation - ExchangeHelper.prepareAggregation(exchange, resourceExchange); - // must catch any exception from aggregation - Exchange aggregatedExchange; - try { - aggregatedExchange = aggregationStrategy.aggregate(exchange, resourceExchange); - } catch (Throwable e) { - throw new CamelExchangeException("Error occurred during aggregation", exchange, e); + + Exchange resourceExchange; + if (timeout < 0) { + LOG.debug("Consumer receive: {}", consumer); + resourceExchange = consumer.receive(); + } else if (timeout == 0) { + LOG.debug("Consumer receiveNoWait: {}", consumer); + resourceExchange = consumer.receiveNoWait(); + } else { + LOG.debug("Consumer receive with timeout: {} ms. {}", timeout, consumer); + resourceExchange = consumer.receive(timeout); + } + + if (resourceExchange == null) { + LOG.debug("Consumer received no exchange"); + } else { + LOG.debug("Consumer received: {}", resourceExchange); } - if (aggregatedExchange != null) { - // copy aggregation result onto original exchange (preserving pattern) - copyResultsPreservePattern(exchange, aggregatedExchange); - // handover any synchronization - if (resourceExchange != null) { - resourceExchange.handoverCompletions(exchange); + + if (resourceExchange != null && resourceExchange.isFailed()) { + // copy resource exchange onto original exchange (preserving pattern) + copyResultsPreservePattern(exchange, resourceExchange); + } else { + prepareResult(exchange); + + // prepare the exchanges for aggregation + ExchangeHelper.prepareAggregation(exchange, resourceExchange); + // must catch any exception from aggregation + Exchange aggregatedExchange; + try { + aggregatedExchange = aggregationStrategy.aggregate(exchange, resourceExchange); + } catch (Throwable e) { + throw new CamelExchangeException("Error occurred during aggregation", exchange, e); + } + if (aggregatedExchange != null) { + // copy aggregation result onto original exchange (preserving pattern) + copyResultsPreservePattern(exchange, aggregatedExchange); + // handover any synchronization + if (resourceExchange != null) { + resourceExchange.handoverCompletions(exchange); + } } } } Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnricherTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnricherTest.java?rev=1358452&r1=1358451&r2=1358452&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnricherTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnricherTest.java Fri Jul 6 22:13:51 2012 @@ -16,6 +16,9 @@ */ package org.apache.camel.processor.enricher; +import java.util.ArrayList; +import java.util.List; + import org.apache.camel.ContextTestSupport; import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; @@ -116,6 +119,41 @@ public class PollEnricherTest extends Co assertNull(exchange.getException()); } + public void testPollEnrichMultipleDefaultNoWait() throws InterruptedException { + + mock.expectedMessageCount(1); + template.sendBody("seda:foo5", "msg1"); + template.sendBody("seda:foo5", "msg2"); + template.sendBody("seda:enricher-test-5", "test"); + Thread.sleep(100); + template.sendBody("seda:foo5", "msg3"); + template.sendBody("seda:foo5", "msg4"); + + List<Exchange> polledExchanges = mock.getExchanges().get(0).getIn().getBody(List.class); + assertEquals(2, polledExchanges.size()); + + mock.expectedHeaderReceived(Exchange.TO_ENDPOINT, "seda://foo5"); + mock.assertIsSatisfied(0); + } + + public void testPollEnrichMultipleExplicitTimeout() throws InterruptedException { + + mock.expectedMessageCount(1); + template.sendBody("seda:foo6", "msg1"); + template.sendBody("seda:foo6", "msg2"); + template.sendBody("seda:enricher-test-6", "test"); + template.sendBody("seda:foo6", "msg3"); + template.sendBody("seda:foo6", "msg4"); + + Thread.sleep(500); + + List<Exchange> polledExchanges = mock.getExchanges().get(0).getIn().getBody(List.class); + assertEquals(4, polledExchanges.size()); + + mock.expectedHeaderReceived(Exchange.TO_ENDPOINT, "seda://foo6"); + mock.assertIsSatisfied(0); + } + protected RouteBuilder createRouteBuilder() { return new RouteBuilder() { public void configure() { @@ -141,6 +179,18 @@ public class PollEnricherTest extends Co from("direct:enricher-test-4") .pollEnrich("seda:foo4", aggregationStrategy); + + // ------------------------------------------------------------- + // Poll Multiple routes + // ------------------------------------------------------------- + + from("seda:enricher-test-5") + .pollEnrich("seda:foo5", true) + .to("mock:mock"); + + from("seda:enricher-test-6") + .pollEnrich("seda:foo6", 200, true) + .to("mock:mock"); } }; } Modified: camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricher.xml URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricher.xml?rev=1358452&r1=1358451&r2=1358452&view=diff ============================================================================== --- camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricher.xml (original) +++ camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricher.xml Fri Jul 6 22:13:51 2012 @@ -48,6 +48,19 @@ <pollEnrich uri="seda:foo4" strategyRef="sampleAggregator"/> <to uri="mock:mock"/> </route> + + <route> + <from uri="seda:enricher-test-5"/> + <pollEnrich uri="seda:foo5" pollMultiple="true"/> + <to uri="mock:mock"/> + </route> + + <route> + <from uri="seda:enricher-test-6"/> + <pollEnrich uri="seda:foo6" timeout="200" pollMultiple="true"/> + <to uri="mock:mock"/> + </route> + </camelContext> <bean id="sampleAggregator" class="org.apache.camel.processor.enricher.SampleAggregator"/> Modified: camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricherRef.xml URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricherRef.xml?rev=1358452&r1=1358451&r2=1358452&view=diff ============================================================================== --- camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricherRef.xml (original) +++ camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricherRef.xml Fri Jul 6 22:13:51 2012 @@ -28,6 +28,8 @@ <endpoint id="foo2" uri="seda:foo2"/> <endpoint id="foo3" uri="seda:foo3"/> <endpoint id="foo4" uri="seda:foo4"/> + <endpoint id="foo5" uri="seda:foo5"/> + <endpoint id="foo6" uri="seda:foo6"/> <!-- START SNIPPET: e1 --> <route> @@ -54,6 +56,19 @@ <pollEnrich ref="foo4" strategyRef="sampleAggregator"/> <to uri="mock:mock"/> </route> + + <route> + <from uri="seda:enricher-test-5"/> + <pollEnrich ref="foo5" pollMultiple="true"/> + <to uri="mock:mock"/> + </route> + + <route> + <from uri="seda:enricher-test-6"/> + <pollEnrich ref="foo6" timeout="200" pollMultiple="true"/> + <to uri="mock:mock"/> + </route> + </camelContext> <bean id="sampleAggregator" class="org.apache.camel.processor.enricher.SampleAggregator"/>