Repository: camel Updated Branches: refs/heads/master 5774dc452 -> 2d0de2bbf
CAMEL-10172: Make BeanProducer fully async Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d7cb730d Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d7cb730d Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d7cb730d Branch: refs/heads/master Commit: d7cb730ddc9d082f2c5006f68ca5aaa5ce355020 Parents: 5774dc4 Author: Vitalii Tymchyshyn <v...@tym.im> Authored: Thu Jul 21 23:21:04 2016 -0400 Committer: Claus Ibsen <davscl...@apache.org> Committed: Sun Jul 24 13:48:16 2016 +0200 ---------------------------------------------------------------------- .../camel/component/bean/BeanProducer.java | 8 +- ...cEndpointRoutingSlipBeanNonBlockingTest.java | 122 +++++++++++++++++++ 2 files changed, 123 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/d7cb730d/camel-core/src/main/java/org/apache/camel/component/bean/BeanProducer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/bean/BeanProducer.java b/camel-core/src/main/java/org/apache/camel/component/bean/BeanProducer.java index 1f2704e..36a6ef3 100644 --- a/camel-core/src/main/java/org/apache/camel/component/bean/BeanProducer.java +++ b/camel-core/src/main/java/org/apache/camel/component/bean/BeanProducer.java @@ -38,13 +38,7 @@ public class BeanProducer extends DefaultAsyncProducer { @Override public boolean process(Exchange exchange, AsyncCallback callback) { - try { - processor.process(exchange); - } catch (Exception e) { - exchange.setException(e); - } - callback.done(true); - return true; + return processor.process(exchange, callback); } @Override http://git-wip-us.apache.org/repos/asf/camel/blob/d7cb730d/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRoutingSlipBeanNonBlockingTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRoutingSlipBeanNonBlockingTest.java b/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRoutingSlipBeanNonBlockingTest.java new file mode 100644 index 0000000..7b8eab3 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRoutingSlipBeanNonBlockingTest.java @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.async; + +import org.apache.camel.*; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.impl.JndiRegistry; +import org.apache.camel.processor.SendProcessor; +import org.apache.camel.util.AsyncProcessorHelper; +import org.apache.camel.util.ServiceHelper; +import org.junit.Assert; + +import java.util.concurrent.*; + +/** + * @version + */ +public class AsyncEndpointRoutingSlipBeanNonBlockingTest extends ContextTestSupport { + private AsyncCallback innerCallback; + private Exchange innerExchange; + + @Override + protected JndiRegistry createRegistry() throws Exception { + JndiRegistry jndi = super.createRegistry(); + jndi.bind("myBean", new MyRoutingSlipBean()); + return jndi; + } + + public void testAsyncEndpointDontBlock() throws Exception { + getMockEndpoint("mock:result").expectedBodiesReceived("Bye Camel"); + Endpoint startEndpoint = context.getEndpoint("direct:start"); + SendProcessor asyncSender = new SendProcessor(startEndpoint); + ServiceHelper.startService(asyncSender); + + ExecutorService executorService = context.getExecutorServiceManager().newSingleThreadExecutor(this, "test"); + try { + Future<Boolean> asyncFuture = executorService.submit(new ExchangeSubmitter(startEndpoint, asyncSender)); + Assert.assertFalse(asyncFuture.get(5, TimeUnit.SECONDS)); + innerExchange.getOut().setBody("Bye Camel"); + innerCallback.done(false); + + assertMockEndpointsSatisfied(); + } finally { + executorService.shutdown(); + ServiceHelper.stopAndShutdownService(asyncSender); + } + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + context.addComponent("async", new MyAsyncComponent()); + + from("direct:start") + .to("bean:myBean"); + from("direct:asyncRoute") + .process(new MyAsyncProcessor()); + } + }; + } + + public static class MyRoutingSlipBean { + + @RoutingSlip + public String doSomething() { + return "direct:asyncRoute,mock:result"; + } + } + + private static class ExchangeSubmitter implements Callable<Boolean> { + private final Endpoint startEndpoint; + private final SendProcessor asyncSender; + + public ExchangeSubmitter(Endpoint startEndpoint, SendProcessor asyncSender) { + this.startEndpoint = startEndpoint; + this.asyncSender = asyncSender; + } + + @Override + public Boolean call() throws Exception { + Exchange exchange = startEndpoint.createExchange(ExchangePattern.InOut); + exchange.getIn().setBody("Hello Camel"); + return asyncSender.process(exchange, new AsyncCallback() { + @Override + public void done(boolean doneSync) { + Assert.assertFalse(doneSync); + } + }); + } + } + + private class MyAsyncProcessor implements AsyncProcessor { + @Override + public boolean process(Exchange exchange, AsyncCallback callback) { + innerCallback = callback; + innerExchange = exchange; + + return false; + } + + @Override + public void process(Exchange exchange) throws Exception { + AsyncProcessorHelper.process(this, exchange); + } + } +} \ No newline at end of file