Author: boday
Date: Fri Jul  6 22:13:51 2012
New Revision: 1358452

URL: http://svn.apache.org/viewvc?rev=1358452&view=rev
Log:
CAMEL-3211 added basic "pollMultiple" support to the pollEnrich EIP

Modified:
    
camel/trunk/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
    
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnricherTest.java
    
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricher.xml
    
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricherRef.xml

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java?rev=1358452&r1=1358451&r2=1358452&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
 Fri Jul  6 22:13:51 2012
@@ -48,14 +48,17 @@ public class PollEnrichDefinition extend
     private String aggregationStrategyRef;
     @XmlTransient
     private AggregationStrategy aggregationStrategy;
+    @XmlAttribute
+    private Boolean pollMultiple;
 
     public PollEnrichDefinition() {
     }
 
-    public PollEnrichDefinition(AggregationStrategy aggregationStrategy, 
String resourceUri, long timeout) {
+    public PollEnrichDefinition(AggregationStrategy aggregationStrategy, 
String resourceUri, long timeout, Boolean pollMultiple) {
         this.aggregationStrategy = aggregationStrategy;
         this.resourceUri = resourceUri;
         this.timeout = timeout;
+        this.pollMultiple = pollMultiple;
     }
 
     @Override
@@ -93,10 +96,10 @@ public class PollEnrichDefinition extend
 
         PollEnricher enricher;
         if (timeout != null) {
-            enricher = new PollEnricher(null, 
endpoint.createPollingConsumer(), timeout);
+            enricher = new PollEnricher(null, 
endpoint.createPollingConsumer(), timeout, pollMultiple);
         } else {
             // if no timeout then we should block, and there use a negative 
timeout
-            enricher = new PollEnricher(null, 
endpoint.createPollingConsumer(), -1);
+            enricher = new PollEnricher(null, 
endpoint.createPollingConsumer(), -1, pollMultiple);
         }
 
         if (aggregationStrategyRef != null) {
@@ -150,4 +153,12 @@ public class PollEnrichDefinition extend
     public void setAggregationStrategy(AggregationStrategy 
aggregationStrategy) {
         this.aggregationStrategy = aggregationStrategy;
     }
+
+    public Boolean isPollMultiple() {
+        return pollMultiple;
+    }
+
+    public void setPollMultiple(Boolean pollMultiple) {
+        this.pollMultiple = pollMultiple;
+    }
 }
\ No newline at end of file

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java?rev=1358452&r1=1358451&r2=1358452&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
 Fri Jul  6 22:13:51 2012
@@ -2823,7 +2823,52 @@ public abstract class ProcessorDefinitio
      */
     @SuppressWarnings("unchecked")
     public Type pollEnrich(String resourceUri) {
-        addOutput(new PollEnrichDefinition(null, resourceUri, -1));
+        addOutput(new PollEnrichDefinition(null, resourceUri, -1, false));
+        return (Type) this;
+    }
+
+    /**
+     * The <a href="http://camel.apache.org/content-enricher.html">Content 
Enricher EIP</a>
+     * enriches an exchange with additional data obtained from a 
<code>resourceUri</code>
+     * using a {@link org.apache.camel.PollingConsumer} to poll the endpoint.
+     * <p/>
+     * The difference between this and {@link #enrich(String)} is that this 
uses a consumer
+     * to obtain the additional data, where as enrich uses a producer.
+     * <p/>
+     * This method will not wait for data to become available, use the method 
with an explicit timeout
+     * if you want to wait for data for a period of time from the resourceUri.
+     *
+     * @param resourceUri           URI of resource endpoint for obtaining 
additional data.
+     * @param pollMultiple           if enabled will poll for all Exchanges 
available on the endpoint
+     * @return the builder
+     * @see org.apache.camel.processor.PollEnricher
+     */
+    @SuppressWarnings("unchecked")
+    public Type pollEnrich(String resourceUri, boolean pollMultiple) {
+        addOutput(new PollEnrichDefinition(null, resourceUri, 0, 
pollMultiple));
+        return (Type) this;
+    }
+
+    /**
+     * The <a href="http://camel.apache.org/content-enricher.html">Content 
Enricher EIP</a>
+     * enriches an exchange with additional data obtained from a 
<code>resourceUri</code>
+     * using a {@link org.apache.camel.PollingConsumer} to poll the endpoint.
+     * <p/>
+     * The difference between this and {@link #enrich(String)} is that this 
uses a consumer
+     * to obtain the additional data, where as enrich uses a producer.
+     * <p/>
+     * This method will <tt>block</tt> until data is available, use the method 
with timeout if you do not
+     * want to risk waiting a long time before data is available from the 
resourceUri.
+     *
+     * @param resourceUri           URI of resource endpoint for obtaining 
additional data.
+     * @param timeout               timeout in millis to wait at most for data 
to be available.
+     * @param pollMultiple           if enabled will poll for all Exchanges 
available on the endpoint
+     * @return the builder
+     * @see org.apache.camel.processor.PollEnricher
+     */
+    @SuppressWarnings("unchecked")
+    public Type pollEnrich(String resourceUri, long timeout, boolean 
pollMultiple) {
+        addOutput(new PollEnrichDefinition(null, resourceUri, timeout, 
pollMultiple));
         return (Type) this;
     }
 
@@ -2845,7 +2890,7 @@ public abstract class ProcessorDefinitio
      */
     @SuppressWarnings("unchecked")
     public Type pollEnrich(String resourceUri, AggregationStrategy 
aggregationStrategy) {
-        addOutput(new PollEnrichDefinition(aggregationStrategy, resourceUri, 
-1));
+        addOutput(new PollEnrichDefinition(aggregationStrategy, resourceUri, 
-1, false));
         return (Type) this;
     }
 
@@ -2869,7 +2914,7 @@ public abstract class ProcessorDefinitio
      */
     @SuppressWarnings("unchecked")
     public Type pollEnrich(String resourceUri, long timeout, 
AggregationStrategy aggregationStrategy) {
-        addOutput(new PollEnrichDefinition(aggregationStrategy, resourceUri, 
timeout));
+        addOutput(new PollEnrichDefinition(aggregationStrategy, resourceUri, 
timeout, false));
         return (Type) this;
     }
 
@@ -2892,7 +2937,7 @@ public abstract class ProcessorDefinitio
      */
     @SuppressWarnings("unchecked")
     public Type pollEnrich(String resourceUri, long timeout) {
-        addOutput(new PollEnrichDefinition(null, resourceUri, timeout));
+        addOutput(new PollEnrichDefinition(null, resourceUri, timeout, false));
         return (Type) this;
     }
 

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java?rev=1358452&r1=1358451&r2=1358452&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
 Fri Jul  6 22:13:51 2012
@@ -16,6 +16,9 @@
  */
 package org.apache.camel.processor;
 
+import java.util.ArrayList;
+import java.util.List;
+
 import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Exchange;
 import org.apache.camel.PollingConsumer;
@@ -47,6 +50,7 @@ public class PollEnricher extends Servic
     private AggregationStrategy aggregationStrategy;
     private PollingConsumer consumer;
     private long timeout;
+    private Boolean pollMultiple;
 
     /**
      * Creates a new {@link PollEnricher}. The default aggregation strategy is 
to
@@ -57,7 +61,7 @@ public class PollEnricher extends Servic
      * @param consumer consumer to resource endpoint.
      */
     public PollEnricher(PollingConsumer consumer) {
-        this(defaultAggregationStrategy(), consumer, 0);
+        this(defaultAggregationStrategy(), consumer, 0, false);
     }
 
     /**
@@ -74,6 +78,21 @@ public class PollEnricher extends Servic
     }
 
     /**
+     * Creates a new {@link PollEnricher}.
+     *
+     * @param aggregationStrategy  aggregation strategy to aggregate input 
data and additional data.
+     * @param consumer consumer to resource endpoint.
+     * @param timeout timeout in millis
+     * @param pollMultiple enabled building a List of multiple exchanges
+     */
+    public PollEnricher(AggregationStrategy aggregationStrategy, 
PollingConsumer consumer, long timeout, Boolean pollMultiple) {
+        this.aggregationStrategy = aggregationStrategy;
+        this.consumer = consumer;
+        this.timeout = timeout;
+        this.pollMultiple = pollMultiple;
+    }
+
+    /**
      * Sets the aggregation strategy for this poll enricher.
      *
      * @param aggregationStrategy the aggregationStrategy to set
@@ -100,6 +119,10 @@ public class PollEnricher extends Servic
         this.timeout = timeout;
     }
 
+    public void setPollMultiple(Boolean value) {
+        this.pollMultiple = value;
+    }
+    
     /**
      * Enriches the input data (<code>exchange</code>) by first obtaining
      * additional data from an endpoint represented by an endpoint
@@ -113,47 +136,70 @@ public class PollEnricher extends Servic
      * @param exchange input data.
      */
     public void process(Exchange exchange) throws Exception {
+
         preCheckPoll(exchange);
 
-        Exchange resourceExchange;
-        if (timeout < 0) {
-            LOG.debug("Consumer receive: {}", consumer);
-            resourceExchange = consumer.receive();
-        } else if (timeout == 0) {
-            LOG.debug("Consumer receiveNoWait: {}", consumer);
-            resourceExchange = consumer.receiveNoWait();
-        } else {
-            LOG.debug("Consumer receive with timeout: {} ms. {}", timeout, 
consumer);
-            resourceExchange = consumer.receive(timeout);
-        }
+        if (pollMultiple != null && pollMultiple) {
 
-        if (resourceExchange == null) {
-            LOG.debug("Consumer received no exchange");
-        } else {
-            LOG.debug("Consumer received: {}", resourceExchange);
-        }
+            List<Exchange> exchangeList = new ArrayList<Exchange>();
+            Exchange receivedExchange;
+            while (true) {
+                if (timeout == 0) {
+                    LOG.debug("Polling Consumer receiveNoWait: {}", consumer);
+                    receivedExchange = consumer.receiveNoWait();
+                } else {
+                    LOG.debug("Polling Consumer receive with timeout: {} ms. 
{}", timeout, consumer);
+                    receivedExchange = consumer.receive(timeout);
+                }
 
-        if (resourceExchange != null && resourceExchange.isFailed()) {
-            // copy resource exchange onto original exchange (preserving 
pattern)
-            copyResultsPreservePattern(exchange, resourceExchange);
+                if (receivedExchange == null) {
+                    break;
+                }
+                exchangeList.add(receivedExchange);
+            }
+            exchange.getIn().setBody(exchangeList);
         } else {
-            prepareResult(exchange);
-
-            // prepare the exchanges for aggregation
-            ExchangeHelper.prepareAggregation(exchange, resourceExchange);
-            // must catch any exception from aggregation
-            Exchange aggregatedExchange;
-            try {
-                aggregatedExchange = aggregationStrategy.aggregate(exchange, 
resourceExchange);
-            } catch (Throwable e) {
-                throw new CamelExchangeException("Error occurred during 
aggregation", exchange, e);
+        
+            Exchange resourceExchange;
+            if (timeout < 0) {
+                LOG.debug("Consumer receive: {}", consumer);
+                resourceExchange = consumer.receive();
+            } else if (timeout == 0) {
+                LOG.debug("Consumer receiveNoWait: {}", consumer);
+                resourceExchange = consumer.receiveNoWait();
+            } else {
+                LOG.debug("Consumer receive with timeout: {} ms. {}", timeout, 
consumer);
+                resourceExchange = consumer.receive(timeout);
+            }
+    
+            if (resourceExchange == null) {
+                LOG.debug("Consumer received no exchange");
+            } else {
+                LOG.debug("Consumer received: {}", resourceExchange);
             }
-            if (aggregatedExchange != null) {
-                // copy aggregation result onto original exchange (preserving 
pattern)
-                copyResultsPreservePattern(exchange, aggregatedExchange);
-                // handover any synchronization
-                if (resourceExchange != null) {
-                    resourceExchange.handoverCompletions(exchange);
+
+            if (resourceExchange != null && resourceExchange.isFailed()) {
+                // copy resource exchange onto original exchange (preserving 
pattern)
+                copyResultsPreservePattern(exchange, resourceExchange);
+            } else {
+                prepareResult(exchange);
+    
+                // prepare the exchanges for aggregation
+                ExchangeHelper.prepareAggregation(exchange, resourceExchange);
+                // must catch any exception from aggregation
+                Exchange aggregatedExchange;
+                try {
+                    aggregatedExchange = 
aggregationStrategy.aggregate(exchange, resourceExchange);
+                } catch (Throwable e) {
+                    throw new CamelExchangeException("Error occurred during 
aggregation", exchange, e);
+                }
+                if (aggregatedExchange != null) {
+                    // copy aggregation result onto original exchange 
(preserving pattern)
+                    copyResultsPreservePattern(exchange, aggregatedExchange);
+                    // handover any synchronization
+                    if (resourceExchange != null) {
+                        resourceExchange.handoverCompletions(exchange);
+                    }
                 }
             }
         }

Modified: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnricherTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnricherTest.java?rev=1358452&r1=1358451&r2=1358452&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnricherTest.java
 (original)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnricherTest.java
 Fri Jul  6 22:13:51 2012
@@ -16,6 +16,9 @@
  */
 package org.apache.camel.processor.enricher;
 
+import java.util.ArrayList;
+import java.util.List;
+
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
@@ -116,6 +119,41 @@ public class PollEnricherTest extends Co
         assertNull(exchange.getException());
     }
 
+    public void testPollEnrichMultipleDefaultNoWait() throws 
InterruptedException {
+
+        mock.expectedMessageCount(1);
+        template.sendBody("seda:foo5", "msg1");
+        template.sendBody("seda:foo5", "msg2");
+        template.sendBody("seda:enricher-test-5", "test");
+        Thread.sleep(100);
+        template.sendBody("seda:foo5", "msg3");
+        template.sendBody("seda:foo5", "msg4");
+
+        List<Exchange> polledExchanges = 
mock.getExchanges().get(0).getIn().getBody(List.class);
+        assertEquals(2, polledExchanges.size());
+
+        mock.expectedHeaderReceived(Exchange.TO_ENDPOINT, "seda://foo5");
+        mock.assertIsSatisfied(0);
+    }
+
+    public void testPollEnrichMultipleExplicitTimeout() throws 
InterruptedException {
+
+        mock.expectedMessageCount(1);
+        template.sendBody("seda:foo6", "msg1");
+        template.sendBody("seda:foo6", "msg2");
+        template.sendBody("seda:enricher-test-6", "test");
+        template.sendBody("seda:foo6", "msg3");
+        template.sendBody("seda:foo6", "msg4");
+
+        Thread.sleep(500);
+
+        List<Exchange> polledExchanges = 
mock.getExchanges().get(0).getIn().getBody(List.class);
+        assertEquals(4, polledExchanges.size());
+
+        mock.expectedHeaderReceived(Exchange.TO_ENDPOINT, "seda://foo6");
+        mock.assertIsSatisfied(0);
+    }
+
     protected RouteBuilder createRouteBuilder() {
         return new RouteBuilder() {
             public void configure() {
@@ -141,6 +179,18 @@ public class PollEnricherTest extends Co
 
                 from("direct:enricher-test-4")
                     .pollEnrich("seda:foo4", aggregationStrategy);
+
+                // 
-------------------------------------------------------------
+                //  Poll Multiple routes
+                // 
-------------------------------------------------------------
+
+                from("seda:enricher-test-5")
+                    .pollEnrich("seda:foo5", true)
+                    .to("mock:mock");
+
+                from("seda:enricher-test-6")
+                    .pollEnrich("seda:foo6", 200, true)
+                    .to("mock:mock");
             }
         };
     }

Modified: 
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricher.xml
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricher.xml?rev=1358452&r1=1358451&r2=1358452&view=diff
==============================================================================
--- 
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricher.xml
 (original)
+++ 
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricher.xml
 Fri Jul  6 22:13:51 2012
@@ -48,6 +48,19 @@
             <pollEnrich uri="seda:foo4" strategyRef="sampleAggregator"/>
             <to uri="mock:mock"/>
         </route>
+
+        <route>
+            <from uri="seda:enricher-test-5"/>
+            <pollEnrich uri="seda:foo5" pollMultiple="true"/>
+            <to uri="mock:mock"/>
+        </route>
+
+        <route>
+            <from uri="seda:enricher-test-6"/>
+            <pollEnrich uri="seda:foo6" timeout="200" pollMultiple="true"/>
+            <to uri="mock:mock"/>
+        </route>
+
     </camelContext>
 
     <bean id="sampleAggregator" 
class="org.apache.camel.processor.enricher.SampleAggregator"/>

Modified: 
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricherRef.xml
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricherRef.xml?rev=1358452&r1=1358451&r2=1358452&view=diff
==============================================================================
--- 
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricherRef.xml
 (original)
+++ 
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricherRef.xml
 Fri Jul  6 22:13:51 2012
@@ -28,6 +28,8 @@
         <endpoint id="foo2" uri="seda:foo2"/>
         <endpoint id="foo3" uri="seda:foo3"/>
         <endpoint id="foo4" uri="seda:foo4"/>
+        <endpoint id="foo5" uri="seda:foo5"/>
+        <endpoint id="foo6" uri="seda:foo6"/>
 
         <!-- START SNIPPET: e1 -->
         <route>
@@ -54,6 +56,19 @@
             <pollEnrich ref="foo4" strategyRef="sampleAggregator"/>
             <to uri="mock:mock"/>
         </route>
+
+        <route>
+            <from uri="seda:enricher-test-5"/>
+            <pollEnrich ref="foo5" pollMultiple="true"/>
+            <to uri="mock:mock"/>
+        </route>
+
+        <route>
+            <from uri="seda:enricher-test-6"/>
+            <pollEnrich ref="foo6" timeout="200" pollMultiple="true"/>
+            <to uri="mock:mock"/>
+        </route>
+
     </camelContext>
 
     <bean id="sampleAggregator" 
class="org.apache.camel.processor.enricher.SampleAggregator"/>


Reply via email to