Author: ningjiang Date: Mon Jul 27 07:06:19 2009 New Revision: 798045 URL: http://svn.apache.org/viewvc?rev=798045&view=rev Log: Added the async methods which use Endpoint as parameter
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncRouteTest.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java?rev=798045&r1=798044&r2=798045&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java Mon Jul 27 07:06:19 2009 @@ -780,6 +780,103 @@ <T> Future<T> asyncRequestBodyAndHeaders(String endpointUri, Object body, Map<String, Object> headers, Class<T> type); /** + * Sends an asynchronous exchange to the given endpoint. + * + * @param endpoint the endpoint to send the exchange to + * @param exchange the exchange to send + * @return a handle to be used to get the response in the future + */ + Future<Exchange> asyncSend(Endpoint endpoint, Exchange exchange); + + /** + * Sends an asynchronous exchange to the given endpoint. + * + * @param endpoint the endpoint to send the exchange to + * @param processor the transformer used to populate the new exchange + * @return a handle to be used to get the response in the future + */ + Future<Exchange> asyncSend(Endpoint endpoint, Processor processor); + + /** + * Sends an asynchronous body to the given endpoint. + * Uses an {...@link ExchangePattern#InOnly} message exchange pattern. + * + * @param endpoint the endpoint to send the exchange to + * @param body the body to send + * @return a handle to be used to get the response in the future + */ + Future<Object> asyncSendBody(Endpoint endpoint, Object body); + + /** + * Sends an asynchronous body to the given endpoint. + * Uses an {...@link ExchangePattern#InOut} message exchange pattern. + * + * @param endpoint the endpoint to send the exchange to + * @param body the body to send + * @return a handle to be used to get the response in the future + */ + Future<Object> asyncRequestBody(Endpoint endpoint, Object body); + + /** + * Sends an asynchronous body to the given endpoint. + * Uses an {...@link ExchangePattern#InOut} message exchange pattern. + * + * @param endpoint the endpoint to send the exchange to + * @param body the body to send + * @param header the header name + * @param headerValue the header value + * @return a handle to be used to get the response in the future + */ + Future<Object> asyncRequestBodyAndHeader(Endpoint endpoint, Object body, String header, Object headerValue); + + /** + * Sends an asynchronous body to the given endpoint. + * Uses an {...@link ExchangePattern#InOut} message exchange pattern. + * + * @param endpoint the endpoint to send the exchange to + * @param body the body to send + * @param headers headers + * @return a handle to be used to get the response in the future + */ + Future<Object> asyncRequestBodyAndHeaders(Endpoint endpoint, Object body, Map<String, Object> headers); + + /** + * Sends an asynchronous body to the given endpoint. + * Uses an {...@link ExchangePattern#InOut} message exchange pattern. + * + * @param endpoint the endpoint to send the exchange to + * @param body the body to send + * @param type the expected response type + * @return a handle to be used to get the response in the future + */ + <T> Future<T> asyncRequestBody(Endpoint endpoint, Object body, Class<T> type); + + /** + * Sends an asynchronous body to the given endpoint. + * Uses an {...@link ExchangePattern#InOut} message exchange pattern. + * + * @param endpoint the endpoint to send the exchange to + * @param body the body to send + * @param header the header name + * @param headerValue the header value + * @param type the expected response type + * @return a handle to be used to get the response in the future + */ + <T> Future<T> asyncRequestBodyAndHeader(Endpoint endpoint, Object body, String header, Object headerValue, Class<T> type); + + /** + * Sends an asynchronous body to the given endpoint. + * Uses an {...@link ExchangePattern#InOut} message exchange pattern. + * + * @param endpoint the endpoint to send the exchange to + * @param body the body to send + * @param headers headers + * @param type the expected response type + * @return a handle to be used to get the response in the future + */ + <T> Future<T> asyncRequestBodyAndHeaders(Endpoint endpoint, Object body, Map<String, Object> headers, Class<T> type); + + /** * Gets the response body from the future handle, will wait until the response is ready. * <p/><b>Notice:</b> that if the processing of the exchange failed with an Exception * it is thrown from this method as a {...@link org.apache.camel.CamelExecutionException} with Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java?rev=798045&r1=798044&r2=798045&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java Mon Jul 27 07:06:19 2009 @@ -435,103 +435,145 @@ } public Future<Exchange> asyncSend(final String uri, final Exchange exchange) { - Callable<Exchange> task = new Callable<Exchange>() { - public Exchange call() throws Exception { - return send(uri, exchange); - } - }; - - return executor.submit(task); + return asyncSend(resolveMandatoryEndpoint(uri), exchange); } public Future<Exchange> asyncSend(final String uri, final Processor processor) { - Callable<Exchange> task = new Callable<Exchange>() { - public Exchange call() throws Exception { - return send(uri, processor); + return asyncSend(resolveMandatoryEndpoint(uri), processor); + } + + public Future<Object> asyncSendBody(final String uri, final Object body) { + return asyncSendBody(resolveMandatoryEndpoint(uri), body); + } + + public Future<Object> asyncRequestBody(final String uri, final Object body) { + return asyncRequestBody(resolveMandatoryEndpoint(uri), body); + } + + public <T> Future<T> asyncRequestBody(final String uri, final Object body, final Class<T> type) { + return asyncRequestBody(resolveMandatoryEndpoint(uri), body, type); + } + + public Future<Object> asyncRequestBodyAndHeader(final String endpointUri, final Object body, final String header, final Object headerValue) { + return asyncRequestBodyAndHeader(resolveMandatoryEndpoint(endpointUri), body, header, headerValue); + } + + public <T> Future<T> asyncRequestBodyAndHeader(final String endpointUri, final Object body, final String header, final Object headerValue, final Class<T> type) { + return asyncRequestBodyAndHeader(resolveMandatoryEndpoint(endpointUri), body, header, headerValue, type); + } + + public Future<Object> asyncRequestBodyAndHeaders(final String endpointUri, final Object body, final Map<String, Object> headers) { + return asyncRequestBodyAndHeaders(resolveMandatoryEndpoint(endpointUri), body, headers); + } + + public <T> Future<T> asyncRequestBodyAndHeaders(final String endpointUri, final Object body, final Map<String, Object> headers, final Class<T> type) { + return asyncRequestBodyAndHeaders(resolveMandatoryEndpoint(endpointUri), body, headers, type); + } + + public <T> T extractFutureBody(Future future, Class<T> type) { + return ExchangeHelper.extractFutureBody(context, future, type); + } + + public <T> T extractFutureBody(Future future, long timeout, TimeUnit unit, Class<T> type) throws TimeoutException { + return ExchangeHelper.extractFutureBody(context, future, timeout, unit, type); + } + + public Future<Object> asyncRequestBody(final Endpoint endpoint, final Object body) { + Callable<Object> task = new Callable<Object>() { + public Object call() throws Exception { + return requestBody(endpoint, body); } }; return executor.submit(task); } - public Future<Object> asyncSendBody(final String uri, final Object body) { - Callable<Object> task = new Callable<Object>() { - public Object call() throws Exception { - sendBody(uri, body); - // its InOnly, so no body to return - return null; + public <T> Future<T> asyncRequestBody(final Endpoint endpoint, final Object body, final Class<T> type) { + Callable<T> task = new Callable<T>() { + public T call() throws Exception { + return requestBody(endpoint, body, type); } }; return executor.submit(task); } - public Future<Object> asyncRequestBody(final String uri, final Object body) { + public Future<Object> asyncRequestBodyAndHeader(final Endpoint endpoint, final Object body, final String header, + final Object headerValue) { Callable<Object> task = new Callable<Object>() { public Object call() throws Exception { - return requestBody(uri, body); + return requestBodyAndHeader(endpoint, body, header, headerValue); } }; return executor.submit(task); } - public <T> Future<T> asyncRequestBody(final String uri, final Object body, final Class<T> type) { + public <T> Future<T> asyncRequestBodyAndHeader(final Endpoint endpoint, final Object body, final String header, + final Object headerValue, final Class<T> type) { Callable<T> task = new Callable<T>() { public T call() throws Exception { - return requestBody(uri, body, type); + return requestBodyAndHeader(endpoint, body, header, headerValue, type); } }; return executor.submit(task); } - public Future<Object> asyncRequestBodyAndHeader(final String endpointUri, final Object body, final String header, final Object headerValue) { + + public Future<Object> asyncRequestBodyAndHeaders(final Endpoint endpoint, final Object body, + final Map<String, Object> headers) { Callable<Object> task = new Callable<Object>() { public Object call() throws Exception { - return requestBodyAndHeader(endpointUri, body, header, headerValue); + return requestBodyAndHeaders(endpoint, body, headers); } }; return executor.submit(task); } - public <T> Future<T> asyncRequestBodyAndHeader(final String endpointUri, final Object body, final String header, final Object headerValue, final Class<T> type) { + public <T> Future<T> asyncRequestBodyAndHeaders(final Endpoint endpoint, final Object body, + final Map<String, Object> headers, final Class<T> type) { Callable<T> task = new Callable<T>() { public T call() throws Exception { - return requestBodyAndHeader(endpointUri, body, header, headerValue, type); + return requestBodyAndHeaders(endpoint, body, headers, type); } }; return executor.submit(task); } - public Future<Object> asyncRequestBodyAndHeaders(final String endpointUri, final Object body, final Map<String, Object> headers) { - Callable<Object> task = new Callable<Object>() { - public Object call() throws Exception { - return requestBodyAndHeaders(endpointUri, body, headers); + public Future<Exchange> asyncSend(final Endpoint endpoint, final Exchange exchange) { + Callable<Exchange> task = new Callable<Exchange>() { + public Exchange call() throws Exception { + return send(endpoint, exchange); } }; return executor.submit(task); } - public <T> Future<T> asyncRequestBodyAndHeaders(final String endpointUri, final Object body, final Map<String, Object> headers, final Class<T> type) { - Callable<T> task = new Callable<T>() { - public T call() throws Exception { - return requestBodyAndHeaders(endpointUri, body, headers, type); + public Future<Exchange> asyncSend(final Endpoint endpoint, final Processor processor) { + Callable<Exchange> task = new Callable<Exchange>() { + public Exchange call() throws Exception { + return send(endpoint, processor); } }; return executor.submit(task); } - public <T> T extractFutureBody(Future future, Class<T> type) { - return ExchangeHelper.extractFutureBody(context, future, type); - } + public Future<Object> asyncSendBody(final Endpoint endpoint, final Object body) { + Callable<Object> task = new Callable<Object>() { + public Object call() throws Exception { + sendBody(endpoint, body); + // its InOnly, so no body to return + return null; + } + }; - public <T> T extractFutureBody(Future future, long timeout, TimeUnit unit, Class<T> type) throws TimeoutException { - return ExchangeHelper.extractFutureBody(context, future, timeout, unit, type); + return executor.submit(task); } + } Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncRouteTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncRouteTest.java?rev=798045&r1=798044&r2=798045&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncRouteTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncRouteTest.java Mon Jul 27 07:06:19 2009 @@ -129,7 +129,7 @@ .transform(body().append(" World")) // now turn the route into async from this point forward // the caller will have a Future<Exchange> returned as response in OUT - // to be used to grap the async response when he fell like it + // to be used to grape the async response when he fell like it .threads() // from this point forward this is the async route doing its work // so we do a bit of delay to simulate heavy work that takes time