Repository: camel
Updated Branches:
  refs/heads/master 6e1c34413 -> ab0aeca64


CAMEL-10286
Allow async bean method in bean language with J8 CompletableFuture


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/41634f2e
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/41634f2e
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/41634f2e

Branch: refs/heads/master
Commit: 41634f2ee5e1736398a1e4bc1678699483901b7f
Parents: 6e1c344
Author: Vitalii Tymchyshyn <v...@tym.im>
Authored: Sun Sep 4 15:02:47 2016 -0400
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Mon Sep 5 09:28:24 2016 +0200

----------------------------------------------------------------------
 .../apache/camel/component/bean/MethodInfo.java |  43 ++++--
 .../component/bean/BeanInvokeAsyncTest.java     | 135 +++++++++++++++++++
 2 files changed, 167 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/41634f2e/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java b/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java
index 368323a..d4a0c57 100644
--- a/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java
+++ b/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java
@@ -28,6 +28,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ExecutorService;
 
 import org.apache.camel.AsyncCallback;
@@ -298,19 +299,25 @@ public class MethodInfo {
                     return routingSlip.doRoutingSlip(exchange, result, callback);
                 }
 
+                //If it's Java 8 async result
+                if (CompletionStage.class.isAssignableFrom(getMethod().getReturnType())) {
+                    CompletionStage<?> completionStage = (CompletionStage<?>) result;
+
+                    completionStage
+                            .whenComplete((resultObject, e) -> {
+                                if (e != null) {
+                                    exchange.setException(e);
+                                } else if (resultObject != null){
+                                    fillResult(exchange, resultObject);
+                                }
+                                callback.done(false);
+                            });
+                    return false;
+                }
+
                 // if the method returns something then set the value returned on the Exchange
                 if (!getMethod().getReturnType().equals(Void.TYPE) && result != Void.TYPE) {
-                    if (exchange.getPattern().isOutCapable()) {
-                        // force out creating if not already created (as its lazy)
-                        LOG.debug("Setting bean invocation result on the OUT message: {}", result);
-                        exchange.getOut().setBody(result);
-                        // propagate headers
-                        exchange.getOut().getHeaders().putAll(exchange.getIn().getHeaders());
-                    } else {
-                        // if not out then set it on the in
-                        LOG.debug("Setting bean invocation result on the IN message: {}", result);
-                        exchange.getIn().setBody(result);
-                    }
+                    fillResult(exchange, result);
                 }
 
                 // we did not use any of the eips, but just invoked the bean
@@ -329,6 +336,20 @@ public class MethodInfo {
         };
     }
 
+    private void fillResult(Exchange exchange, Object result) {
+        if (exchange.getPattern().isOutCapable()) {
+            // force out creating if not already created (as its lazy)
+            LOG.debug("Setting bean invocation result on the OUT message: {}", result);
+            exchange.getOut().setBody(result);
+            // propagate headers
+            exchange.getOut().getHeaders().putAll(exchange.getIn().getHeaders());
+        } else {
+            // if not out then set it on the in
+            LOG.debug("Setting bean invocation result on the IN message: {}", result);
+            exchange.getIn().setBody(result);
+        }
+    }
+
     public Class<?> getType() {
         return type;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/41634f2e/camel-core/src/test/java/org/apache/camel/component/bean/BeanInvokeAsyncTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/component/bean/BeanInvokeAsyncTest.java b/camel-core/src/test/java/org/apache/camel/component/bean/BeanInvokeAsyncTest.java
new file mode 100644
index 0000000..a8ecf48
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/component/bean/BeanInvokeAsyncTest.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.bean;
+
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Assert;
+
+import java.util.concurrent.*;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+/**
+ * @version 
+ */
+public class BeanInvokeAsyncTest extends ContextTestSupport {
+    
+    private volatile CompletableFuture<Object> callFuture;
+    private volatile String receivedBody;
+    private volatile CountDownLatch methodInvoked;
+    private Future<Object> sendFuture;
+
+    public void testDoSomething() throws Exception {
+
+        runTestSendBody("Hello World", "Hello World", this::doSomething);
+        runTestSendBody("", "", this::doSomething);
+        runTestSendBody(this::expectNullBody, null, this::doSomething);
+    }
+
+    public void testChangeSomething() throws Exception {
+        runTestSendBody("Bye World", "Hello World", this::changeSomething);
+        runTestSendBody("Bye All", null, this::changeSomething);
+        runTestSendBody("Bye All", "", this::changeSomething);
+
+    }
+
+    public void testDoNothing() throws Exception {
+        runTestSendBody("Hello World", "Hello World", this::doNothing);
+        runTestSendBody("", "", this::doNothing);
+        runTestSendBody(this::expectNullBody, null, this::doNothing);
+    }
+
+    public void testThrowSomething() throws Exception {
+        try {
+            runTestSendBody(m -> m.expectedMessageCount(0), "SomeProblem", this::throwSomething);
+            fail("Exception expected");
+        } catch (ExecutionException e) {
+            Assert.assertTrue(e.getCause() instanceof CamelExecutionException);
+            Assert.assertTrue(e.getCause().getCause() instanceof IllegalStateException);
+            Assert.assertEquals("SomeProblem", e.getCause().getCause().getMessage());
+        }
+    }
+
+    private void runTestSendBody(String expectedBody, String sentBody, Function<String, String> processor)
+            throws InterruptedException, java.util.concurrent.ExecutionException {
+        runTestSendBody(m -> m.expectedBodiesReceived(expectedBody), sentBody, processor);
+    }
+
+    private void runTestSendBody(Consumer<MockEndpoint> mockPreparer, String sentBody,
+            Function<String, String> processor)
+            throws InterruptedException, java.util.concurrent.ExecutionException {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.reset();
+        mockPreparer.accept(mock);
+        callFuture = new CompletableFuture<>();
+        methodInvoked = new CountDownLatch(1);
+        sendFuture = template.asyncSendBody("direct:entry", sentBody);
+        Assert.assertTrue(methodInvoked.await(5, TimeUnit.SECONDS));
+        Assert.assertEquals(0, mock.getReceivedCounter());
+        Assert.assertFalse(sendFuture.isDone());
+        try {
+            callFuture.complete(processor.apply(receivedBody));
+        } catch (Exception e) {
+            callFuture.completeExceptionally(e);
+        }
+        sendFuture.get();
+        assertMockEndpointsSatisfied();
+    }
+
+    private void expectNullBody(MockEndpoint mock) {
+        mock.expectedMessageCount(1);
+        mock.message(0).body().isNull();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:entry").bean(BeanInvokeAsyncTest.this, "asyncMethod").to("mock:result");
+            }
+        };
+    }
+
+    public CompletableFuture<?> asyncMethod(String body) {
+        this.receivedBody = body;
+        methodInvoked.countDown();
+        return callFuture;
+    }
+
+    public String doSomething(String s) {
+        return s;
+    }
+
+    public String changeSomething(String s) {
+        if ("Hello World".equals(s)) {
+            return "Bye World";
+        }
+        return "Bye All";
+    }
+
+    public String doNothing(String s) {
+        return null;
+    }
+
+    public String throwSomething(String s) {
+        throw new IllegalStateException(s);
+    }
+}
\ No newline at end of file

Reply via email to