Author: davsclaus Date: Tue Nov 10 09:28:05 2009 New Revision: 834396 URL: http://svn.apache.org/viewvc?rev=834396&view=rev Log: CAMEL-2151: Added spring DSL for toAsync.
Added: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringToAsyncTest.java (with props) camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringToAsyncTest.xml - copied, changed from r834163, camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapTest.xml Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/ToAsyncTest.java camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/CamelContextFactoryBean.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java?rev=834396&r1=834395&r2=834396&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java Tue Nov 10 09:28:05 2009 @@ -320,23 +320,6 @@ /** * Sends the exchange to the given endpoint * - * @param uri the endpoint to send to - * @return the builder - */ - @SuppressWarnings("unchecked") - public Type toAsync(String uri) { - ToDefinition answer = new ToDefinition(uri); - answer.setAsync(true); - addOutput(answer); - // must push a block so we have a child route for the async reply - // routing which is separated from the caller route - pushBlock(answer); - return (Type) this; - } - - /** - * Sends the exchange to the given endpoint - * * @param uri the String formatted endpoint uri to send to * @param args arguments for the string 
formatting of the uri * @return the builder @@ -476,6 +459,77 @@ return (Type) this; } + /** + * Sends the exchange to the given endpoint using asynchronous mode. + * + * @param uri the endpoint to send to + * @return the builder + * @see org.apache.camel.AsyncProcessor + */ + public ToDefinition toAsync(String uri) { + ToDefinition answer = new ToDefinition(uri); + answer.setAsync(true); + addOutput(answer); + // must push a block so we have a child route for the async reply + // routing which is separated from the caller route + pushBlock(answer); + return answer; + } + + /** + * Sends the exchange to the given endpoint using asynchronous mode. + * + * @param uri the endpoint to send to + * @param poolSize the core pool size + * @return the builder + * @see org.apache.camel.AsyncProcessor + */ + public ToDefinition toAsync(String uri, int poolSize) { + ToDefinition answer = new ToDefinition(uri); + answer.setAsync(true); + answer.setPoolSize(poolSize); + addOutput(answer); + // must push a block so we have a child route for the async reply + // routing which is separated from the caller route + pushBlock(answer); + return answer; + } + + /** + * Sends the exchange to the given endpoint using asynchronous mode. + * + * @param endpoint the endpoint to send to + * @return the builder + * @see org.apache.camel.AsyncProcessor + */ + public ToDefinition toAsync(Endpoint endpoint) { + ToDefinition answer = new ToDefinition(endpoint); + answer.setAsync(true); + addOutput(answer); + // must push a block so we have a child route for the async reply + // routing which is separated from the caller route + pushBlock(answer); + return answer; + } + + /** + * Sends the exchange to the given endpoint using asynchronous mode. 
+ * + * @param endpoint the endpoint to send to + * @param poolSize the core pool size + * @return the builder + * @see org.apache.camel.AsyncProcessor + */ + public ToDefinition toAsync(Endpoint endpoint, int poolSize) { + ToDefinition answer = new ToDefinition(endpoint); + answer.setAsync(true); + answer.setPoolSize(poolSize); + addOutput(answer); + // must push a block so we have a child route for the async reply + // routing which is separated from the caller route + pushBlock(answer); + return answer; + } /** * <a href="http://camel.apache.org/exchange-pattern.html">ExchangePattern:</a> Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java?rev=834396&r1=834395&r2=834396&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java Tue Nov 10 09:28:05 2009 @@ -80,7 +80,7 @@ } /** - * Setting the executor service for executing the multicasting action. + * Setting the executor service for the thread pool * * @return the builder */ @@ -90,6 +90,16 @@ } /** + * Setting the executor service for the thread pool + * + * @return the builder + */ + public ThreadsDefinition executorServiceRef(String executorServiceRef) { + setExecutorServiceRef(executorServiceRef); + return this; + } + + /** * Setting the core pool size for the underlying {@link java.util.concurrent.ExecutorService}. 
* * @return the builder @@ -120,6 +130,14 @@ this.executorService = executorService; } + public String getExecutorServiceRef() { + return executorServiceRef; + } + + public void setExecutorServiceRef(String executorServiceRef) { + this.executorServiceRef = executorServiceRef; + } + public Integer getPoolSize() { return poolSize; } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java?rev=834396&r1=834395&r2=834396&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java Tue Nov 10 09:28:05 2009 @@ -45,7 +45,7 @@ @XmlAttribute(required = false) private ExchangePattern pattern; @XmlAttribute(required = false) - private Boolean async; + private Boolean async = Boolean.FALSE; @XmlTransient private ExecutorService executorService; @XmlAttribute(required = false) @@ -113,7 +113,7 @@ @Override public String toString() { if (async != null && async) { - return "ToAsync[" + getLabel() + "]"; + return "ToAsync[" + getLabel() + "] -> " + getOutputs(); } else { return "To[" + getLabel() + "]"; } @@ -137,6 +137,30 @@ this.async = async; } + public Integer getPoolSize() { + return poolSize; + } + + public void setPoolSize(Integer poolSize) { + this.poolSize = poolSize; + } + + public ExecutorService getExecutorService() { + return executorService; + } + + public void setExecutorService(ExecutorService executorService) { + this.executorService = executorService; + } + + public String getExecutorServiceRef() { + return executorServiceRef; + } + + public void setExecutorServiceRef(String executorServiceRef) { + this.executorServiceRef = executorServiceRef; + } + /** * Sets the optional {@link ExchangePattern} used to invoke this 
endpoint */ @@ -144,4 +168,35 @@ this.pattern = pattern; } + /** + * Setting the executor service for executing the async routing. + * + * @return the builder + */ + public ToDefinition executorService(ExecutorService executorService) { + setExecutorService(executorService); + return this; + } + + /** + * Setting the executor service for executing the async routing. + * + * @return the builder + */ + public ToDefinition executorServiceRef(String executorServiceRef) { + setExecutorServiceRef(executorServiceRef); + return this; + } + + /** + * Setting the core pool size for the underlying {@link java.util.concurrent.ExecutorService}. + * + * @return the builder + */ + public ToDefinition poolSize(int poolSize) { + setPoolSize(poolSize); + return this; + } + + } Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/ToAsyncTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/ToAsyncTest.java?rev=834396&r1=834395&r2=834396&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/ToAsyncTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/ToAsyncTest.java Tue Nov 10 09:28:05 2009 @@ -51,7 +51,7 @@ return new RouteBuilder() { @Override public void configure() throws Exception { - from("direct:start").to("mock:a").toAsync("direct:bar").to("mock:result"); + from("direct:start").to("mock:a").toAsync("direct:bar", 5).to("mock:result"); from("direct:bar").to("mock:b").transform(constant("Bye World")); } Modified: camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/CamelContextFactoryBean.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/CamelContextFactoryBean.java?rev=834396&r1=834395&r2=834396&view=diff 
============================================================================== --- camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/CamelContextFactoryBean.java (original) +++ camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/CamelContextFactoryBean.java Tue Nov 10 09:28:05 2009 @@ -51,6 +51,7 @@ import org.apache.camel.model.RouteBuilderDefinition; import org.apache.camel.model.RouteContainer; import org.apache.camel.model.RouteDefinition; +import org.apache.camel.model.ToDefinition; import org.apache.camel.model.TransactedDefinition; import org.apache.camel.model.config.PropertiesDefinition; import org.apache.camel.model.dataformat.DataFormatsDefinition; @@ -299,8 +300,10 @@ initOnCompletions(route); // then polices initPolicies(route); - // and last on exception + // then on exception initOnExceptions(route); + // and then for toAsync + initToAsync(route); } if (dataFormats != null) { @@ -317,6 +320,33 @@ installRoutes(); } + private void initToAsync(RouteDefinition route) { + List<ProcessorDefinition<?>> outputs = new ArrayList<ProcessorDefinition<?>>(); + ToDefinition toAsync = null; + + for (ProcessorDefinition output : route.getOutputs()) { + if (toAsync != null) { + // add this output on toAsync + toAsync.getOutputs().add(output); + } else { + // regular outputs + outputs.add(output); + } + + if (output instanceof ToDefinition) { + ToDefinition to = (ToDefinition) output; + if (to.isAsync() != null && to.isAsync()) { + // new current to async + toAsync = to; + } + } + } + + // rebuild outputs + route.clearOutput(); + route.getOutputs().addAll(outputs); + } + private void initOnExceptions(RouteDefinition route) { List<ProcessorDefinition<?>> outputs = new ArrayList<ProcessorDefinition<?>>(); List<ProcessorDefinition<?>> exceptionHandlers = new ArrayList<ProcessorDefinition<?>>(); Added: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringToAsyncTest.java URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringToAsyncTest.java?rev=834396&view=auto ============================================================================== --- camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringToAsyncTest.java (added) +++ camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringToAsyncTest.java Tue Nov 10 09:28:05 2009 @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.spring.processor; + +import org.apache.camel.CamelContext; +import org.apache.camel.processor.async.ToAsyncTest; + +import static org.apache.camel.spring.processor.SpringTestHelper.createSpringCamelContext; + +/** + * @version $Revision$ + */ +public class SpringToAsyncTest extends ToAsyncTest { + + @Override + public boolean isUseRouteBuilder() { + return false; + } + + protected CamelContext createCamelContext() throws Exception { + return createSpringCamelContext(this, "org/apache/camel/spring/processor/SpringToAsyncTest.xml"); + } + +} Propchange: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringToAsyncTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringToAsyncTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Copied: camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringToAsyncTest.xml (from r834163, camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapTest.xml) URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringToAsyncTest.xml?p2=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringToAsyncTest.xml&p1=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapTest.xml&r1=834163&r2=834396&rev=834396&view=diff ============================================================================== --- camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapTest.xml (original) +++ camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringToAsyncTest.xml 
Tue Nov 10 09:28:05 2009 @@ -22,23 +22,18 @@ http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd "> - <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring"> - <!-- START SNIPPET: e1 --> <route> <from uri="direct:start"/> - <to uri="log:foo"/> - <wireTap uri="direct:tap"/> + <to uri="mock:a"/> + <to uri="direct:bar" async="true" poolSize="5"/> <to uri="mock:result"/> </route> - <!-- END SNIPPET: e1 --> <route> - <from uri="direct:tap"/> - <delay><constant>1000</constant></delay> - <setBody><constant>Tapped</constant></setBody> - <to uri="mock:result"/> - <to uri="mock:tap"/> + <from uri="direct:bar"/> + <to uri="mock:b"/> + <transform><constant>Bye World</constant></transform> </route> </camelContext>