Repository: camel Updated Branches: refs/heads/master b86e52d0a -> b86ced1c2
CAMEL-5690 - Using bean component with beans that implement Service from Camel should have the lifecycle callbacks invoked Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/b86ced1c Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/b86ced1c Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/b86ced1c Branch: refs/heads/master Commit: b86ced1c23142ae95a5ede45383e2a13184f0c69 Parents: b86e52d Author: lburgazzoli <lburgazz...@gmail.com> Authored: Tue Apr 5 15:28:58 2016 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Wed Apr 6 07:31:23 2016 +0200 ---------------------------------------------------------------------- .../camel/component/bean/BeanProcessor.java | 25 ++- .../camel/component/bean/BeanProducer.java | 34 ++++ .../camel/component/bean/BeanLifecycleTest.java | 154 +++++++++++++++++++ 3 files changed, 212 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/b86ced1c/camel-core/src/main/java/org/apache/camel/component/bean/BeanProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/bean/BeanProcessor.java b/camel-core/src/main/java/org/apache/camel/component/bean/BeanProcessor.java index 6565c2e..e7fe819 100644 --- a/camel-core/src/main/java/org/apache/camel/component/bean/BeanProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/component/bean/BeanProcessor.java @@ -21,6 +21,7 @@ import org.apache.camel.AsyncProcessor; import org.apache.camel.CamelContext; import org.apache.camel.Exchange; import org.apache.camel.Message; +import org.apache.camel.NoSuchBeanException; import org.apache.camel.Processor; import org.apache.camel.support.ServiceSupport; import org.apache.camel.util.AsyncProcessorHelper; @@ -181,6 +182,10 @@ public class BeanProcessor extends ServiceSupport implements AsyncProcessor { return processor; } + protected BeanHolder getBeanHolder() { + return this.beanHolder; + } + public Object getBean() { return beanHolder.getBean(); } @@ -228,11 +233,29 @@ public class BeanProcessor extends ServiceSupport implements AsyncProcessor { if (beanHolder.supportProcessor() && allowProcessor(method, beanHolder.getBeanInfo())) { processor = beanHolder.getProcessor(); ServiceHelper.startService(processor); + } else if (beanHolder instanceof ConstantBeanHolder) { + try { + // Start the bean if it implements Service interface and if cached + // so meant to be reused + ServiceHelper.startService(beanHolder.getBean()); + } catch (NoSuchBeanException e) { + // ignore + } } } protected void doStop() throws Exception { - ServiceHelper.stopService(processor); + if (processor != null) { + ServiceHelper.stopService(processor); + } else if (beanHolder instanceof ConstantBeanHolder) { + try { + // Stop the bean if it implements Service interface and if cached + // so meant to be reused + ServiceHelper.stopService(beanHolder.getBean()); + } catch (NoSuchBeanException e) { + // ignore + } + } } private boolean allowProcessor(String explicitMethodName, BeanInfo info) { http://git-wip-us.apache.org/repos/asf/camel/blob/b86ced1c/camel-core/src/main/java/org/apache/camel/component/bean/BeanProducer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/bean/BeanProducer.java b/camel-core/src/main/java/org/apache/camel/component/bean/BeanProducer.java index 065204f..1f2704e 100644 --- a/camel-core/src/main/java/org/apache/camel/component/bean/BeanProducer.java +++ b/camel-core/src/main/java/org/apache/camel/component/bean/BeanProducer.java @@ -18,7 +18,9 @@ package org.apache.camel.component.bean; import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; +import org.apache.camel.NoSuchBeanException; import org.apache.camel.impl.DefaultAsyncProducer; +import org.apache.camel.util.ServiceHelper; /** * Bean {@link org.apache.camel.Producer} @@ -26,10 +28,12 @@ import org.apache.camel.impl.DefaultAsyncProducer; public class BeanProducer extends DefaultAsyncProducer { private final BeanProcessor processor; + private boolean beanStarted; public BeanProducer(BeanEndpoint endpoint, BeanProcessor processor) { super(endpoint); this.processor = processor; + this.beanStarted = false; } @Override @@ -42,4 +46,34 @@ public class BeanProducer extends DefaultAsyncProducer { callback.done(true); return true; } + + @Override + protected void doStart() throws Exception { + super.doStart(); + + if (processor.getBeanHolder() instanceof ConstantBeanHolder) { + try { + // Start the bean if it implements Service interface and if cached + // so meant to be reused + ServiceHelper.startService(processor.getBean()); + beanStarted = true; + } catch (NoSuchBeanException e) { + } + } + } + + @Override + protected void doStop() throws Exception { + if (beanStarted) { + try { + // Stop the bean if it implements Service interface and if cached + // so meant to be reused + ServiceHelper.stopService(processor.getBean()); + beanStarted = false; + } catch (NoSuchBeanException e) { + } + } + + super.doStop(); + } } http://git-wip-us.apache.org/repos/asf/camel/blob/b86ced1c/camel-core/src/test/java/org/apache/camel/component/bean/BeanLifecycleTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/component/bean/BeanLifecycleTest.java b/camel-core/src/test/java/org/apache/camel/component/bean/BeanLifecycleTest.java new file mode 100644 index 0000000..254f008 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/component/bean/BeanLifecycleTest.java @@ -0,0 +1,154 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.bean; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.Service; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.impl.JndiRegistry; + +/** + * @version + */ +public class BeanLifecycleTest extends ContextTestSupport { + + private MyBean statefulInstance; + private MyBean statefulInstanceInRegistry; + private MyBean statefulInstanceInRegistryNoCache; + + @Override + protected void setUp() throws Exception { + statefulInstance = new MyBean(); + statefulInstanceInRegistry = new MyBean(); + statefulInstanceInRegistryNoCache = new MyBean(); + + super.setUp(); + } + + @Override + protected void tearDown() throws Exception { + super.tearDown(); + + assertEquals("stopped", statefulInstance.getStatus()); + assertEquals("stopped", statefulInstanceInRegistry.getStatus()); + assertNull(statefulInstanceInRegistryNoCache.getStatus()); + assertEquals(2, MyStatefulBean.INSTANCES.get()); + } + + public void testBeanLifecycle() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMessageCount(1); + + assertEquals("started", statefulInstance.getStatus()); + assertEquals("started", statefulInstanceInRegistry.getStatus()); + assertNull(statefulInstanceInRegistryNoCache.getStatus()); + assertEquals(2, MyStatefulBean.INSTANCES.get()); + + template.sendBody("direct:foo", null); + + mock.assertIsSatisfied(); + } + + + @Override + protected JndiRegistry createRegistry() throws Exception { + JndiRegistry jndi = super.createRegistry(); + jndi.bind("statefulInstanceInRegistry", statefulInstanceInRegistry); + jndi.bind("statefulInstanceInRegistryNoCache", statefulInstanceInRegistryNoCache); + return jndi; + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:foo").routeId("foo") + .bean(statefulInstance, "doSomething") + .bean(MyStatefulBean.class, "doSomething") + .bean(MyStatefulBean.class.getName(), "doSomething", true) + .bean(MyStatelessBean.class.getName(), "doSomething", false) + .to("bean:statefulInstanceInRegistry?method=doSomething&cache=true") + .to("bean:statefulInstanceInRegistryNoCache?method=doSomething&cache=false") + .to("mock:result"); + } + }; + } + + public static class MyBean implements Service { + private String status; + + public String getStatus() { + return status; + } + + public void doSomething(Exchange exchange) { + // noop + } + + @Override + public void start() throws Exception { + status = "started"; + } + + @Override + public void stop() throws Exception { + status = "stopped"; + } + } + + public static class MyStatelessBean implements Service { + + public void doSomething(Exchange exchange) { + // noop + } + + @Override + public void start() throws Exception { + fail("Should not be invoked"); + } + + @Override + public void stop() throws Exception { + fail("Should not be invoked"); + } + } + + public static class MyStatefulBean implements Service { + private static final AtomicInteger INSTANCES = new AtomicInteger(0); + + public MyStatefulBean() { + INSTANCES.incrementAndGet(); + } + + public void doSomething(Exchange exchange) { + // noop + } + + @Override + public void start() throws Exception { + } + + @Override + public void stop() throws Exception { + } + } +}