Author: davsclaus Date: Mon Nov 9 18:19:39 2009 New Revision: 834165 URL: http://svn.apache.org/viewvc?rev=834165&view=rev Log: CAMEL-2151: Introduced AsyncProcessor for async non blocking request reply processing. Work in progress.
Added: camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncCallback.java (with props) camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncProcessor.java (with props) camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java (with props) camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/ToAsyncTest.java (with props) Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/DefaultTypeConverter.java camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlingInflightRoutePolicyTest.java camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyContentExchange.java camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpEndpoint.java camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpProducer.java camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerAsynchronousTest.java camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerGoogleAsynchronousTest.java camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerGoogleTest.java Added: camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncCallback.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncCallback.java?rev=834165&view=auto ============================================================================== --- 
camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncCallback.java (added) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncCallback.java Mon Nov 9 18:19:39 2009 @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel; + +/** + * Callback when processing an {@link Exchange} using {@link org.apache.camel.AsyncProcessor} + * and the {@link Exchange} have received the data and is ready to be routed. + * + * @version $Revision$ + */ +public interface AsyncCallback { + + /** + * Callback when the {@link Exchange} is ready to be routed as data has been received. 
+ * + * @param exchange the exchange + */ + void onDataReceived(Exchange exchange); +} Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncCallback.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncCallback.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Added: camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncProcessor.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncProcessor.java?rev=834165&view=auto ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncProcessor.java (added) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncProcessor.java Mon Nov 9 18:19:39 2009 @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel; + +/** + * An <b>asynchronous</b> processor which can process an {@link Exchange} in an asynchronous fashion + * and signal completion by invoking the {@link AsyncCallback}. 
+ * <p/> + * For example {@link Producer} can implement this interface to support real asynchronous non blocking + * when using the {@link org.apache.camel.processor.SendAsyncProcessor}. + * + * @version $Revision$ + */ +public interface AsyncProcessor extends Processor { + + /** + * Processes the message exchange + * + * @param exchange the message exchange + * @param callback the callback to invoke when data has been received and the {@link Exchange} + * is ready to be continued routed. + * @throws Exception if an internal processing error has occurred. + */ + void process(Exchange exchange, AsyncCallback callback) throws Exception; +} Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncProcessor.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncProcessor.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java?rev=834165&r1=834164&r2=834165&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java Mon Nov 9 18:19:39 2009 @@ -24,7 +24,9 @@ import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.Processor; +import org.apache.camel.impl.LoggingExceptionHandler; import org.apache.camel.impl.ServiceSupport; +import org.apache.camel.spi.ExceptionHandler; import org.apache.camel.util.concurrent.ExecutorServiceHelper; import org.apache.commons.logging.Log; import 
org.apache.commons.logging.LogFactory; @@ -40,6 +42,7 @@ private SedaEndpoint endpoint; private Processor processor; private ExecutorService executor; + private ExceptionHandler exceptionHandler; public SedaConsumer(SedaEndpoint endpoint, Processor processor) { this.endpoint = endpoint; @@ -55,6 +58,17 @@ return endpoint; } + public ExceptionHandler getExceptionHandler() { + if (exceptionHandler == null) { + exceptionHandler = new LoggingExceptionHandler(getClass()); + } + return exceptionHandler; + } + + public void setExceptionHandler(ExceptionHandler exceptionHandler) { + this.exceptionHandler = exceptionHandler; + } + public void run() { BlockingQueue<Exchange> queue = endpoint.getQueue(); while (queue != null && isRunAllowed()) { @@ -70,7 +84,7 @@ try { processor.process(exchange); } catch (Exception e) { - LOG.error("Seda queue caught: " + e, e); + getExceptionHandler().handleException(e); } } else { LOG.warn("This consumer is stopped during polling an exchange, so putting it back on the seda queue: " + exchange); @@ -78,6 +92,7 @@ queue.put(exchange); } catch (InterruptedException e) { LOG.debug("Sleep interrupted, are we stopping? 
" + (isStopping() || isStopped())); + continue; } } } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/DefaultTypeConverter.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/DefaultTypeConverter.java?rev=834165&r1=834164&r2=834165&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/DefaultTypeConverter.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/DefaultTypeConverter.java Mon Nov 9 18:19:39 2009 @@ -87,8 +87,8 @@ answer = doConvertTo(type, exchange, value); } catch (Exception e) { // if its a ExecutionException then we have rethrow it as its not due to failed conversion - boolean execution = ObjectHelper.getException(ExecutionException.class, e) != null || - ObjectHelper.getException(CamelExecutionException.class, e) != null; + boolean execution = ObjectHelper.getException(ExecutionException.class, e) != null + || ObjectHelper.getException(CamelExecutionException.class, e) != null; if (execution) { throw ObjectHelper.wrapCamelExecutionException(exchange, e); } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java?rev=834165&r1=834164&r2=834165&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java Mon Nov 9 18:19:39 2009 @@ -320,6 +320,23 @@ /** * Sends the exchange to the given endpoint * + * @param uri the endpoint to send to + * @return the builder + */ + @SuppressWarnings("unchecked") + public Type toAsync(String uri) { + ToDefinition answer = new 
ToDefinition(uri); + answer.setAsync(true); + addOutput(answer); + // must push a block so we have a child route for the async reply + // routing which is separated from the caller route + pushBlock(answer); + return (Type) this; + } + + /** + * Sends the exchange to the given endpoint + * * @param uri the String formatted endpoint uri to send to * @param args arguments for the string formatting of the uri * @return the builder Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java?rev=834165&r1=834164&r2=834165&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java Mon Nov 9 18:19:39 2009 @@ -16,13 +16,21 @@ */ package org.apache.camel.model; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; import javax.xml.bind.annotation.XmlAccessType; import javax.xml.bind.annotation.XmlAccessorType; import javax.xml.bind.annotation.XmlAttribute; import javax.xml.bind.annotation.XmlRootElement; +import javax.xml.bind.annotation.XmlTransient; import org.apache.camel.Endpoint; import org.apache.camel.ExchangePattern; +import org.apache.camel.Processor; +import org.apache.camel.processor.SendAsyncProcessor; +import org.apache.camel.spi.RouteContext; +import org.apache.camel.util.concurrent.ExecutorServiceHelper; /** * Represents an XML <to/> element @@ -32,8 +40,18 @@ @XmlRootElement(name = "to") @XmlAccessorType(XmlAccessType.FIELD) public class ToDefinition extends SendDefinition<ToDefinition> { + @XmlTransient + private final List<ProcessorDefinition> outputs = new ArrayList<ProcessorDefinition>(); @XmlAttribute(required = false) private ExchangePattern pattern; + 
@XmlAttribute(required = false) + private Boolean async; + @XmlTransient + private ExecutorService executorService; + @XmlAttribute(required = false) + private String executorServiceRef; + @XmlAttribute(required = false) + private Integer poolSize; public ToDefinition() { } @@ -57,8 +75,48 @@ } @Override + public List<ProcessorDefinition> getOutputs() { + return outputs; + } + + @Override + public Processor createProcessor(RouteContext routeContext) throws Exception { + if (async == null || !async) { + // when sync then let super create the processor + return super.createProcessor(routeContext); + } + + if (executorServiceRef != null) { + executorService = routeContext.lookup(executorServiceRef, ExecutorService.class); + } + if (executorService == null && poolSize != null) { + executorService = ExecutorServiceHelper.newScheduledThreadPool(poolSize, "ToAsync", true); + } + + // create the child processor which is the async route + Processor childProcessor = routeContext.createProcessor(this); + + // create async processor + Endpoint endpoint = resolveEndpoint(routeContext); + + SendAsyncProcessor async = new SendAsyncProcessor(endpoint, getPattern(), childProcessor); + if (executorService != null) { + async.setExecutorService(executorService); + } + if (poolSize != null) { + async.setPoolSize(poolSize); + } + + return async; + } + + @Override public String toString() { - return "To[" + getLabel() + "]"; + if (async != null && async) { + return "ToAsync[" + getLabel() + "]"; + } else { + return "To[" + getLabel() + "]"; + } } @Override @@ -71,6 +129,14 @@ return pattern; } + public Boolean isAsync() { + return async; + } + + public void setAsync(Boolean async) { + this.async = async; + } + /** * Sets the optional {...@link ExchangePattern} used to invoke this endpoint */ Added: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java?rev=834165&view=auto ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java (added) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java Mon Nov 9 18:19:39 2009 @@ -0,0 +1,191 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.processor; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.AsyncCallback; +import org.apache.camel.AsyncProcessor; +import org.apache.camel.Endpoint; +import org.apache.camel.Exchange; +import org.apache.camel.ExchangePattern; +import org.apache.camel.Processor; +import org.apache.camel.Producer; +import org.apache.camel.ProducerCallback; +import org.apache.camel.impl.LoggingExceptionHandler; +import org.apache.camel.spi.ExceptionHandler; +import org.apache.camel.util.ExchangeHelper; +import org.apache.camel.util.concurrent.ExecutorServiceHelper; + +/** + * @version $Revision$ + */ +public class SendAsyncProcessor extends SendProcessor implements Runnable { + + private static final int DEFAULT_THREADPOOL_SIZE = 10; + protected final Processor target; + protected final BlockingQueue<Exchange> completedTasks = new LinkedBlockingQueue<Exchange>(); + protected ExecutorService executorService; + protected int poolSize = DEFAULT_THREADPOOL_SIZE; + protected ExceptionHandler exceptionHandler; + + public SendAsyncProcessor(Endpoint destination, Processor target) { + super(destination); + this.target = target; + } + + public SendAsyncProcessor(Endpoint destination, ExchangePattern pattern, Processor target) { + super(destination, pattern); + this.target = target; + } + + @Override + protected Exchange configureExchange(Exchange exchange, ExchangePattern pattern) { + // use a new copy of the exchange to route async and handover the on completion to the new copy + // so its the new copy that performs the on completion callback when its done + final Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, true); + if (pattern != null) { + copy.setPattern(pattern); + } else { + // default to use in out as we do request reply over async + copy.setPattern(ExchangePattern.InOut); + } + // 
configure the endpoint we are sending to + copy.setProperty(Exchange.TO_ENDPOINT, destination.getEndpointUri()); + // send the copy + return copy; + } + + @Override + public Exchange doProcess(Exchange exchange) throws Exception { + // now we are done, we should have a API callback for this + // send the exchange to the destination using a producer + Exchange answer = getProducerCache(exchange).doInProducer(destination, exchange, pattern, new ProducerCallback<Exchange>() { + public Exchange doInProducer(Producer producer, Exchange exchange, ExchangePattern pattern) throws Exception { + exchange = configureExchange(exchange, pattern); + + if (producer instanceof AsyncProcessor) { + // let the producer use this callback to signal completion + AsyncProcessor asyncProcessor = (AsyncProcessor) producer; + + // pass in the callback that adds the exchange to the completed list of tasks + final AsyncCallback callback = new AsyncCallback() { + public void onDataReceived(Exchange exchange) { + completedTasks.add(exchange); + } + }; + + asyncProcessor.process(exchange, callback); + } else { + // its not a real AsyncProcessor so simulate async processing + producer.process(exchange); + completedTasks.add(exchange); + } + + return exchange; + } + }); + + return answer; + } + + @Override + public String toString() { + return "sendAsyncTo(" + destination + (pattern != null ? 
" " + pattern : "") + " -> " + target + ")"; + } + + public ExecutorService getExecutorService() { + if (executorService == null) { + executorService = createExecutorService(); + } + return executorService; + } + + public void setExecutorService(ExecutorService executorService) { + this.executorService = executorService; + } + + public int getPoolSize() { + return poolSize; + } + + public void setPoolSize(int poolSize) { + this.poolSize = poolSize; + } + + public ExceptionHandler getExceptionHandler() { + if (exceptionHandler == null) { + exceptionHandler = new LoggingExceptionHandler(getClass()); + } + return exceptionHandler; + } + + public void setExceptionHandler(ExceptionHandler exceptionHandler) { + this.exceptionHandler = exceptionHandler; + } + + public void run() { + while (isRunAllowed()) { + Exchange exchange; + try { + // TODO: Wonder if we can use take instead of poll with timeout? + exchange = completedTasks.poll(1000, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + LOG.debug("Sleep interrupted, are we stopping? 
" + (isStopping() || isStopped())); + continue; + } + + if (exchange != null) { + try { + if (LOG.isDebugEnabled()) { + LOG.debug("Async reply received now routing the Exchange: " + exchange); + } + target.process(exchange); + } catch (Exception e) { + getExceptionHandler().handleException(e); + } + } + } + } + + protected ExecutorService createExecutorService() { + return ExecutorServiceHelper.newScheduledThreadPool(DEFAULT_THREADPOOL_SIZE, "SendAsyncProcessor", true); + } + + protected void doStart() throws Exception { + super.doStart(); + + for (int i = 0; i < poolSize; i++) { + getExecutorService().execute(this); + } + } + + protected void doStop() throws Exception { + super.doStop(); + + if (executorService != null) { + executorService.shutdownNow(); + executorService = null; + } + completedTasks.clear(); + + } + +} Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java?rev=834165&r1=834164&r2=834165&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java Mon Nov 9 18:19:39 2009 @@ -79,8 +79,19 @@ } } + doProcess(exchange); + } + + /** + * Strategy to process the exchange + * + * @param exchange the exchange + * @throws Exception can be thrown if error processing exchange + * @return the exchange 
that was processed + */ + public Exchange doProcess(final Exchange exchange) throws Exception { // send the exchange to the destination using a producer - getProducerCache(exchange).doInProducer(destination, exchange, pattern, new ProducerCallback<Exchange>() { + return getProducerCache(exchange).doInProducer(destination, exchange, pattern, new ProducerCallback<Exchange>() { public Exchange doInProducer(Producer producer, Exchange exchange, ExchangePattern pattern) throws Exception { exchange = configureExchange(exchange, pattern); producer.process(exchange); Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlingInflightRoutePolicyTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlingInflightRoutePolicyTest.java?rev=834165&r1=834164&r2=834165&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlingInflightRoutePolicyTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlingInflightRoutePolicyTest.java Mon Nov 9 18:19:39 2009 @@ -33,6 +33,7 @@ for (int i = 0; i < size; i++) { template.sendBody(url, "Message " + i); + Thread.sleep(3); } assertMockEndpointsSatisfied(); Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/ToAsyncTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/ToAsyncTest.java?rev=834165&view=auto ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/ToAsyncTest.java (added) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/ToAsyncTest.java Mon Nov 9 18:19:39 2009 @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.async; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; + +/** + * @version $Revision$ + */ +public class ToAsyncTest extends ContextTestSupport { + + public void testToAsync() throws Exception { + getMockEndpoint("mock:a").expectedBodiesReceived("Hello World"); + getMockEndpoint("mock:b").expectedBodiesReceived("Hello World"); + getMockEndpoint("mock:result").expectedMessageCount(1); + getMockEndpoint("mock:result").message(0).outBody(String.class).isEqualTo("Bye World"); + + template.sendBody("direct:start", "Hello World"); + + assertMockEndpointsSatisfied(); + + // and it should be different exchange ids + + String ida = getMockEndpoint("mock:a").getReceivedExchanges().get(0).getExchangeId(); + String idb = getMockEndpoint("mock:b").getReceivedExchanges().get(0).getExchangeId(); + String idresult = getMockEndpoint("mock:result").getReceivedExchanges().get(0).getExchangeId(); + + // id a should be different and id b and id result the same + assertNotSame(ida, idb); + assertNotSame(ida, idresult); + assertSame(idb, idresult); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + 
from("direct:start").to("mock:a").toAsync("direct:bar").to("mock:result"); + + from("direct:bar").to("mock:b").transform(constant("Bye World")); + } + }; + } +} Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/ToAsyncTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/ToAsyncTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Modified: camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyContentExchange.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyContentExchange.java?rev=834165&r1=834164&r2=834165&view=diff ============================================================================== --- camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyContentExchange.java (original) +++ camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyContentExchange.java Mon Nov 9 18:19:39 2009 @@ -18,12 +18,12 @@ import java.io.IOException; import java.io.UnsupportedEncodingException; -import java.util.Collection; import java.util.LinkedHashMap; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -45,7 +45,7 @@ private CountDownLatch bodyComplete = new CountDownLatch(1); private volatile boolean failed; private volatile Exchange exchange; - private volatile Collection<Exchange> completeTasks; + private volatile AsyncCallback callback; public JettyContentExchange() { // keep headers by default @@ -56,8 +56,8 @@ this.exchange = exchange; } - public void 
setCompleteTasks(Collection<Exchange> completeTasks) { - this.completeTasks = completeTasks; + public void setCallback(AsyncCallback callback) { + this.callback = callback; } @Override @@ -82,12 +82,13 @@ LOG.debug("onResponseComplete for " + getUrl()); } - if (completeTasks != null && exchange != null) { + if (callback != null && exchange != null) { if (LOG.isTraceEnabled()) { LOG.trace("Adding Exchange to completed task: " + exchange); } - // we are complete so add the exchange to completed tasks - completeTasks.add(exchange); + + // signal we are complete + callback.onDataReceived(exchange); } } Modified: camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpEndpoint.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpEndpoint.java?rev=834165&r1=834164&r2=834165&view=diff ============================================================================== --- camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpEndpoint.java (original) +++ camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpEndpoint.java Mon Nov 9 18:19:39 2009 @@ -36,8 +36,6 @@ private boolean sessionSupport; private List<Handler> handlers; private HttpClient client; - private boolean synchronous = true; - private int concurrentConsumers = 1; public JettyHttpEndpoint(JettyHttpComponent component, String uri, URI httpURL) throws URISyntaxException { super(uri, component, httpURL); @@ -85,20 +83,4 @@ this.client = client; } - public boolean isSynchronous() { - return synchronous; - } - - public void setSynchronous(boolean synchronous) { - this.synchronous = synchronous; - } - - public int getConcurrentConsumers() { - return concurrentConsumers; - } - - public void setConcurrentConsumers(int concurrentConsumers) { - this.concurrentConsumers = concurrentConsumers; - } - } Modified: 
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpProducer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpProducer.java?rev=834165&r1=834164&r2=834165&view=diff ============================================================================== --- camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpProducer.java (original) +++ camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpProducer.java Mon Nov 9 18:19:39 2009 @@ -19,21 +19,16 @@ import java.io.IOException; import java.net.URISyntaxException; import java.util.Map; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; +import org.apache.camel.AsyncCallback; +import org.apache.camel.AsyncProcessor; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; -import org.apache.camel.ExchangePattern; import org.apache.camel.component.http.HttpMethods; import org.apache.camel.component.http.helper.HttpProducerHelper; import org.apache.camel.impl.DefaultProducer; -import org.apache.camel.util.ExchangeHelper; import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.URISupport; -import org.apache.camel.util.concurrent.ExecutorServiceHelper; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.mortbay.jetty.client.HttpClient; @@ -41,10 +36,8 @@ /** * @version $Revision$ */ -public class JettyHttpProducer extends DefaultProducer implements Runnable { +public class JettyHttpProducer extends DefaultProducer implements AsyncProcessor { private static final transient Log LOG = LogFactory.getLog(JettyHttpProducer.class); - private final BlockingQueue<Exchange> completeTasks = new LinkedBlockingQueue<Exchange>(); - private ExecutorService 
executor; private final HttpClient client; // TODO: support that bridge option @@ -64,37 +57,30 @@ HttpClient client = getEndpoint().getClient(); JettyContentExchange httpExchange = createHttpExchange(exchange); - - if (getEndpoint().isSynchronous()) { - sendSynchronous(exchange, client, httpExchange); - } else { - sendAsynchronous(exchange, client, httpExchange); - } + sendSynchronous(exchange, client, httpExchange); } + public void process(Exchange exchange, AsyncCallback callback) throws Exception { + HttpClient client = getEndpoint().getClient(); - protected void sendAsynchronous(final Exchange exchange, final HttpClient client, final JettyContentExchange httpExchange) throws IOException { - // use a new copy of the exchange to route async and handover the on completion to the new copy - // so its the new copy that performs the on completion callback when its done - final Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, true); - // the copy must be an in ouy MEP - copy.setPattern(ExchangePattern.InOut); - - // configure http exchange to signal when its complete - httpExchange.setCompleteTasks(completeTasks); - httpExchange.setExchange(copy); + JettyContentExchange httpExchange = createHttpExchange(exchange); + sendAsynchronous(exchange, client, httpExchange, callback); + } + protected void sendSynchronous(Exchange exchange, HttpClient client, JettyContentExchange httpExchange) throws IOException { // set the body with the message holder - copy.setOut(new JettyHttpMessage(exchange, httpExchange, getEndpoint().isThrowExceptionOnFailure())); + exchange.setOut(new JettyHttpMessage(exchange, httpExchange, getEndpoint().isThrowExceptionOnFailure())); doSendExchange(client, httpExchange); + } + + protected void sendAsynchronous(final Exchange exchange, final HttpClient client, final JettyContentExchange httpExchange, + final AsyncCallback callback) throws IOException { - // now we need to let the original exchange to stop - // and let that copy exchange 
continue // TODO: Use something that marks it as async routed - exchange.setProperty(Exchange.ROUTE_STOP, Boolean.TRUE); - } + exchange.setProperty("CamelSendAsync", Boolean.TRUE); + httpExchange.setCallback(callback); + httpExchange.setExchange(exchange); - protected void sendSynchronous(Exchange exchange, HttpClient client, JettyContentExchange httpExchange) throws IOException { // set the body with the message holder exchange.setOut(new JettyHttpMessage(exchange, httpExchange, getEndpoint().isThrowExceptionOnFailure())); @@ -141,56 +127,16 @@ } } - public void run() { - while (isRunAllowed()) { - Exchange exchange; - try { - // TODO: Wonder if we can use take instead of poll with timeout? - exchange = completeTasks.poll(1000, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - LOG.debug("Sleep interrupted, are we stopping? " + (isStopping() || isStopped())); - continue; - } - - if (exchange != null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Continue to route Exchange: " + exchange); - } - - // TODO: hook into exiting route path - exchange.getContext().createProducerTemplate().send("mock:result", exchange); - } - } - } - @Override protected void doStart() throws Exception { super.doStart(); - client.start(); - - // this is only needed if we are asynchronous where we need to have a thread pool of listeners - // that will process the completed tasks - if (!getEndpoint().isSynchronous()) { - int poolSize = getEndpoint().getConcurrentConsumers(); - executor = ExecutorServiceHelper.newFixedThreadPool(poolSize, getEndpoint().getEndpointUri(), true); - for (int i = 0; i < poolSize; i++) { - executor.execute(this); - } - } } @Override protected void doStop() throws Exception { super.doStop(); - client.stop(); - - if (executor != null) { - executor.shutdownNow(); - executor = null; - } - completeTasks.clear(); } } Modified: camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerAsynchronousTest.java 
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerAsynchronousTest.java?rev=834165&r1=834164&r2=834165&view=diff ============================================================================== --- camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerAsynchronousTest.java (original) +++ camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerAsynchronousTest.java Mon Nov 9 18:19:39 2009 @@ -33,7 +33,7 @@ private static String thread1; private static String thread2; - private String url = "jetty://http://0.0.0.0:9123/foo?synchronous=false&concurrentConsumers=5"; + private String url = "jetty://http://0.0.0.0:9123/foo"; @Test public void testAsynchronous() throws Exception { @@ -42,7 +42,7 @@ MockEndpoint mock = getMockEndpoint("mock:result"); mock.expectedMessageCount(1); - mock.message(0).outBody().isEqualTo("Bye World"); + mock.message(0).body().isEqualTo("Bye World"); Object body = null; template.sendBody("direct:start", body); @@ -61,7 +61,7 @@ public void process(Exchange exchange) throws Exception { thread1 = Thread.currentThread().getName(); } - }).to(url).process(new Processor() { + }).toAsync(url).process(new Processor() { public void process(Exchange exchange) throws Exception { thread2 = Thread.currentThread().getName(); } Modified: camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerGoogleAsynchronousTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerGoogleAsynchronousTest.java?rev=834165&r1=834164&r2=834165&view=diff ============================================================================== --- 
camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerGoogleAsynchronousTest.java (original) +++ camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerGoogleAsynchronousTest.java Mon Nov 9 18:19:39 2009 @@ -33,7 +33,6 @@ mock.message(0).outBody(String.class).contains("google"); template.sendBody("direct:start", null); - System.out.println("I am not blocked"); assertMockEndpointsSatisfied(); } @@ -45,7 +44,7 @@ public void configure() throws Exception { from("direct:start") // to prevent redirect being thrown as an exception - .to("jetty://http://www.google.com?throwExceptionOnFailure=false&synchronous=false&concurrentConsumers=5") + .toAsync("jetty://http://www.google.com?throwExceptionOnFailure=false") .to("mock:result"); } }; Modified: camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerGoogleTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerGoogleTest.java?rev=834165&r1=834164&r2=834165&view=diff ============================================================================== --- camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerGoogleTest.java (original) +++ camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/jettyproducer/JettyHttpProducerGoogleTest.java Mon Nov 9 18:19:39 2009 @@ -47,7 +47,7 @@ @Override public void configure() throws Exception { // to prevent redirect being thrown as an exception - from("direct:start").to("jetty://http://www.google.com?throwExceptionOnFailure=false&synchronous=true"); + from("direct:start").to("jetty://http://www.google.com?throwExceptionOnFailure=false"); } }; }