Author: davsclaus Date: Tue Nov 10 10:40:46 2009 New Revision: 834417 URL: http://svn.apache.org/viewvc?rev=834417&view=rev Log: CAMEL-2151: Multiple toAsync in same route.
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/ToAsyncTwoTest.java - copied, changed from r834396, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/ToAsyncTest.java camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringToAsyncTwoTest.java - copied, changed from r834396, camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringToAsyncTest.java camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringToAsyncTwoTest.xml - copied, changed from r834396, camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringToAsyncTest.xml Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultComponent.java camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringToAsyncTest.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultComponent.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultComponent.java?rev=834417&r1=834416&r2=834417&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultComponent.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultComponent.java Tue Nov 10 10:40:46 2009 @@ -101,7 +101,7 @@ * Strategy to do post configuration logic. * <p/> * Can be used to construct an URI based on the remaining parameters. For example the parameters that configures - * the endpoint have been removed from the parameters which which leaves it with only additional parameters. + * the endpoint have been removed from the parameters which leaves only the additional parameters left. * * @param endpoint the created endpoint * @param parameters the remaining parameters after the endpoint has been created and parsed the parameters Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java?rev=834417&r1=834416&r2=834417&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java Tue Nov 10 10:40:46 2009 @@ -28,7 +28,9 @@ import org.apache.camel.Endpoint; import org.apache.camel.ExchangePattern; import org.apache.camel.Processor; +import org.apache.camel.processor.Pipeline; import org.apache.camel.processor.SendAsyncProcessor; +import org.apache.camel.processor.UnitOfWorkProcessor; import org.apache.camel.spi.RouteContext; import org.apache.camel.util.concurrent.ExecutorServiceHelper; @@ -90,16 +92,19 @@ executorService = routeContext.lookup(executorServiceRef, ExecutorService.class); } if (executorService == null && poolSize != null) { - executorService = ExecutorServiceHelper.newScheduledThreadPool(poolSize, "ToAsync", true); + executorService = ExecutorServiceHelper.newScheduledThreadPool(poolSize, "ToAsync[" + getLabel() + "]", true); } // create the child processor which is the async route Processor childProcessor = routeContext.createProcessor(this); + // wrap it in a unit of work so the route that comes next is also done in a unit of work + UnitOfWorkProcessor uow = new UnitOfWorkProcessor(childProcessor); + // create async processor Endpoint endpoint = resolveEndpoint(routeContext); - SendAsyncProcessor async = new SendAsyncProcessor(endpoint, getPattern(), childProcessor); + SendAsyncProcessor async = new SendAsyncProcessor(endpoint, getPattern(), uow); if (executorService != null) { async.setExecutorService(executorService); } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java?rev=834417&r1=834416&r2=834417&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java Tue Nov 10 10:40:46 2009 @@ -16,6 +16,8 @@ */ package org.apache.camel.processor; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; @@ -26,6 +28,7 @@ import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; +import org.apache.camel.Navigate; import org.apache.camel.Processor; import org.apache.camel.Producer; import org.apache.camel.ProducerCallback; @@ -37,7 +40,7 @@ /** * @version $Revision$ */ -public class SendAsyncProcessor extends SendProcessor implements Runnable { +public class SendAsyncProcessor extends SendProcessor implements Runnable, Navigate { private static final int DEFAULT_THREADPOOL_SIZE = 10; protected final Processor target; @@ -141,6 +144,19 @@ this.exceptionHandler = exceptionHandler; } + public boolean hasNext() { + return target != null; + } + + public List<Processor> next() { + if (!hasNext()) { + return null; + } + List<Processor> answer = new ArrayList<Processor>(1); + answer.add(target); + return answer; + } + public void run() { while (isRunAllowed()) { Exchange exchange; @@ -154,6 +170,13 @@ if (exchange != null) { try { + // copy OUT to IN + if (exchange.hasOut()) { + // replace OUT with IN as async processing changed something + exchange.setIn(exchange.getOut()); + exchange.setOut(null); + } + if (LOG.isDebugEnabled()) { LOG.debug("Async reply received now routing the Exchange: " + exchange); } Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/ToAsyncTwoTest.java (from r834396, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/ToAsyncTest.java) URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/ToAsyncTwoTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/ToAsyncTwoTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/ToAsyncTest.java&r1=834396&r2=834417&rev=834417&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/ToAsyncTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/ToAsyncTwoTest.java Tue Nov 10 10:40:46 2009 @@ -22,13 +22,15 @@ /** * @version $Revision$ */ -public class ToAsyncTest extends ContextTestSupport { +public class ToAsyncTwoTest extends ContextTestSupport { - public void testToAsync() throws Exception { + public void testToAsyncTwo() throws Exception { getMockEndpoint("mock:a").expectedBodiesReceived("Hello World"); getMockEndpoint("mock:b").expectedBodiesReceived("Hello World"); - getMockEndpoint("mock:result").expectedMessageCount(1); - getMockEndpoint("mock:result").message(0).outBody(String.class).isEqualTo("Bye World"); + getMockEndpoint("mock:c").expectedBodiesReceived("Bye World"); + getMockEndpoint("mock:d").expectedBodiesReceived("Bye World"); + getMockEndpoint("mock:e").expectedBodiesReceived("Bye Again World"); + getMockEndpoint("mock:result").expectedBodiesReceived("Hi World"); template.sendBody("direct:start", "Hello World"); @@ -38,12 +40,17 @@ String ida = getMockEndpoint("mock:a").getReceivedExchanges().get(0).getExchangeId(); String idb = getMockEndpoint("mock:b").getReceivedExchanges().get(0).getExchangeId(); + String idc = getMockEndpoint("mock:c").getReceivedExchanges().get(0).getExchangeId(); + String idd = getMockEndpoint("mock:d").getReceivedExchanges().get(0).getExchangeId(); + String ide = getMockEndpoint("mock:e").getReceivedExchanges().get(0).getExchangeId(); String idresult = getMockEndpoint("mock:result").getReceivedExchanges().get(0).getExchangeId(); - // id a should be different and id b and id result the same - assertNotSame(ida, idb); - assertNotSame(ida, idresult); - assertSame(idb, idresult); + // ids on exchanges should be different in groups + assertSame(idb, idc); + assertSame(idd, idresult); + assertSame(idd, ide); + assertNotNull(ida, idb); + assertNotNull(idb, idd); } @Override @@ -51,10 +58,14 @@ return new RouteBuilder() { @Override public void configure() throws Exception { - from("direct:start").to("mock:a").toAsync("direct:bar", 5).to("mock:result"); + from("direct:start").to("mock:a") + .toAsync("direct:bar", 5).to("mock:c") + .toAsync("direct:foo", 2).to("mock:e").transform(constant("Hi World")).to("mock:result"); from("direct:bar").to("mock:b").transform(constant("Bye World")); + + from("direct:foo").to("mock:d").transform(constant("Bye Again World")); } }; } -} +} \ No newline at end of file Modified: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringToAsyncTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringToAsyncTest.java?rev=834417&r1=834416&r2=834417&view=diff ============================================================================== --- camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringToAsyncTest.java (original) +++ camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringToAsyncTest.java Tue Nov 10 10:40:46 2009 @@ -26,11 +26,6 @@ */ public class SpringToAsyncTest extends ToAsyncTest { - @Override - public boolean isUseRouteBuilder() { - return false; - } - protected CamelContext createCamelContext() throws Exception { return createSpringCamelContext(this, "org/apache/camel/spring/processor/SpringToAsyncTest.xml"); } Copied: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringToAsyncTwoTest.java (from r834396, camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringToAsyncTest.java) URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringToAsyncTwoTest.java?p2=camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringToAsyncTwoTest.java&p1=camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringToAsyncTest.java&r1=834396&r2=834417&rev=834417&view=diff ============================================================================== --- camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringToAsyncTest.java (original) +++ camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringToAsyncTwoTest.java Tue Nov 10 10:40:46 2009 @@ -17,22 +17,17 @@ package org.apache.camel.spring.processor; import org.apache.camel.CamelContext; -import org.apache.camel.processor.async.ToAsyncTest; +import org.apache.camel.processor.async.ToAsyncTwoTest; import static org.apache.camel.spring.processor.SpringTestHelper.createSpringCamelContext; /** * @version $Revision$ */ -public class SpringToAsyncTest extends ToAsyncTest { - - @Override - public boolean isUseRouteBuilder() { - return false; - } +public class SpringToAsyncTwoTest extends ToAsyncTwoTest { protected CamelContext createCamelContext() throws Exception { - return createSpringCamelContext(this, "org/apache/camel/spring/processor/SpringToAsyncTest.xml"); + return createSpringCamelContext(this, "org/apache/camel/spring/processor/SpringToAsyncTwoTest.xml"); } -} +} \ No newline at end of file Copied: camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringToAsyncTwoTest.xml (from r834396, camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringToAsyncTest.xml) URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringToAsyncTwoTest.xml?p2=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringToAsyncTwoTest.xml&p1=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringToAsyncTest.xml&r1=834396&r2=834417&rev=834417&view=diff ============================================================================== --- camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringToAsyncTest.xml (original) +++ camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringToAsyncTwoTest.xml Tue Nov 10 10:40:46 2009 @@ -27,6 +27,10 @@ <from uri="direct:start"/> <to uri="mock:a"/> <to uri="direct:bar" async="true" poolSize="5"/> + <to uri="mock:c"/> + <to uri="direct:foo" async="true" poolSize="2"/> + <to uri="mock:e"/> + <transform><constant>Hi World</constant></transform> <to uri="mock:result"/> </route> @@ -35,6 +39,13 @@ <to uri="mock:b"/> <transform><constant>Bye World</constant></transform> </route> + + <route> + <from uri="direct:foo"/> + <to uri="mock:d"/> + <transform><constant>Bye Again World</constant></transform> + </route> + </camelContext> </beans>