Author: davsclaus
Date: Thu Mar 31 09:54:38 2011
New Revision: 1087229

URL: http://svn.apache.org/viewvc?rev=1087229&view=rev
Log:
CAMEL-3790: Camel proxy now support asynchronous clients using Future handles.

Added:
    
camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/ProxyReturnFutureExceptionTest.java
    
camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/ProxyReturnFutureListTest.java
    
camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/ProxyReturnFutureTest.java
Modified:
    
camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/CamelInvocationHandler.java
    
camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/BeanProxyTest.java

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/CamelInvocationHandler.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/CamelInvocationHandler.java?rev=1087229&r1=1087228&r2=1087229&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/CamelInvocationHandler.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/CamelInvocationHandler.java
 Thu Mar 31 09:54:38 2011
@@ -18,10 +18,19 @@ package org.apache.camel.component.bean;
 
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.Method;
+import java.lang.reflect.Type;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
 
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
+import org.apache.camel.InvalidPayloadException;
 import org.apache.camel.Producer;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.impl.DefaultExchange;
@@ -38,6 +47,9 @@ import org.slf4j.LoggerFactory;
 public class CamelInvocationHandler implements InvocationHandler {
     private static final transient Logger LOG = 
LoggerFactory.getLogger(CamelInvocationHandler.class);
 
+    // use a static thread pool to not create a new thread pool for each 
invocation
+    private static ExecutorService executorService;
+
     private final Endpoint endpoint;
     private final Producer producer;
     private final MethodInfoCache methodInfoCache;
@@ -48,26 +60,61 @@ public class CamelInvocationHandler impl
         this.methodInfoCache = methodInfoCache;
     }
 
-    public Object invoke(Object proxy, Method method, Object[] args) throws 
Throwable {
+    public Object invoke(final Object proxy, final Method method, final 
Object[] args) throws Throwable {
         BeanInvocation invocation = new BeanInvocation(method, args);
-        ExchangePattern pattern = ExchangePattern.InOut;
         MethodInfo methodInfo = methodInfoCache.getMethodInfo(method);
-        if (methodInfo != null) {
-            pattern = methodInfo.getPattern();
-        }
-        Exchange exchange = new DefaultExchange(endpoint, pattern);
+
+        final ExchangePattern pattern = methodInfo != null ? 
methodInfo.getPattern() : ExchangePattern.InOut;
+        final Exchange exchange = new DefaultExchange(endpoint, pattern);
         exchange.getIn().setBody(invocation);
 
-        // process the exchange
-        LOG.trace("Proxied method call {} invoking producer: {}", 
method.getName(), producer);
-        producer.process(exchange);
+        // is the return type a future
+        final boolean isFuture = method.getReturnType() == Future.class;
+
+        // create task to execute the proxy and gather the reply
+        FutureTask task = new FutureTask<Object>(new Callable<Object>() {
+            public Object call() throws Exception {
+                // process the exchange
+                LOG.trace("Proxied method call {} invoking producer: {}", 
method.getName(), producer);
+                producer.process(exchange);
+
+                Object answer = afterInvoke(method, exchange, pattern, 
isFuture);
+                LOG.trace("Proxied method call {} returning: {}", 
method.getName(), answer);
+                return answer;
+            }
+        });
+
+        if (isFuture) {
+            // submit task and return future
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Submitting task for exchange id {}", 
exchange.getExchangeId());
+            }
+            getExecutorService(exchange.getContext()).submit(task);
+            return task;
+        } else {
+            // execute task now
+            try {
+                task.run();
+                return task.get();
+            } catch (ExecutionException e) {
+                // we don't want the wrapped exception from JDK
+                throw e.getCause();
+            }
+        }
+    }
 
+    protected Object afterInvoke(Method method, Exchange exchange, 
ExchangePattern pattern, boolean isFuture) throws Exception {
         // check if we had an exception
         Throwable cause = exchange.getException();
         if (cause != null) {
             Throwable found = findSuitableException(cause, method);
             if (found != null) {
-                throw found;
+                if (found instanceof Exception) {
+                    throw (Exception) found;
+                } else {
+                    // wrap as exception
+                    throw new CamelExchangeException("Error processing 
exchange", exchange, cause);
+                }
             }
             // special for runtime camel exceptions as they can be nested
             if (cause instanceof RuntimeCamelException) {
@@ -78,28 +125,73 @@ public class CamelInvocationHandler impl
                 throw (RuntimeCamelException) cause;
             }
             // okay just throw the exception as is
-            throw cause;
+            if (cause instanceof Exception) {
+                throw (Exception) cause;
+            } else {
+                // wrap as exception
+                throw new CamelExchangeException("Error processing exchange", 
exchange, cause);
+            }
         }
 
-        // do not return a reply if the method is VOID or the MEP is not OUT 
capable
+        // do not return a reply if the method is VOID
         Class<?> to = method.getReturnType();
-        if (to == Void.TYPE || !pattern.isOutCapable()) {
-            return null;
-        }
-
-        // only convert if there is a body
-        if (!exchange.hasOut() || exchange.getOut().getBody() == null) {
-            // there is no body so return null
+        if (to == Void.TYPE) {
             return null;
         }
 
         // use type converter so we can convert output in the desired type 
defined by the method
         // and let it be mandatory so we know wont return null if we cant 
convert it to the defined type
-        Object answer = exchange.getOut().getMandatoryBody(to);
-        LOG.trace("Proxied method call {} returning: {}", method.getName(), 
answer);
+        Object answer;
+        if (!isFuture) {
+            answer = getBody(exchange, to);
+        } else {
+            // if its a Future then we need to extract the class from the 
future type so we know
+            // which class to return the result as
+            Class<?> returnTo = getGenericType(exchange.getContext(), 
method.getGenericReturnType());
+            answer = getBody(exchange, returnTo);
+        }
+
         return answer;
     }
 
+    private static Object getBody(Exchange exchange, Class<?> type) throws 
InvalidPayloadException {
+        // get the body from the Exchange from either OUT or IN
+        if (exchange.hasOut()) {
+            if (exchange.getOut().getBody() != null) {
+                return exchange.getOut().getMandatoryBody(type);
+            } else {
+                return null;
+            }
+        } else {
+            if (exchange.getIn().getBody() != null) {
+                return exchange.getIn().getMandatoryBody(type);
+            } else {
+                return null;
+            }
+        }
+    }
+
+    protected static Class getGenericType(CamelContext context, Type type) 
throws ClassNotFoundException {
+        if (type == null) {
+            // fallback and use object
+            return Object.class;
+        }
+
+        // unfortunately java dont provide a nice api for getting the generic 
type of the return type
+        // due type erasure, so we have to gather it based on a String 
representation
+        String name = ObjectHelper.between(type.toString(), "<", ">");
+        if (name != null) {
+            if (name.contains("<")) {
+                // we only need the outer type
+                name = ObjectHelper.before(name, "<");
+            }
+            return context.getClassResolver().resolveMandatoryClass(name);
+        } else {
+            // fallback and use object
+            return Object.class;
+        }
+    }
+
     /**
      * Tries to find the best suited exception to throw.
      * <p/>
@@ -126,5 +218,18 @@ public class CamelInvocationHandler impl
         return null;
     }
 
+    protected static synchronized ExecutorService 
getExecutorService(CamelContext context) {
+        // CamelContext will shutdown thread pool when it shutdown so we can 
lazy create it on demand
+        // but in case of hot-deploy or the likes we need to be able to 
re-create it (its a shared static instance)
+        if (executorService == null || executorService.isTerminated() || 
executorService.isShutdown()) {
+            // try to lookup a pool first based on id/profile
+            executorService = 
context.getExecutorServiceStrategy().lookup(CamelInvocationHandler.class, 
"CamelInvocationHandler", "CamelInvocationHandler");
+            if (executorService == null) {
+                executorService = 
context.getExecutorServiceStrategy().newDefaultThreadPool(CamelInvocationHandler.class,
 "CamelInvocationHandler");
+            }
+        }
+        return executorService;
+    }
+
 }
 

Modified: 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/BeanProxyTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/BeanProxyTest.java?rev=1087229&r1=1087228&r2=1087229&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/BeanProxyTest.java
 (original)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/BeanProxyTest.java
 Thu Mar 31 09:54:38 2011
@@ -83,16 +83,6 @@ public class BeanProxyTest extends Conte
         assertEquals("<order>FAIL</order>", reply);
     }
 
-    // TODO: Does not pass on JDK6
-
-    public void disabledtestBeanProxyFailureNullBody() throws Exception {
-        Endpoint endpoint = context.getEndpoint("direct:start");
-        OrderService service = ProxyHelper.createProxy(endpoint, 
OrderService.class);
-
-        String reply = service.submitOrderStringReturnString(null);
-        assertEquals("<order>FAIL</order>", reply);
-    }
-
     public void testBeanProxyFailureNotXMLBody() throws Exception {
         Endpoint endpoint = context.getEndpoint("direct:start");
         OrderService service = ProxyHelper.createProxy(endpoint, 
OrderService.class);

Added: 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/ProxyReturnFutureExceptionTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/ProxyReturnFutureExceptionTest.java?rev=1087229&view=auto
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/ProxyReturnFutureExceptionTest.java
 (added)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/ProxyReturnFutureExceptionTest.java
 Thu Mar 31 09:54:38 2011
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.bean;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * @version 
+ */
+public class ProxyReturnFutureExceptionTest extends ContextTestSupport {
+
+    public void testFutureEchoException() throws Exception {
+        Echo service = 
ProxyHelper.createProxy(context.getEndpoint("direct:echo"), Echo.class);
+
+        Future future = service.asText(4);
+        log.info("Got future");
+        assertFalse("Should not be done", future.isDone());
+        log.info("Waiting for future to be done ...");
+        try {
+            assertEquals("Four", future.get(5, TimeUnit.SECONDS));
+            fail("Should have thrown exception");
+        } catch (ExecutionException e) {
+            IllegalArgumentException cause = 
assertIsInstanceOf(IllegalArgumentException.class, e.getCause());
+            assertEquals("Forced", cause.getMessage());
+        }
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:echo")
+                    .delay(2000)
+                    .throwException(new IllegalArgumentException("Forced"));
+            }
+        };
+    }
+
+    public static interface Echo {
+        Future<String> asText(int number);
+    }
+
+}

Added: 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/ProxyReturnFutureListTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/ProxyReturnFutureListTest.java?rev=1087229&view=auto
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/ProxyReturnFutureListTest.java
 (added)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/ProxyReturnFutureListTest.java
 Thu Mar 31 09:54:38 2011
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.bean;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * @version 
+ */
+public class ProxyReturnFutureListTest extends ContextTestSupport {
+
+    public void testFutureList() throws Exception {
+        Users service = 
ProxyHelper.createProxy(context.getEndpoint("direct:echo"), Users.class);
+
+        Future future = service.getUsers(true);
+        log.info("Got future");
+        assertFalse("Should not be done", future.isDone());
+        log.info("Waiting for future to be done ...");
+
+        List<String> users = (List<String>) future.get(5, TimeUnit.SECONDS);
+        assertEquals("Claus", users.get(0));
+        assertEquals("Jonathan", users.get(1));
+    }
+
+    public void testFutureListCallTwoTimes() throws Exception {
+        Users service = 
ProxyHelper.createProxy(context.getEndpoint("direct:echo"), Users.class);
+
+        Future future = service.getUsers(true);
+        log.info("Got future");
+        assertFalse("Should not be done", future.isDone());
+        log.info("Waiting for future to be done ...");
+
+        List<String> users = (List<String>) future.get(5, TimeUnit.SECONDS);
+        assertEquals("Claus", users.get(0));
+        assertEquals("Jonathan", users.get(1));
+
+        future = service.getUsers(true);
+        log.info("Got future");
+        assertFalse("Should not be done", future.isDone());
+        log.info("Waiting for future to be done ...");
+
+        users = (List<String>) future.get(5, TimeUnit.SECONDS);
+        assertEquals("Claus", users.get(0));
+        assertEquals("Jonathan", users.get(1));
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:echo")
+                    .delay(2000)
+                    .process(new Processor() {
+                        public void process(Exchange exchange) throws 
Exception {
+                            List<String> users = new ArrayList<String>();
+                            users.add("Claus");
+                            users.add("Jonathan");
+                            exchange.getIn().setBody(users);
+                        }
+                    });
+            }
+        };
+    }
+
+    public static interface Users {
+        Future<List<String>> getUsers(boolean gold);
+    }
+
+}

Added: 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/ProxyReturnFutureTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/ProxyReturnFutureTest.java?rev=1087229&view=auto
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/ProxyReturnFutureTest.java
 (added)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/ProxyReturnFutureTest.java
 Thu Mar 31 09:54:38 2011
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.bean;
+
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * @version 
+ */
+public class ProxyReturnFutureTest extends ContextTestSupport {
+
+    // START SNIPPET: e2
+    public void testFutureEcho() throws Exception {
+        Echo service = 
ProxyHelper.createProxy(context.getEndpoint("direct:echo"), Echo.class);
+
+        Future future = service.asText(4);
+        log.info("Got future");
+
+        assertFalse("Should not be done", future.isDone());
+        log.info("Waiting for future to be done ...");
+
+        String reply = (String) future.get(5, TimeUnit.SECONDS);
+        assertEquals("Four", reply);
+    }
+    // END SNIPPET: e2
+
+    public void testFutureEchoCallTwoTimes() throws Exception {
+        Echo service = 
ProxyHelper.createProxy(context.getEndpoint("direct:echo"), Echo.class);
+
+        Future future = service.asText(4);
+        log.info("Got future");
+        assertFalse("Should not be done", future.isDone());
+        log.info("Waiting for future to be done ...");
+        assertEquals("Four", future.get(5, TimeUnit.SECONDS));
+
+        future = service.asText(5);
+        log.info("Got future");
+        assertFalse("Should not be done", future.isDone());
+        log.info("Waiting for future to be done ...");
+        assertEquals("Four", future.get(5, TimeUnit.SECONDS));
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:echo")
+                    .delay(2000)
+                    .transform().constant("Four");
+            }
+        };
+    }
+
+    // START SNIPPET: e1
+    public static interface Echo {
+
+        // returning a Future indicate asynchronous invocation
+        Future<String> asText(int number);
+
+    }
+    // END SNIPPET: e1
+
+}


Reply via email to