Author: davsclaus
Date: Tue Nov 10 10:40:46 2009
New Revision: 834417

URL: http://svn.apache.org/viewvc?rev=834417&view=rev
Log:
CAMEL-2151: Multiple toAsync in same route.

Added:
    
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/ToAsyncTwoTest.java
      - copied, changed from r834396, 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/ToAsyncTest.java
    
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringToAsyncTwoTest.java
      - copied, changed from r834396, 
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringToAsyncTest.java
    
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringToAsyncTwoTest.xml
      - copied, changed from r834396, 
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringToAsyncTest.xml
Modified:
    
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultComponent.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java
    
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringToAsyncTest.java

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultComponent.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultComponent.java?rev=834417&r1=834416&r2=834417&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultComponent.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultComponent.java
 Tue Nov 10 10:40:46 2009
@@ -101,7 +101,7 @@
      * Strategy to do post configuration logic.
      * <p/>
      * Can be used to construct an URI based on the remaining parameters. For 
example the parameters that configures
-     * the endpoint have been removed from the parameters which which leaves 
it with only additional parameters.
+     * the endpoint have been removed from the parameters which leaves only 
the additional parameters left.
      *
      * @param endpoint the created endpoint
      * @param parameters the remaining parameters after the endpoint has been 
created and parsed the parameters

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java?rev=834417&r1=834416&r2=834417&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java 
(original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java 
Tue Nov 10 10:40:46 2009
@@ -28,7 +28,9 @@
 import org.apache.camel.Endpoint;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.Processor;
+import org.apache.camel.processor.Pipeline;
 import org.apache.camel.processor.SendAsyncProcessor;
+import org.apache.camel.processor.UnitOfWorkProcessor;
 import org.apache.camel.spi.RouteContext;
 import org.apache.camel.util.concurrent.ExecutorServiceHelper;
 
@@ -90,16 +92,19 @@
             executorService = routeContext.lookup(executorServiceRef, 
ExecutorService.class);
         }
         if (executorService == null && poolSize != null) {
-            executorService = 
ExecutorServiceHelper.newScheduledThreadPool(poolSize, "ToAsync", true);
+            executorService = 
ExecutorServiceHelper.newScheduledThreadPool(poolSize, "ToAsync[" + getLabel() 
+ "]", true);
         }
 
         // create the child processor which is the async route
         Processor childProcessor = routeContext.createProcessor(this);
 
+        // wrap it in a unit of work so the route that comes next is also done 
in a unit of work
+        UnitOfWorkProcessor uow = new UnitOfWorkProcessor(childProcessor);
+
         // create async processor
         Endpoint endpoint = resolveEndpoint(routeContext);
 
-        SendAsyncProcessor async = new SendAsyncProcessor(endpoint, 
getPattern(), childProcessor);
+        SendAsyncProcessor async = new SendAsyncProcessor(endpoint, 
getPattern(), uow);
         if (executorService != null) {
             async.setExecutorService(executorService);
         }

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java?rev=834417&r1=834416&r2=834417&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java
 Tue Nov 10 10:40:46 2009
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.processor;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -26,6 +28,7 @@
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
+import org.apache.camel.Navigate;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.ProducerCallback;
@@ -37,7 +40,7 @@
 /**
  * @version $Revision$
  */
-public class SendAsyncProcessor extends SendProcessor implements Runnable {
+public class SendAsyncProcessor extends SendProcessor implements Runnable, 
Navigate {
 
     private static final int DEFAULT_THREADPOOL_SIZE = 10;
     protected final Processor target;
@@ -141,6 +144,19 @@
         this.exceptionHandler = exceptionHandler;
     }
 
+    public boolean hasNext() {
+        return target != null;
+    }
+
+    public List<Processor> next() {
+        if (!hasNext()) {
+            return null;
+        }
+        List<Processor> answer = new ArrayList<Processor>(1);
+        answer.add(target);
+        return answer;
+    }
+
     public void run() {
         while (isRunAllowed()) {
             Exchange exchange;
@@ -154,6 +170,13 @@
 
             if (exchange != null) {
                 try {
+                    // copy OUT to IN
+                    if (exchange.hasOut()) {
+                        // replace OUT with IN as async processing changed 
something
+                        exchange.setIn(exchange.getOut());
+                        exchange.setOut(null);
+                    }
+
                     if (LOG.isDebugEnabled()) {
                         LOG.debug("Async reply received now routing the 
Exchange: " + exchange);
                     }

Copied: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/ToAsyncTwoTest.java
 (from r834396, 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/ToAsyncTest.java)
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/ToAsyncTwoTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/ToAsyncTwoTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/ToAsyncTest.java&r1=834396&r2=834417&rev=834417&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/ToAsyncTest.java
 (original)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/ToAsyncTwoTest.java
 Tue Nov 10 10:40:46 2009
@@ -22,13 +22,15 @@
 /**
  * @version $Revision$
  */
-public class ToAsyncTest extends ContextTestSupport {
+public class ToAsyncTwoTest extends ContextTestSupport {
 
-    public void testToAsync() throws Exception {
+    public void testToAsyncTwo() throws Exception {
         getMockEndpoint("mock:a").expectedBodiesReceived("Hello World");
         getMockEndpoint("mock:b").expectedBodiesReceived("Hello World");
-        getMockEndpoint("mock:result").expectedMessageCount(1);
-        
getMockEndpoint("mock:result").message(0).outBody(String.class).isEqualTo("Bye 
World");
+        getMockEndpoint("mock:c").expectedBodiesReceived("Bye World");
+        getMockEndpoint("mock:d").expectedBodiesReceived("Bye World");
+        getMockEndpoint("mock:e").expectedBodiesReceived("Bye Again World");
+        getMockEndpoint("mock:result").expectedBodiesReceived("Hi World");
 
         template.sendBody("direct:start", "Hello World");
 
@@ -38,12 +40,17 @@
 
         String ida = 
getMockEndpoint("mock:a").getReceivedExchanges().get(0).getExchangeId();
         String idb = 
getMockEndpoint("mock:b").getReceivedExchanges().get(0).getExchangeId();
+        String idc = 
getMockEndpoint("mock:c").getReceivedExchanges().get(0).getExchangeId();
+        String idd = 
getMockEndpoint("mock:d").getReceivedExchanges().get(0).getExchangeId();
+        String ide = 
getMockEndpoint("mock:e").getReceivedExchanges().get(0).getExchangeId();
         String idresult = 
getMockEndpoint("mock:result").getReceivedExchanges().get(0).getExchangeId();
 
-        // id a should be different and id b and id result the same
-        assertNotSame(ida, idb);
-        assertNotSame(ida, idresult);
-        assertSame(idb, idresult);
+        // ids on exchanges should be different in groups
+        assertSame(idb, idc);
+        assertSame(idd, idresult);
+        assertSame(idd, ide);
+        assertNotNull(ida, idb);
+        assertNotNull(idb, idd);
     }
 
     @Override
@@ -51,10 +58,14 @@
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from("direct:start").to("mock:a").toAsync("direct:bar", 
5).to("mock:result");
+                from("direct:start").to("mock:a")
+                    .toAsync("direct:bar", 5).to("mock:c")
+                    .toAsync("direct:foo", 
2).to("mock:e").transform(constant("Hi World")).to("mock:result");
 
                 from("direct:bar").to("mock:b").transform(constant("Bye 
World"));
+
+                from("direct:foo").to("mock:d").transform(constant("Bye Again 
World"));
             }
         };
     }
-}
+}
\ No newline at end of file

Modified: 
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringToAsyncTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringToAsyncTest.java?rev=834417&r1=834416&r2=834417&view=diff
==============================================================================
--- 
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringToAsyncTest.java
 (original)
+++ 
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringToAsyncTest.java
 Tue Nov 10 10:40:46 2009
@@ -26,11 +26,6 @@
  */
 public class SpringToAsyncTest extends ToAsyncTest {
 
-    @Override
-    public boolean isUseRouteBuilder() {
-        return false;
-    }
-
     protected CamelContext createCamelContext() throws Exception {
         return createSpringCamelContext(this, 
"org/apache/camel/spring/processor/SpringToAsyncTest.xml");
     }

Copied: 
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringToAsyncTwoTest.java
 (from r834396, 
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringToAsyncTest.java)
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringToAsyncTwoTest.java?p2=camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringToAsyncTwoTest.java&p1=camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringToAsyncTest.java&r1=834396&r2=834417&rev=834417&view=diff
==============================================================================
--- 
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringToAsyncTest.java
 (original)
+++ 
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringToAsyncTwoTest.java
 Tue Nov 10 10:40:46 2009
@@ -17,22 +17,17 @@
 package org.apache.camel.spring.processor;
 
 import org.apache.camel.CamelContext;
-import org.apache.camel.processor.async.ToAsyncTest;
+import org.apache.camel.processor.async.ToAsyncTwoTest;
 
 import static 
org.apache.camel.spring.processor.SpringTestHelper.createSpringCamelContext;
 
 /**
  * @version $Revision$
  */
-public class SpringToAsyncTest extends ToAsyncTest {
-
-    @Override
-    public boolean isUseRouteBuilder() {
-        return false;
-    }
+public class SpringToAsyncTwoTest extends ToAsyncTwoTest {
 
     protected CamelContext createCamelContext() throws Exception {
-        return createSpringCamelContext(this, 
"org/apache/camel/spring/processor/SpringToAsyncTest.xml");
+        return createSpringCamelContext(this, 
"org/apache/camel/spring/processor/SpringToAsyncTwoTest.xml");
     }
 
-}
+}
\ No newline at end of file

Copied: 
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringToAsyncTwoTest.xml
 (from r834396, 
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringToAsyncTest.xml)
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringToAsyncTwoTest.xml?p2=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringToAsyncTwoTest.xml&p1=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringToAsyncTest.xml&r1=834396&r2=834417&rev=834417&view=diff
==============================================================================
--- 
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringToAsyncTest.xml
 (original)
+++ 
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringToAsyncTwoTest.xml
 Tue Nov 10 10:40:46 2009
@@ -27,6 +27,10 @@
             <from uri="direct:start"/>
             <to uri="mock:a"/>
             <to uri="direct:bar" async="true" poolSize="5"/>
+            <to uri="mock:c"/>
+            <to uri="direct:foo" async="true" poolSize="2"/>
+            <to uri="mock:e"/>
+            <transform><constant>Hi World</constant></transform>
             <to uri="mock:result"/>
         </route>
 
@@ -35,6 +39,13 @@
             <to uri="mock:b"/>
             <transform><constant>Bye World</constant></transform>
         </route>
+
+        <route>
+            <from uri="direct:foo"/>
+            <to uri="mock:d"/>
+            <transform><constant>Bye Again World</constant></transform>
+        </route>
+
     </camelContext>
 
 </beans>


Reply via email to