Author: ningjiang
Date: Wed Jul 28 06:52:00 2010
New Revision: 979959

URL: http://svn.apache.org/viewvc?rev=979959&view=rev
Log:
CAMEL-3002 Add option steaming to recipient list EIP

Added:
    
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListParallelStreamingTest.java
   (with props)
    
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringRecipientListParallelStreamingTest.java
   (with props)
    
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringRecipientListParallelStreamingTest.xml
   (with props)
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/RecipientList.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/RecipientList.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/RecipientList.java?rev=979959&r1=979958&r2=979959&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/RecipientList.java 
(original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/RecipientList.java 
Wed Jul 28 06:52:00 2010
@@ -47,6 +47,7 @@ public @interface RecipientList {
     String delimiter() default ",";
     boolean parallelProcessing() default false;
     boolean stopOnException() default false;
+    boolean streaming() default false;
     boolean ignoreInvalidEndpoints() default false;
     String strategyRef() default "";
     String executorServiceRef() default "";

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java?rev=979959&r1=979958&r2=979959&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java
 Wed Jul 28 06:52:00 2010
@@ -104,6 +104,7 @@ public class MethodInfo {
             recipientList.setStopOnException(annotation.stopOnException());
             
recipientList.setIgnoreInvalidEndpoints(annotation.ignoreInvalidEndpoints());
             
recipientList.setParallelProcessing(annotation.parallelProcessing());
+            recipientList.setStreaming(annotation.streaming());
 
             if (ObjectHelper.isNotEmpty(annotation.executorServiceRef())) {
                 ExecutorService executor = 
CamelContextHelper.mandatoryLookup(camelContext, 
annotation.executorServiceRef(), ExecutorService.class);

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java?rev=979959&r1=979958&r2=979959&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultShutdownStrategy.java
 Wed Jul 28 06:52:00 2010
@@ -398,8 +398,8 @@ public class DefaultShutdownStrategy ext
                 }
                 if (size > 0) {
                     try {
-                        LOG.info("Waiting as there are still " + size + " 
inflight and pending exchanges to complete, timeout in " +
-                                (TimeUnit.SECONDS.convert(getTimeout(), 
getTimeUnit()) - (loopCount++ * loopDelaySeconds)) + " seconds.");
+                        LOG.info("Waiting as there are still " + size + " 
inflight and pending exchanges to complete, timeout in "
+                                 + (TimeUnit.SECONDS.convert(getTimeout(), 
getTimeUnit()) - (loopCount++ * loopDelaySeconds)) + " seconds.");
                         Thread.sleep(loopDelaySeconds * 1000);
                     } catch (InterruptedException e) {
                         LOG.warn("Interrupted while waiting during graceful 
shutdown, will force shutdown now.");

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java?rev=979959&r1=979958&r2=979959&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
 Wed Jul 28 06:52:00 2010
@@ -57,6 +57,8 @@ public class RecipientListDefinition<Typ
     private Boolean stopOnException;
     @XmlAttribute(required = false)
     private Boolean ignoreInvalidEndpoints;
+    @XmlAttribute(required = false)
+    private Boolean streaming;
 
     public RecipientListDefinition() {
     }
@@ -91,6 +93,7 @@ public class RecipientListDefinition<Typ
         }
         answer.setAggregationStrategy(createAggregationStrategy(routeContext));
         answer.setParallelProcessing(isParallelProcessing());
+        answer.setStreaming(isStreaming());   
         if (stopOnException != null) {
             answer.setStopOnException(isStopOnException());
         }
@@ -170,6 +173,16 @@ public class RecipientListDefinition<Typ
         setParallelProcessing(true);
         return this;
     }
+    
+    /**
+     * Doing the recipient list work in streaming model
+     *
+     * @return the builder
+     */
+    public RecipientListDefinition<Type> streaming() {
+        setStreaming(true);
+        return this;
+    }
 
     /**
      * Will now stop further processing if an exception occurred during 
processing of an
@@ -260,4 +273,12 @@ public class RecipientListDefinition<Typ
     public void setExecutorService(ExecutorService executorService) {
         this.executorService = executorService;
     }
+
+    public void setStreaming(boolean streaming) {
+        this.streaming = streaming;
+    }
+
+    public boolean isStreaming() {
+        return streaming != null ? streaming : false;
+    }
 }

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java?rev=979959&r1=979958&r2=979959&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
 Wed Jul 28 06:52:00 2010
@@ -52,6 +52,7 @@ public class RecipientList extends Servi
     private boolean parallelProcessing;
     private boolean stopOnException;
     private boolean ignoreInvalidEndpoints;
+    private boolean streaming;
     private ExecutorService executorService;
     private AggregationStrategy aggregationStrategy = new 
UseLatestAggregationStrategy();
 
@@ -106,7 +107,7 @@ public class RecipientList extends Servi
         Iterator<Object> iter = ObjectHelper.createIterator(recipientList, 
delimiter);
 
         RecipientListProcessor rlp = new 
RecipientListProcessor(exchange.getContext(), producerCache, iter, 
getAggregationStrategy(),
-                                                                
isParallelProcessing(), getExecutorService(), false, isStopOnException());
+                                                                
isParallelProcessing(), getExecutorService(), isStreaming(), 
isStopOnException());
         rlp.setIgnoreInvalidEndpoints(isIgnoreInvalidEndpoints());
 
         // now let the multicast process the exchange
@@ -133,7 +134,15 @@ public class RecipientList extends Servi
     protected void doStop() throws Exception {
         ServiceHelper.stopService(producerCache);
     }
-
+    
+    public boolean isStreaming() {
+        return streaming;
+    }
+    
+    public void setStreaming(boolean streaming) {
+        this.streaming = streaming;
+    }
+ 
     public boolean isIgnoreInvalidEndpoints() {
         return ignoreInvalidEndpoints;
     }

Added: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListParallelStreamingTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListParallelStreamingTest.java?rev=979959&view=auto
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListParallelStreamingTest.java
 (added)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListParallelStreamingTest.java
 Wed Jul 28 06:52:00 2010
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * @version $Revision$
+ */
+public class RecipientListParallelStreamingTest extends ContextTestSupport {
+
+    public void testRecipientListParallel() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceived("c");
+
+        template.sendBodyAndHeader("direct:start", "Hello World", "foo", 
"direct:a,direct:b,direct:c");
+
+        assertMockEndpointsSatisfied();
+        
+        mock.reset();
+        mock.expectedBodiesReceived("b");
+
+        template.sendBodyAndHeader("direct:streaming", "Hello World", "foo", 
"direct:a,direct:b,direct:c");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    
.recipientList(header("foo")).parallelProcessing().to("mock:result");
+                
+                from("direct:streaming")
+                    
.recipientList(header("foo")).parallelProcessing().streaming().to("mock:result");
+
+                from("direct:a").delay(100).transform(constant("a"));
+                from("direct:b").delay(500).transform(constant("b"));
+                from("direct:c").transform(constant("c"));
+            }
+        };
+    }
+}

Propchange: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListParallelStreamingTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListParallelStreamingTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: 
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringRecipientListParallelStreamingTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringRecipientListParallelStreamingTest.java?rev=979959&view=auto
==============================================================================
--- 
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringRecipientListParallelStreamingTest.java
 (added)
+++ 
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringRecipientListParallelStreamingTest.java
 Wed Jul 28 06:52:00 2010
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spring.processor;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.processor.RecipientListParallelStreamingTest;
+
+import static 
org.apache.camel.spring.processor.SpringTestHelper.createSpringCamelContext;
+
+public class SpringRecipientListParallelStreamingTest extends 
RecipientListParallelStreamingTest {
+
+    protected CamelContext createCamelContext() throws Exception {
+        return createSpringCamelContext(this,
+                
"org/apache/camel/spring/processor/SpringRecipientListParallelStreamingTest.xml");
+    }
+}
\ No newline at end of file

Propchange: 
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringRecipientListParallelStreamingTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringRecipientListParallelStreamingTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: 
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringRecipientListParallelStreamingTest.xml
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringRecipientListParallelStreamingTest.xml?rev=979959&view=auto
==============================================================================
--- 
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringRecipientListParallelStreamingTest.xml
 (added)
+++ 
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringRecipientListParallelStreamingTest.xml
 Wed Jul 28 06:52:00 2010
@@ -0,0 +1,61 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans";
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+       xsi:schemaLocation="
+       http://www.springframework.org/schema/beans 
http://www.springframework.org/schema/beans/spring-beans.xsd
+       http://camel.apache.org/schema/spring 
http://camel.apache.org/schema/spring/camel-spring.xsd
+    ">
+
+
+    <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring";>
+        <route>
+            <from uri="direct:start"/>
+            <recipientList parallelProcessing="true">
+                <header>foo</header>
+            </recipientList>
+            <to uri="mock:result"/>
+        </route>
+        <route>
+            <from uri="direct:streaming"/>
+            <recipientList parallelProcessing="true" streaming="true">
+                <header>foo</header>
+            </recipientList>
+            <to uri="mock:result"/>
+        </route>
+
+        <route>
+            <from uri="direct:a"/>
+            <delay><constant>100</constant></delay>
+            <transform><constant>a</constant></transform>
+        </route>
+
+        <route>
+            <from uri="direct:b"/>
+            <delay><constant>500</constant></delay>
+            <transform><constant>b</constant></transform>
+        </route>
+
+        <route>
+            <from uri="direct:c"/>
+            <transform><constant>c</constant></transform>
+        </route>
+
+    </camelContext>
+
+</beans>

Propchange: 
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringRecipientListParallelStreamingTest.xml
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringRecipientListParallelStreamingTest.xml
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Propchange: 
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringRecipientListParallelStreamingTest.xml
------------------------------------------------------------------------------
    svn:mime-type = text/xml


Reply via email to