Repository: camel Updated Branches: refs/heads/master 6e1c34413 -> ab0aeca64
CAMEL-10286 Allow async bean method in bean language with J8 CompletableFuture Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/41634f2e Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/41634f2e Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/41634f2e Branch: refs/heads/master Commit: 41634f2ee5e1736398a1e4bc1678699483901b7f Parents: 6e1c344 Author: Vitalii Tymchyshyn <v...@tym.im> Authored: Sun Sep 4 15:02:47 2016 -0400 Committer: Claus Ibsen <davscl...@apache.org> Committed: Mon Sep 5 09:28:24 2016 +0200 ---------------------------------------------------------------------- .../apache/camel/component/bean/MethodInfo.java | 43 ++++-- .../component/bean/BeanInvokeAsyncTest.java | 135 +++++++++++++++++++ 2 files changed, 167 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/41634f2e/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java b/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java index 368323a..d4a0c57 100644 --- a/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java +++ b/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java @@ -28,6 +28,7 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletionStage; import java.util.concurrent.ExecutorService; import org.apache.camel.AsyncCallback; @@ -298,19 +299,25 @@ public class MethodInfo { return routingSlip.doRoutingSlip(exchange, result, callback); } + //If it's Java 8 async result + if (CompletionStage.class.isAssignableFrom(getMethod().getReturnType())) { + CompletionStage<?> completionStage = (CompletionStage<?>) result; + + completionStage + .whenComplete((resultObject, e) -> { + if (e != null) { + exchange.setException(e); + } else if (resultObject != null){ + fillResult(exchange, resultObject); + } + callback.done(false); + }); + return false; + } + // if the method returns something then set the value returned on the Exchange if (!getMethod().getReturnType().equals(Void.TYPE) && result != Void.TYPE) { - if (exchange.getPattern().isOutCapable()) { - // force out creating if not already created (as its lazy) - LOG.debug("Setting bean invocation result on the OUT message: {}", result); - exchange.getOut().setBody(result); - // propagate headers - exchange.getOut().getHeaders().putAll(exchange.getIn().getHeaders()); - } else { - // if not out then set it on the in - LOG.debug("Setting bean invocation result on the IN message: {}", result); - exchange.getIn().setBody(result); - } + fillResult(exchange, result); } // we did not use any of the eips, but just invoked the bean @@ -329,6 +336,20 @@ public class MethodInfo { }; } + private void fillResult(Exchange exchange, Object result) { + if (exchange.getPattern().isOutCapable()) { + // force out creating if not already created (as its lazy) + LOG.debug("Setting bean invocation result on the OUT message: {}", result); + exchange.getOut().setBody(result); + // propagate headers + exchange.getOut().getHeaders().putAll(exchange.getIn().getHeaders()); + } else { + // if not out then set it on the in + LOG.debug("Setting bean invocation result on the IN message: {}", result); + exchange.getIn().setBody(result); + } + } + public Class<?> getType() { return type; } http://git-wip-us.apache.org/repos/asf/camel/blob/41634f2e/camel-core/src/test/java/org/apache/camel/component/bean/BeanInvokeAsyncTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/component/bean/BeanInvokeAsyncTest.java b/camel-core/src/test/java/org/apache/camel/component/bean/BeanInvokeAsyncTest.java new file mode 100644 index 0000000..a8ecf48 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/component/bean/BeanInvokeAsyncTest.java @@ -0,0 +1,135 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.bean; + +import org.apache.camel.CamelExecutionException; +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.junit.Assert; + +import java.util.concurrent.*; +import java.util.function.Consumer; +import java.util.function.Function; + +/** + * @version + */ +public class BeanInvokeAsyncTest extends ContextTestSupport { + + private volatile CompletableFuture<Object> callFuture; + private volatile String receivedBody; + private volatile CountDownLatch methodInvoked; + private Future<Object> sendFuture; + + public void testDoSomething() throws Exception { + + runTestSendBody("Hello World", "Hello World", this::doSomething); + runTestSendBody("", "", this::doSomething); + runTestSendBody(this::expectNullBody, null, this::doSomething); + } + + public void testChangeSomething() throws Exception { + runTestSendBody("Bye World", "Hello World", this::changeSomething); + runTestSendBody("Bye All", null, this::changeSomething); + runTestSendBody("Bye All", "", this::changeSomething); + + } + + public void testDoNothing() throws Exception { + runTestSendBody("Hello World", "Hello World", this::doNothing); + runTestSendBody("", "", this::doNothing); + runTestSendBody(this::expectNullBody, null, this::doNothing); + } + + public void testThrowSomething() throws Exception { + try { + runTestSendBody(m -> m.expectedMessageCount(0), "SomeProblem", this::throwSomething); + fail("Exception expected"); + } catch (ExecutionException e) { + Assert.assertTrue(e.getCause() instanceof CamelExecutionException); + Assert.assertTrue(e.getCause().getCause() instanceof IllegalStateException); + Assert.assertEquals("SomeProblem", e.getCause().getCause().getMessage()); + } + } + + private void runTestSendBody(String expectedBody, String sentBody, Function<String, String> processor) + throws InterruptedException, java.util.concurrent.ExecutionException { + runTestSendBody(m -> m.expectedBodiesReceived(expectedBody), sentBody, processor); + } + + private void runTestSendBody(Consumer<MockEndpoint> mockPreparer, String sentBody, + Function<String, String> processor) + throws InterruptedException, java.util.concurrent.ExecutionException { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.reset(); + mockPreparer.accept(mock); + callFuture = new CompletableFuture<>(); + methodInvoked = new CountDownLatch(1); + sendFuture = template.asyncSendBody("direct:entry", sentBody); + Assert.assertTrue(methodInvoked.await(5, TimeUnit.SECONDS)); + Assert.assertEquals(0, mock.getReceivedCounter()); + Assert.assertFalse(sendFuture.isDone()); + try { + callFuture.complete(processor.apply(receivedBody)); + } catch (Exception e) { + callFuture.completeExceptionally(e); + } + sendFuture.get(); + assertMockEndpointsSatisfied(); + } + + private void expectNullBody(MockEndpoint mock) { + mock.expectedMessageCount(1); + mock.message(0).body().isNull(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:entry").bean(BeanInvokeAsyncTest.this, "asyncMethod").to("mock:result"); + } + }; + } + + public CompletableFuture<?> asyncMethod(String body) { + this.receivedBody = body; + methodInvoked.countDown(); + return callFuture; + } + + public String doSomething(String s) { + return s; + } + + public String changeSomething(String s) { + if ("Hello World".equals(s)) { + return "Bye World"; + } + return "Bye All"; + } + + public String doNothing(String s) { + return null; + } + + public String throwSomething(String s) { + throw new IllegalStateException(s); + } +} \ No newline at end of file