Repository: camel
Updated Branches:
  refs/heads/master 8a48ccf13 -> 11b328236
CAMEL-7723: Support async start and stop for consumers and producers

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/11b32823
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/11b32823
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/11b32823

Branch: refs/heads/master
Commit: 11b328236a1e0410c694bacc96830314f46c5869
Parents: 8a48ccf
Author: Cristiano Nicolai <cristiano.nico...@gmail.com>
Authored: Tue Aug 19 23:21:00 2014 +1000
Committer: Willem Jiang <willem.ji...@gmail.com>
Committed: Wed Aug 20 11:08:02 2014 +0800

----------------------------------------------------------------------
 .../camel/component/sjms/SjmsComponent.java     | 21 ++++++
 .../camel/component/sjms/SjmsConsumer.java      | 43 ++++++++++-
 .../camel/component/sjms/SjmsEndpoint.java      | 21 ++++++
 .../camel/component/sjms/SjmsProducer.java      | 48 +++++++++++-
 .../sjms/AsyncStartStopListenerTest.java        | 79 ++++++++++++++++++++
 5 files changed, 206 insertions(+), 6 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/camel/blob/11b32823/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java
index 8489d77..03d4506 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsComponent.java
@@ -17,6 +17,8 @@
 package org.apache.camel.component.sjms;
 
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
+
 import javax.jms.ConnectionFactory;
 
 import org.apache.camel.CamelException;
@@ -46,6 +48,7 @@ public class SjmsComponent extends UriEndpointComponent
implements HeaderFilterS private Integer connectionCount = 1; private TransactionCommitStrategy transactionCommitStrategy; private TimedTaskManager timedTaskManager; + private ExecutorService asyncStartStopExecutorService; public SjmsComponent() { super(SjmsEndpoint.class); @@ -152,6 +155,24 @@ public class SjmsComponent extends UriEndpointComponent implements HeaderFilterS super.doStop(); } + @Override + protected void doShutdown() throws Exception { + if (asyncStartStopExecutorService != null) { + getCamelContext().getExecutorServiceManager().shutdownNow(asyncStartStopExecutorService); + asyncStartStopExecutorService = null; + } + super.doShutdown(); + } + + protected synchronized ExecutorService getAsyncStartStopExecutorService() { + if (asyncStartStopExecutorService == null) { + // use a cached thread pool for async start tasks as they can run for a while, and we need a dedicated thread + // for each task, and the thread pool will shrink when no more tasks running + asyncStartStopExecutorService = getCamelContext().getExecutorServiceManager().newCachedThreadPool(this, "AsyncStartStopListener"); + } + return asyncStartStopExecutorService; + } + /** * Sets the ConnectionFactory value of connectionFactory for this instance * of SjmsComponent. 
http://git-wip-us.apache.org/repos/asf/camel/blob/11b32823/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java index 017acbe..3c4b82f 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java @@ -147,15 +147,52 @@ public class SjmsConsumer extends DefaultConsumer { super.doStart(); this.executor = getEndpoint().getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, "SjmsConsumer"); consumers = new MessageConsumerPool(); - consumers.fillPool(); + if(getEndpoint().isAsyncStartListener()){ + getEndpoint().getComponent().getAsyncStartStopExecutorService().submit(new Runnable() { + @Override + public void run() { + try { + consumers.fillPool(); + } catch (Throwable e) { + log.warn("Error starting listener container on destination: " + getDestinationName() + ". This exception will be ignored.", e); + } + } + + @Override + public String toString() { + return "AsyncStartListenerTask[" + getDestinationName() + "]"; + } + }); + } else { + consumers.fillPool(); + } } @Override protected void doStop() throws Exception { super.doStop(); if (consumers != null) { - consumers.drainPool(); - consumers = null; + if(getEndpoint().isAsyncStopListener()){ + getEndpoint().getComponent().getAsyncStartStopExecutorService().submit(new Runnable() { + @Override + public void run() { + try { + consumers.drainPool(); + consumers = null; + } catch (Throwable e) { + log.warn("Error stopping listener container on destination: " + getDestinationName() + ". 
This exception will be ignored.", e); + } + } + + @Override + public String toString() { + return "AsyncStopListenerTask[" + getDestinationName() + "]"; + } + }); + } else { + consumers.drainPool(); + consumers = null; + } } if (this.executor != null) { getEndpoint().getCamelContext().getExecutorServiceManager().shutdownGraceful(this.executor); http://git-wip-us.apache.org/repos/asf/camel/blob/11b32823/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java index 63118ec..8558a22 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java @@ -70,6 +70,10 @@ public class SjmsEndpoint extends DefaultEndpoint implements MultipleConsumersSu private int transactionBatchCount = -1; @UriParam private long transactionBatchTimeout = 5000; + @UriParam + private boolean asyncStartListener; + @UriParam + private boolean asyncStopListener; private TransactionCommitStrategy transactionCommitStrategy; public SjmsEndpoint() { @@ -448,4 +452,21 @@ public class SjmsEndpoint extends DefaultEndpoint implements MultipleConsumersSu this.namedReplyTo = namedReplyTo; this.setExchangePattern(ExchangePattern.InOut); } + + public void setAsyncStartListener(boolean asyncStartListener) { + this.asyncStartListener = asyncStartListener; + } + + public void setAsyncStopListener(boolean asyncStopListener) { + this.asyncStopListener = asyncStopListener; + } + + public boolean isAsyncStartListener() { + return asyncStartListener; + } + + public boolean isAsyncStopListener() { + return asyncStopListener; + } + } 
http://git-wip-us.apache.org/repos/asf/camel/blob/11b32823/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java index 8e9e878..64a5914 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java @@ -135,7 +135,25 @@ public abstract class SjmsProducer extends DefaultAsyncProducer { this.executor = getEndpoint().getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, "SjmsProducer"); if (getProducers() == null) { setProducers(new MessageProducerPool()); - getProducers().fillPool(); + if(getEndpoint().isAsyncStartListener()){ + getEndpoint().getComponent().getAsyncStartStopExecutorService().submit(new Runnable() { + @Override + public void run() { + try { + getProducers().fillPool(); + } catch (Throwable e) { + log.warn("Error starting listener container on destination: " + getDestinationName() + ". 
This exception will be ignored.", e); + } + } + + @Override + public String toString() { + return "AsyncStartListenerTask[" + getDestinationName() + "]"; + } + }); + } else { + getProducers().fillPool(); + } } } @@ -143,14 +161,38 @@ public abstract class SjmsProducer extends DefaultAsyncProducer { protected void doStop() throws Exception { super.doStop(); if (getProducers() != null) { - getProducers().drainPool(); - setProducers(null); + if(getEndpoint().isAsyncStopListener()){ + getEndpoint().getComponent().getAsyncStartStopExecutorService().submit(new Runnable() { + @Override + public void run() { + try { + getProducers().drainPool(); + setProducers(null); + } catch (Throwable e) { + log.warn("Error stopping listener container on destination: " + getDestinationName() + ". This exception will be ignored.", e); + } + } + + @Override + public String toString() { + return "AsyncStopListenerTask[" + getDestinationName() + "]"; + } + }); + } else { + getProducers().drainPool(); + setProducers(null); + } } if (this.executor != null) { getEndpoint().getCamelContext().getExecutorServiceManager().shutdownGraceful(this.executor); } } + @Override + public SjmsEndpoint getEndpoint() { + return (SjmsEndpoint) super.getEndpoint(); + } + public abstract MessageProducerResources doCreateProducerModel() throws Exception; public abstract void sendMessage(Exchange exchange, final AsyncCallback callback) throws Exception; http://git-wip-us.apache.org/repos/asf/camel/blob/11b32823/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/AsyncStartStopListenerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/AsyncStartStopListenerTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/AsyncStartStopListenerTest.java new file mode 100644 index 0000000..b10698f --- /dev/null +++ 
b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/AsyncStartStopListenerTest.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.sjms; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.component.sjms.support.JmsTestSupport; +import org.junit.Test; + +/** + * Testing with async start listener + */ +public class AsyncStartStopListenerTest extends JmsTestSupport { + + @Test + public void testAsyncStartConsumer() throws Exception { + sendBodyAndAssert("sjms:queue:foo.start"); + } + + @Test + public void testAsyncStartStopConsumer() throws Exception { + sendBodyAndAssert("sjms:queue:foo.startstop"); + } + + @Test + public void testAsyncStopConsumer() throws Exception { + sendBodyAndAssert("sjms:queue:foo.stop"); + } + + @Test + public void testAsyncStopProducer() throws Exception { + sendBodyAndAssert("sjms:queue:foo?asyncStopListener=true"); + } + + @Test + public void testAsyncStartProducer() throws Exception { + sendBodyAndAssert("sjms:queue:foo?asyncStartListener=true"); + } + + @Test + public void testAsyncStartStopProducer() throws Exception { + 
sendBodyAndAssert("sjms:queue:foo?asyncStopListener=true&asyncStartListener=true"); + } + + private void sendBodyAndAssert(final String uri) throws InterruptedException { + String body1 = "Hello World"; + String body2 = "G'day World"; + MockEndpoint result = getMockEndpoint("mock:result"); + result.expectedBodiesReceived(body1, body2); + template.sendBody(uri, body1); + template.sendBody(uri, body2); + result.assertIsSatisfied(); + } + + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + public void configure() throws Exception { + from("sjms:queue:foo.startstop?asyncStartListener=true&asyncStopListener=true").to("mock:result"); + from("sjms:queue:foo.start?asyncStartListener=true").to("mock:result"); + from("sjms:queue:foo.stop?asyncStopListener=true").to("mock:result"); + from("sjms:queue:foo").to("mock:result"); + } + }; + } +} \ No newline at end of file