Author: davsclaus
Date: Tue Nov 10 09:28:05 2009
New Revision: 834396
URL: http://svn.apache.org/viewvc?rev=834396&view=rev
Log:
CAMEL-2151: Added spring DSL for toAsync.
Added:
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringToAsyncTest.java
(with props)
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringToAsyncTest.xml
- copied, changed from r834163,
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapTest.xml
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/ToAsyncTest.java
camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/CamelContextFactoryBean.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java?rev=834396&r1=834395&r2=834396&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
Tue Nov 10 09:28:05 2009
@@ -320,23 +320,6 @@
/**
* Sends the exchange to the given endpoint
*
- * @param uri the endpoint to send to
- * @return the builder
- */
- @SuppressWarnings("unchecked")
- public Type toAsync(String uri) {
- ToDefinition answer = new ToDefinition(uri);
- answer.setAsync(true);
- addOutput(answer);
- // must push a block so we have a child route for the async reply
- // routing which is separated from the caller route
- pushBlock(answer);
- return (Type) this;
- }
-
- /**
- * Sends the exchange to the given endpoint
- *
* @param uri the String formatted endpoint uri to send to
* @param args arguments for the string formatting of the uri
* @return the builder
@@ -476,6 +459,77 @@
return (Type) this;
}
+ /**
+ * Sends the exchange to the given endpoint using asynchronous mode.
+ *
+ * @param uri the endpoint to send to
+ * @return the builder
+ * @see org.apache.camel.AsyncProcessor
+ */
+ public ToDefinition toAsync(String uri) {
+ ToDefinition answer = new ToDefinition(uri);
+ answer.setAsync(true);
+ addOutput(answer);
+ // must push a block so we have a child route for the async reply
+ // routing which is separated from the caller route
+ pushBlock(answer);
+ return answer;
+ }
+
+ /**
+ * Sends the exchange to the given endpoint using asynchronous mode.
+ *
+ * @param uri the endpoint to send to
+ * @param poolSize the core pool size
+ * @return the builder
+ * @see org.apache.camel.AsyncProcessor
+ */
+ public ToDefinition toAsync(String uri, int poolSize) {
+ ToDefinition answer = new ToDefinition(uri);
+ answer.setAsync(true);
+ answer.setPoolSize(poolSize);
+ addOutput(answer);
+ // must push a block so we have a child route for the async reply
+ // routing which is separated from the caller route
+ pushBlock(answer);
+ return answer;
+ }
+
+ /**
+ * Sends the exchange to the given endpoint using asynchronous mode.
+ *
+ * @param endpoint the endpoint to send to
+ * @return the builder
+ * @see org.apache.camel.AsyncProcessor
+ */
+ public ToDefinition toAsync(Endpoint endpoint) {
+ ToDefinition answer = new ToDefinition(endpoint);
+ answer.setAsync(true);
+ addOutput(answer);
+ // must push a block so we have a child route for the async reply
+ // routing which is separated from the caller route
+ pushBlock(answer);
+ return answer;
+ }
+
+ /**
+ * Sends the exchange to the given endpoint using asynchronous mode.
+ *
+ * @param endpoint the endpoint to send to
+ * @param poolSize the core pool size
+ * @return the builder
+ * @see org.apache.camel.AsyncProcessor
+ */
+ public ToDefinition toAsync(Endpoint endpoint, int poolSize) {
+ ToDefinition answer = new ToDefinition(endpoint);
+ answer.setAsync(true);
+ answer.setPoolSize(poolSize);
+ addOutput(answer);
+ // must push a block so we have a child route for the async reply
+ // routing which is separated from the caller route
+ pushBlock(answer);
+ return answer;
+ }
/**
* <a
href="http://camel.apache.org/exchange-pattern.html">ExchangePattern:</a>
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java?rev=834396&r1=834395&r2=834396&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java
Tue Nov 10 09:28:05 2009
@@ -80,7 +80,7 @@
}
/**
- * Setting the executor service for executing the multicasting action.
+ * Setting the executor service for the thread pool
*
* @return the builder
*/
@@ -90,6 +90,16 @@
}
/**
+ * Setting a reference to the executor service for the thread pool
+ *
+ * @return the builder
+ */
+ public ThreadsDefinition executorServiceRef(String executorServiceRef) {
+ setExecutorServiceRef(executorServiceRef);
+ return this;
+ }
+
+ /**
* Setting the core pool size for the underlying {@link
java.util.concurrent.ExecutorService}.
*
* @return the builder
@@ -120,6 +130,14 @@
this.executorService = executorService;
}
+ public String getExecutorServiceRef() {
+ return executorServiceRef;
+ }
+
+ public void setExecutorServiceRef(String executorServiceRef) {
+ this.executorServiceRef = executorServiceRef;
+ }
+
public Integer getPoolSize() {
return poolSize;
}
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java?rev=834396&r1=834395&r2=834396&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java
Tue Nov 10 09:28:05 2009
@@ -45,7 +45,7 @@
@XmlAttribute(required = false)
private ExchangePattern pattern;
@XmlAttribute(required = false)
- private Boolean async;
+ private Boolean async = Boolean.FALSE;
@XmlTransient
private ExecutorService executorService;
@XmlAttribute(required = false)
@@ -113,7 +113,7 @@
@Override
public String toString() {
if (async != null && async) {
- return "ToAsync[" + getLabel() + "]";
+ return "ToAsync[" + getLabel() + "] -> " + getOutputs();
} else {
return "To[" + getLabel() + "]";
}
@@ -137,6 +137,30 @@
this.async = async;
}
+ public Integer getPoolSize() {
+ return poolSize;
+ }
+
+ public void setPoolSize(Integer poolSize) {
+ this.poolSize = poolSize;
+ }
+
+ public ExecutorService getExecutorService() {
+ return executorService;
+ }
+
+ public void setExecutorService(ExecutorService executorService) {
+ this.executorService = executorService;
+ }
+
+ public String getExecutorServiceRef() {
+ return executorServiceRef;
+ }
+
+ public void setExecutorServiceRef(String executorServiceRef) {
+ this.executorServiceRef = executorServiceRef;
+ }
+
/**
* Sets the optional {@link ExchangePattern} used to invoke this
endpoint
*/
@@ -144,4 +168,35 @@
this.pattern = pattern;
}
+ /**
+ * Setting the executor service for executing the async routing.
+ *
+ * @return the builder
+ */
+ public ToDefinition executorService(ExecutorService executorService) {
+ setExecutorService(executorService);
+ return this;
+ }
+
+ /**
+ * Setting a reference to the executor service for executing the async routing.
+ *
+ * @return the builder
+ */
+ public ToDefinition executorServiceRef(String executorServiceRef) {
+ setExecutorServiceRef(executorServiceRef);
+ return this;
+ }
+
+ /**
+ * Setting the core pool size for the underlying {@link
java.util.concurrent.ExecutorService}.
+ *
+ * @return the builder
+ */
+ public ToDefinition poolSize(int poolSize) {
+ setPoolSize(poolSize);
+ return this;
+ }
+
+
}
Modified:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/ToAsyncTest.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/ToAsyncTest.java?rev=834396&r1=834395&r2=834396&view=diff
==============================================================================
---
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/ToAsyncTest.java
(original)
+++
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/ToAsyncTest.java
Tue Nov 10 09:28:05 2009
@@ -51,7 +51,7 @@
return new RouteBuilder() {
@Override
public void configure() throws Exception {
-
from("direct:start").to("mock:a").toAsync("direct:bar").to("mock:result");
+ from("direct:start").to("mock:a").toAsync("direct:bar",
5).to("mock:result");
from("direct:bar").to("mock:b").transform(constant("Bye
World"));
}
Modified:
camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/CamelContextFactoryBean.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/CamelContextFactoryBean.java?rev=834396&r1=834395&r2=834396&view=diff
==============================================================================
---
camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/CamelContextFactoryBean.java
(original)
+++
camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/CamelContextFactoryBean.java
Tue Nov 10 09:28:05 2009
@@ -51,6 +51,7 @@
import org.apache.camel.model.RouteBuilderDefinition;
import org.apache.camel.model.RouteContainer;
import org.apache.camel.model.RouteDefinition;
+import org.apache.camel.model.ToDefinition;
import org.apache.camel.model.TransactedDefinition;
import org.apache.camel.model.config.PropertiesDefinition;
import org.apache.camel.model.dataformat.DataFormatsDefinition;
@@ -299,8 +300,10 @@
initOnCompletions(route);
// then polices
initPolicies(route);
- // and last on exception
+ // then on exception
initOnExceptions(route);
+ // and then for toAsync
+ initToAsync(route);
}
if (dataFormats != null) {
@@ -317,6 +320,33 @@
installRoutes();
}
+ private void initToAsync(RouteDefinition route) {
+ List<ProcessorDefinition<?>> outputs = new
ArrayList<ProcessorDefinition<?>>();
+ ToDefinition toAsync = null;
+
+ for (ProcessorDefinition output : route.getOutputs()) {
+ if (toAsync != null) {
+ // add this output on toAsync
+ toAsync.getOutputs().add(output);
+ } else {
+ // regular outputs
+ outputs.add(output);
+ }
+
+ if (output instanceof ToDefinition) {
+ ToDefinition to = (ToDefinition) output;
+ if (to.isAsync() != null && to.isAsync()) {
+ // new current to async
+ toAsync = to;
+ }
+ }
+ }
+
+ // rebuild outputs
+ route.clearOutput();
+ route.getOutputs().addAll(outputs);
+ }
+
private void initOnExceptions(RouteDefinition route) {
List<ProcessorDefinition<?>> outputs = new
ArrayList<ProcessorDefinition<?>>();
List<ProcessorDefinition<?>> exceptionHandlers = new
ArrayList<ProcessorDefinition<?>>();
Added:
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringToAsyncTest.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringToAsyncTest.java?rev=834396&view=auto
==============================================================================
---
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringToAsyncTest.java
(added)
+++
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringToAsyncTest.java
Tue Nov 10 09:28:05 2009
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spring.processor;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.processor.async.ToAsyncTest;
+
+import static
org.apache.camel.spring.processor.SpringTestHelper.createSpringCamelContext;
+
+/**
+ * @version $Revision$
+ */
+public class SpringToAsyncTest extends ToAsyncTest {
+
+ @Override
+ public boolean isUseRouteBuilder() {
+ return false;
+ }
+
+ protected CamelContext createCamelContext() throws Exception {
+ return createSpringCamelContext(this,
"org/apache/camel/spring/processor/SpringToAsyncTest.xml");
+ }
+
+}
Propchange:
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringToAsyncTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringToAsyncTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Copied:
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringToAsyncTest.xml
(from r834163,
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapTest.xml)
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringToAsyncTest.xml?p2=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringToAsyncTest.xml&p1=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapTest.xml&r1=834163&r2=834396&rev=834396&view=diff
==============================================================================
---
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapTest.xml
(original)
+++
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringToAsyncTest.xml
Tue Nov 10 09:28:05 2009
@@ -22,23 +22,18 @@
http://camel.apache.org/schema/spring
http://camel.apache.org/schema/spring/camel-spring.xsd
">
-
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
- <!-- START SNIPPET: e1 -->
<route>
<from uri="direct:start"/>
- <to uri="log:foo"/>
- <wireTap uri="direct:tap"/>
+ <to uri="mock:a"/>
+ <to uri="direct:bar" async="true" poolSize="5"/>
<to uri="mock:result"/>
</route>
- <!-- END SNIPPET: e1 -->
<route>
- <from uri="direct:tap"/>
- <delay><constant>1000</constant></delay>
- <setBody><constant>Tapped</constant></setBody>
- <to uri="mock:result"/>
- <to uri="mock:tap"/>
+ <from uri="direct:bar"/>
+ <to uri="mock:b"/>
+ <transform><constant>Bye World</constant></transform>
</route>
</camelContext>