Author: davsclaus
Date: Tue Nov 10 10:40:46 2009
New Revision: 834417
URL: http://svn.apache.org/viewvc?rev=834417&view=rev
Log:
CAMEL-2151: Multiple toAsync in same route.
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/ToAsyncTwoTest.java
- copied, changed from r834396,
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/ToAsyncTest.java
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringToAsyncTwoTest.java
- copied, changed from r834396,
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringToAsyncTest.java
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringToAsyncTwoTest.xml
- copied, changed from r834396,
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringToAsyncTest.xml
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultComponent.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringToAsyncTest.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultComponent.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultComponent.java?rev=834417&r1=834416&r2=834417&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultComponent.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultComponent.java
Tue Nov 10 10:40:46 2009
@@ -101,7 +101,7 @@
* Strategy to do post configuration logic.
* <p/>
* Can be used to construct an URI based on the remaining parameters. For
example the parameters that configures
- * the endpoint have been removed from the parameters which which leaves
it with only additional parameters.
+ * the endpoint have been removed from the parameters which leaves only
the additional parameters left.
*
* @param endpoint the created endpoint
* @param parameters the remaining parameters after the endpoint has been
created and parsed the parameters
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java?rev=834417&r1=834416&r2=834417&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java
Tue Nov 10 10:40:46 2009
@@ -28,7 +28,9 @@
import org.apache.camel.Endpoint;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Processor;
+import org.apache.camel.processor.Pipeline;
import org.apache.camel.processor.SendAsyncProcessor;
+import org.apache.camel.processor.UnitOfWorkProcessor;
import org.apache.camel.spi.RouteContext;
import org.apache.camel.util.concurrent.ExecutorServiceHelper;
@@ -90,16 +92,19 @@
executorService = routeContext.lookup(executorServiceRef,
ExecutorService.class);
}
if (executorService == null && poolSize != null) {
- executorService =
ExecutorServiceHelper.newScheduledThreadPool(poolSize, "ToAsync", true);
+ executorService =
ExecutorServiceHelper.newScheduledThreadPool(poolSize, "ToAsync[" + getLabel()
+ "]", true);
}
// create the child processor which is the async route
Processor childProcessor = routeContext.createProcessor(this);
+ // wrap it in a unit of work so the route that comes next is also done
in a unit of work
+ UnitOfWorkProcessor uow = new UnitOfWorkProcessor(childProcessor);
+
// create async processor
Endpoint endpoint = resolveEndpoint(routeContext);
- SendAsyncProcessor async = new SendAsyncProcessor(endpoint,
getPattern(), childProcessor);
+ SendAsyncProcessor async = new SendAsyncProcessor(endpoint,
getPattern(), uow);
if (executorService != null) {
async.setExecutorService(executorService);
}
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java?rev=834417&r1=834416&r2=834417&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java
Tue Nov 10 10:40:46 2009
@@ -16,6 +16,8 @@
*/
package org.apache.camel.processor;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
@@ -26,6 +28,7 @@
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
+import org.apache.camel.Navigate;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.ProducerCallback;
@@ -37,7 +40,7 @@
/**
* @version $Revision$
*/
-public class SendAsyncProcessor extends SendProcessor implements Runnable {
+public class SendAsyncProcessor extends SendProcessor implements Runnable,
Navigate {
private static final int DEFAULT_THREADPOOL_SIZE = 10;
protected final Processor target;
@@ -141,6 +144,19 @@
this.exceptionHandler = exceptionHandler;
}
+ public boolean hasNext() {
+ return target != null;
+ }
+
+ public List<Processor> next() {
+ if (!hasNext()) {
+ return null;
+ }
+ List<Processor> answer = new ArrayList<Processor>(1);
+ answer.add(target);
+ return answer;
+ }
+
public void run() {
while (isRunAllowed()) {
Exchange exchange;
@@ -154,6 +170,13 @@
if (exchange != null) {
try {
+ // copy OUT to IN
+ if (exchange.hasOut()) {
+ // replace OUT with IN as async processing changed
something
+ exchange.setIn(exchange.getOut());
+ exchange.setOut(null);
+ }
+
if (LOG.isDebugEnabled()) {
LOG.debug("Async reply received now routing the
Exchange: " + exchange);
}
Copied:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/ToAsyncTwoTest.java
(from r834396,
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/ToAsyncTest.java)
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/ToAsyncTwoTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/ToAsyncTwoTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/ToAsyncTest.java&r1=834396&r2=834417&rev=834417&view=diff
==============================================================================
---
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/ToAsyncTest.java
(original)
+++
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/ToAsyncTwoTest.java
Tue Nov 10 10:40:46 2009
@@ -22,13 +22,15 @@
/**
* @version $Revision$
*/
-public class ToAsyncTest extends ContextTestSupport {
+public class ToAsyncTwoTest extends ContextTestSupport {
- public void testToAsync() throws Exception {
+ public void testToAsyncTwo() throws Exception {
getMockEndpoint("mock:a").expectedBodiesReceived("Hello World");
getMockEndpoint("mock:b").expectedBodiesReceived("Hello World");
- getMockEndpoint("mock:result").expectedMessageCount(1);
-
getMockEndpoint("mock:result").message(0).outBody(String.class).isEqualTo("Bye
World");
+ getMockEndpoint("mock:c").expectedBodiesReceived("Bye World");
+ getMockEndpoint("mock:d").expectedBodiesReceived("Bye World");
+ getMockEndpoint("mock:e").expectedBodiesReceived("Bye Again World");
+ getMockEndpoint("mock:result").expectedBodiesReceived("Hi World");
template.sendBody("direct:start", "Hello World");
@@ -38,12 +40,17 @@
String ida =
getMockEndpoint("mock:a").getReceivedExchanges().get(0).getExchangeId();
String idb =
getMockEndpoint("mock:b").getReceivedExchanges().get(0).getExchangeId();
+ String idc =
getMockEndpoint("mock:c").getReceivedExchanges().get(0).getExchangeId();
+ String idd =
getMockEndpoint("mock:d").getReceivedExchanges().get(0).getExchangeId();
+ String ide =
getMockEndpoint("mock:e").getReceivedExchanges().get(0).getExchangeId();
String idresult =
getMockEndpoint("mock:result").getReceivedExchanges().get(0).getExchangeId();
- // id a should be different and id b and id result the same
- assertNotSame(ida, idb);
- assertNotSame(ida, idresult);
- assertSame(idb, idresult);
+ // ids on exchanges should be different in groups
+ assertSame(idb, idc);
+ assertSame(idd, idresult);
+ assertSame(idd, ide);
+ assertNotNull(ida, idb);
+ assertNotNull(idb, idd);
}
@Override
@@ -51,10 +58,14 @@
return new RouteBuilder() {
@Override
public void configure() throws Exception {
- from("direct:start").to("mock:a").toAsync("direct:bar",
5).to("mock:result");
+ from("direct:start").to("mock:a")
+ .toAsync("direct:bar", 5).to("mock:c")
+ .toAsync("direct:foo",
2).to("mock:e").transform(constant("Hi World")).to("mock:result");
from("direct:bar").to("mock:b").transform(constant("Bye
World"));
+
+ from("direct:foo").to("mock:d").transform(constant("Bye Again
World"));
}
};
}
-}
+}
\ No newline at end of file
Modified:
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringToAsyncTest.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringToAsyncTest.java?rev=834417&r1=834416&r2=834417&view=diff
==============================================================================
---
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringToAsyncTest.java
(original)
+++
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringToAsyncTest.java
Tue Nov 10 10:40:46 2009
@@ -26,11 +26,6 @@
*/
public class SpringToAsyncTest extends ToAsyncTest {
- @Override
- public boolean isUseRouteBuilder() {
- return false;
- }
-
protected CamelContext createCamelContext() throws Exception {
return createSpringCamelContext(this,
"org/apache/camel/spring/processor/SpringToAsyncTest.xml");
}
Copied:
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringToAsyncTwoTest.java
(from r834396,
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringToAsyncTest.java)
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringToAsyncTwoTest.java?p2=camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringToAsyncTwoTest.java&p1=camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringToAsyncTest.java&r1=834396&r2=834417&rev=834417&view=diff
==============================================================================
---
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringToAsyncTest.java
(original)
+++
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringToAsyncTwoTest.java
Tue Nov 10 10:40:46 2009
@@ -17,22 +17,17 @@
package org.apache.camel.spring.processor;
import org.apache.camel.CamelContext;
-import org.apache.camel.processor.async.ToAsyncTest;
+import org.apache.camel.processor.async.ToAsyncTwoTest;
import static
org.apache.camel.spring.processor.SpringTestHelper.createSpringCamelContext;
/**
* @version $Revision$
*/
-public class SpringToAsyncTest extends ToAsyncTest {
-
- @Override
- public boolean isUseRouteBuilder() {
- return false;
- }
+public class SpringToAsyncTwoTest extends ToAsyncTwoTest {
protected CamelContext createCamelContext() throws Exception {
- return createSpringCamelContext(this,
"org/apache/camel/spring/processor/SpringToAsyncTest.xml");
+ return createSpringCamelContext(this,
"org/apache/camel/spring/processor/SpringToAsyncTwoTest.xml");
}
-}
+}
\ No newline at end of file
Copied:
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringToAsyncTwoTest.xml
(from r834396,
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringToAsyncTest.xml)
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringToAsyncTwoTest.xml?p2=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringToAsyncTwoTest.xml&p1=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringToAsyncTest.xml&r1=834396&r2=834417&rev=834417&view=diff
==============================================================================
---
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringToAsyncTest.xml
(original)
+++
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringToAsyncTwoTest.xml
Tue Nov 10 10:40:46 2009
@@ -27,6 +27,10 @@
<from uri="direct:start"/>
<to uri="mock:a"/>
<to uri="direct:bar" async="true" poolSize="5"/>
+ <to uri="mock:c"/>
+ <to uri="direct:foo" async="true" poolSize="2"/>
+ <to uri="mock:e"/>
+ <transform><constant>Hi World</constant></transform>
<to uri="mock:result"/>
</route>
@@ -35,6 +39,13 @@
<to uri="mock:b"/>
<transform><constant>Bye World</constant></transform>
</route>
+
+ <route>
+ <from uri="direct:foo"/>
+ <to uri="mock:d"/>
+ <transform><constant>Bye Again World</constant></transform>
+ </route>
+
</camelContext>
</beans>