Author: davsclaus Date: Thu Mar 31 09:54:38 2011 New Revision: 1087229 URL: http://svn.apache.org/viewvc?rev=1087229&view=rev Log: CAMEL-3790: Camel proxy now supports asynchronous clients using Future handles.
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/ProxyReturnFutureExceptionTest.java camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/ProxyReturnFutureListTest.java camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/ProxyReturnFutureTest.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/CamelInvocationHandler.java camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/BeanProxyTest.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/CamelInvocationHandler.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/CamelInvocationHandler.java?rev=1087229&r1=1087228&r2=1087229&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/CamelInvocationHandler.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/CamelInvocationHandler.java Thu Mar 31 09:54:38 2011 @@ -18,10 +18,19 @@ package org.apache.camel.component.bean; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; +import java.lang.reflect.Type; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; +import org.apache.camel.CamelContext; +import org.apache.camel.CamelExchangeException; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; +import org.apache.camel.InvalidPayloadException; import org.apache.camel.Producer; import org.apache.camel.RuntimeCamelException; import org.apache.camel.impl.DefaultExchange; @@ -38,6 +47,9 @@ import org.slf4j.LoggerFactory; public class CamelInvocationHandler implements InvocationHandler { private static final 
transient Logger LOG = LoggerFactory.getLogger(CamelInvocationHandler.class); + // use a static thread pool to not create a new thread pool for each invocation + private static ExecutorService executorService; + private final Endpoint endpoint; private final Producer producer; private final MethodInfoCache methodInfoCache; @@ -48,26 +60,61 @@ public class CamelInvocationHandler impl this.methodInfoCache = methodInfoCache; } - public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { + public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable { BeanInvocation invocation = new BeanInvocation(method, args); - ExchangePattern pattern = ExchangePattern.InOut; MethodInfo methodInfo = methodInfoCache.getMethodInfo(method); - if (methodInfo != null) { - pattern = methodInfo.getPattern(); - } - Exchange exchange = new DefaultExchange(endpoint, pattern); + + final ExchangePattern pattern = methodInfo != null ? methodInfo.getPattern() : ExchangePattern.InOut; + final Exchange exchange = new DefaultExchange(endpoint, pattern); exchange.getIn().setBody(invocation); - // process the exchange - LOG.trace("Proxied method call {} invoking producer: {}", method.getName(), producer); - producer.process(exchange); + // is the return type a future + final boolean isFuture = method.getReturnType() == Future.class; + + // create task to execute the proxy and gather the reply + FutureTask task = new FutureTask<Object>(new Callable<Object>() { + public Object call() throws Exception { + // process the exchange + LOG.trace("Proxied method call {} invoking producer: {}", method.getName(), producer); + producer.process(exchange); + + Object answer = afterInvoke(method, exchange, pattern, isFuture); + LOG.trace("Proxied method call {} returning: {}", method.getName(), answer); + return answer; + } + }); + + if (isFuture) { + // submit task and return future + if (LOG.isTraceEnabled()) { + LOG.trace("Submitting task for 
exchange id {}", exchange.getExchangeId()); + } + getExecutorService(exchange.getContext()).submit(task); + return task; + } else { + // execute task now + try { + task.run(); + return task.get(); + } catch (ExecutionException e) { + // we don't want the wrapped exception from JDK + throw e.getCause(); + } + } + } + protected Object afterInvoke(Method method, Exchange exchange, ExchangePattern pattern, boolean isFuture) throws Exception { // check if we had an exception Throwable cause = exchange.getException(); if (cause != null) { Throwable found = findSuitableException(cause, method); if (found != null) { - throw found; + if (found instanceof Exception) { + throw (Exception) found; + } else { + // wrap as exception + throw new CamelExchangeException("Error processing exchange", exchange, cause); + } } // special for runtime camel exceptions as they can be nested if (cause instanceof RuntimeCamelException) { @@ -78,28 +125,73 @@ public class CamelInvocationHandler impl throw (RuntimeCamelException) cause; } // okay just throw the exception as is - throw cause; + if (cause instanceof Exception) { + throw (Exception) cause; + } else { + // wrap as exception + throw new CamelExchangeException("Error processing exchange", exchange, cause); + } } - // do not return a reply if the method is VOID or the MEP is not OUT capable + // do not return a reply if the method is VOID Class<?> to = method.getReturnType(); - if (to == Void.TYPE || !pattern.isOutCapable()) { - return null; - } - - // only convert if there is a body - if (!exchange.hasOut() || exchange.getOut().getBody() == null) { - // there is no body so return null + if (to == Void.TYPE) { return null; } // use type converter so we can convert output in the desired type defined by the method // and let it be mandatory so we know wont return null if we cant convert it to the defined type - Object answer = exchange.getOut().getMandatoryBody(to); - LOG.trace("Proxied method call {} returning: {}", method.getName(), 
answer); + Object answer; + if (!isFuture) { + answer = getBody(exchange, to); + } else { + // if its a Future then we need to extract the class from the future type so we know + // which class to return the result as + Class<?> returnTo = getGenericType(exchange.getContext(), method.getGenericReturnType()); + answer = getBody(exchange, returnTo); + } + return answer; } + private static Object getBody(Exchange exchange, Class<?> type) throws InvalidPayloadException { + // get the body from the Exchange from either OUT or IN + if (exchange.hasOut()) { + if (exchange.getOut().getBody() != null) { + return exchange.getOut().getMandatoryBody(type); + } else { + return null; + } + } else { + if (exchange.getIn().getBody() != null) { + return exchange.getIn().getMandatoryBody(type); + } else { + return null; + } + } + } + + protected static Class getGenericType(CamelContext context, Type type) throws ClassNotFoundException { + if (type == null) { + // fallback and use object + return Object.class; + } + + // unfortunately java dont provide a nice api for getting the generic type of the return type + // due type erasure, so we have to gather it based on a String representation + String name = ObjectHelper.between(type.toString(), "<", ">"); + if (name != null) { + if (name.contains("<")) { + // we only need the outer type + name = ObjectHelper.before(name, "<"); + } + return context.getClassResolver().resolveMandatoryClass(name); + } else { + // fallback and use object + return Object.class; + } + } + /** * Tries to find the best suited exception to throw. 
* <p/> @@ -126,5 +218,18 @@ public class CamelInvocationHandler impl return null; } + protected static synchronized ExecutorService getExecutorService(CamelContext context) { + // CamelContext will shutdown thread pool when it shutdown so we can lazy create it on demand + // but in case of hot-deploy or the likes we need to be able to re-create it (its a shared static instance) + if (executorService == null || executorService.isTerminated() || executorService.isShutdown()) { + // try to lookup a pool first based on id/profile + executorService = context.getExecutorServiceStrategy().lookup(CamelInvocationHandler.class, "CamelInvocationHandler", "CamelInvocationHandler"); + if (executorService == null) { + executorService = context.getExecutorServiceStrategy().newDefaultThreadPool(CamelInvocationHandler.class, "CamelInvocationHandler"); + } + } + return executorService; + } + } Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/BeanProxyTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/BeanProxyTest.java?rev=1087229&r1=1087228&r2=1087229&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/BeanProxyTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/BeanProxyTest.java Thu Mar 31 09:54:38 2011 @@ -83,16 +83,6 @@ public class BeanProxyTest extends Conte assertEquals("<order>FAIL</order>", reply); } - // TODO: Does not pass on JDK6 - - public void disabledtestBeanProxyFailureNullBody() throws Exception { - Endpoint endpoint = context.getEndpoint("direct:start"); - OrderService service = ProxyHelper.createProxy(endpoint, OrderService.class); - - String reply = service.submitOrderStringReturnString(null); - assertEquals("<order>FAIL</order>", reply); - } - public void testBeanProxyFailureNotXMLBody() throws Exception { Endpoint 
endpoint = context.getEndpoint("direct:start"); OrderService service = ProxyHelper.createProxy(endpoint, OrderService.class); Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/ProxyReturnFutureExceptionTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/ProxyReturnFutureExceptionTest.java?rev=1087229&view=auto ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/ProxyReturnFutureExceptionTest.java (added) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/ProxyReturnFutureExceptionTest.java Thu Mar 31 09:54:38 2011 @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.bean; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; + +/** + * @version + */ +public class ProxyReturnFutureExceptionTest extends ContextTestSupport { + + public void testFutureEchoException() throws Exception { + Echo service = ProxyHelper.createProxy(context.getEndpoint("direct:echo"), Echo.class); + + Future future = service.asText(4); + log.info("Got future"); + assertFalse("Should not be done", future.isDone()); + log.info("Waiting for future to be done ..."); + try { + assertEquals("Four", future.get(5, TimeUnit.SECONDS)); + fail("Should have thrown exception"); + } catch (ExecutionException e) { + IllegalArgumentException cause = assertIsInstanceOf(IllegalArgumentException.class, e.getCause()); + assertEquals("Forced", cause.getMessage()); + } + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:echo") + .delay(2000) + .throwException(new IllegalArgumentException("Forced")); + } + }; + } + + public static interface Echo { + Future<String> asText(int number); + } + +} Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/ProxyReturnFutureListTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/ProxyReturnFutureListTest.java?rev=1087229&view=auto ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/ProxyReturnFutureListTest.java (added) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/ProxyReturnFutureListTest.java Thu Mar 31 09:54:38 2011 @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) 
under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.bean; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; + +/** + * @version + */ +public class ProxyReturnFutureListTest extends ContextTestSupport { + + public void testFutureList() throws Exception { + Users service = ProxyHelper.createProxy(context.getEndpoint("direct:echo"), Users.class); + + Future future = service.getUsers(true); + log.info("Got future"); + assertFalse("Should not be done", future.isDone()); + log.info("Waiting for future to be done ..."); + + List<String> users = (List<String>) future.get(5, TimeUnit.SECONDS); + assertEquals("Claus", users.get(0)); + assertEquals("Jonathan", users.get(1)); + } + + public void testFutureListCallTwoTimes() throws Exception { + Users service = ProxyHelper.createProxy(context.getEndpoint("direct:echo"), Users.class); + + Future future = service.getUsers(true); + log.info("Got future"); + assertFalse("Should not be done", future.isDone()); + log.info("Waiting for future to be done ..."); + + 
List<String> users = (List<String>) future.get(5, TimeUnit.SECONDS); + assertEquals("Claus", users.get(0)); + assertEquals("Jonathan", users.get(1)); + + future = service.getUsers(true); + log.info("Got future"); + assertFalse("Should not be done", future.isDone()); + log.info("Waiting for future to be done ..."); + + users = (List<String>) future.get(5, TimeUnit.SECONDS); + assertEquals("Claus", users.get(0)); + assertEquals("Jonathan", users.get(1)); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:echo") + .delay(2000) + .process(new Processor() { + public void process(Exchange exchange) throws Exception { + List<String> users = new ArrayList<String>(); + users.add("Claus"); + users.add("Jonathan"); + exchange.getIn().setBody(users); + } + }); + } + }; + } + + public static interface Users { + Future<List<String>> getUsers(boolean gold); + } + +} Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/ProxyReturnFutureTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/ProxyReturnFutureTest.java?rev=1087229&view=auto ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/ProxyReturnFutureTest.java (added) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/ProxyReturnFutureTest.java Thu Mar 31 09:54:38 2011 @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.bean; + +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; + +/** + * @version + */ +public class ProxyReturnFutureTest extends ContextTestSupport { + + // START SNIPPET: e2 + public void testFutureEcho() throws Exception { + Echo service = ProxyHelper.createProxy(context.getEndpoint("direct:echo"), Echo.class); + + Future future = service.asText(4); + log.info("Got future"); + + assertFalse("Should not be done", future.isDone()); + log.info("Waiting for future to be done ..."); + + String reply = (String) future.get(5, TimeUnit.SECONDS); + assertEquals("Four", reply); + } + // END SNIPPET: e2 + + public void testFutureEchoCallTwoTimes() throws Exception { + Echo service = ProxyHelper.createProxy(context.getEndpoint("direct:echo"), Echo.class); + + Future future = service.asText(4); + log.info("Got future"); + assertFalse("Should not be done", future.isDone()); + log.info("Waiting for future to be done ..."); + assertEquals("Four", future.get(5, TimeUnit.SECONDS)); + + future = service.asText(5); + log.info("Got future"); + assertFalse("Should not be done", future.isDone()); + log.info("Waiting for future to be done ..."); + assertEquals("Four", future.get(5, TimeUnit.SECONDS)); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:echo") + .delay(2000) + 
.transform().constant("Four"); + } + }; + } + + // START SNIPPET: e1 + public static interface Echo { + + // returning a Future indicate asynchronous invocation + Future<String> asText(int number); + + } + // END SNIPPET: e1 + +}