Author: davsclaus Date: Fri Mar 9 13:17:52 2012 New Revision: 1298821 URL: http://svn.apache.org/viewvc?rev=1298821&view=rev Log: CAMEL-5072: Shut down thread pool more eagerly in scheduled poll consumers. Also shut down services when stopping producer/consumer templates, as the intent is not to use them anymore.
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/ConsumerTemplateFileShutdownTest.java camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/ConsumerTemplateFtpShutdownTest.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/ConsumerTemplate.java camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ConsumerCache.java camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumerTemplate.java camel/trunk/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/ConsumerTemplate.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/ConsumerTemplate.java?rev=1298821&r1=1298820&r2=1298821&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/ConsumerTemplate.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/ConsumerTemplate.java Fri Mar 9 13:17:52 2012 @@ -38,9 +38,13 @@ package org.apache.camel; * <li>The <tt>in.body</tt></li> * </ul> * <p/> - * <b>Important note on usage:</b> See this + * <br/> + * Before using the template it must be started. + * And when you are done using the template, make sure to {@link #stop()} the template. + * <br/> + * <p/><b>Important note on usage:</b> See this * <a href="http://camel.apache.org/why-does-camel-use-too-many-threads-with-producertemplate.html">FAQ entry</a> - * before using, it applies to ConsumerTemplate as well. + * before using, it applies to this {@link ConsumerTemplate} as well. 
* * @version */ Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java?rev=1298821&r1=1298820&r2=1298821&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java Fri Mar 9 13:17:52 2012 @@ -47,7 +47,11 @@ import org.apache.camel.spi.Synchronizat * <li>Either <tt>IN</tt> or <tt>OUT</tt> body according to the message exchange pattern. If the pattern is * Out capable then the <tt>OUT</tt> body is returned, otherwise <tt>IN</tt>. * </ul> - * <br/><br/> + * <p/> + * <br/> + * Before using the template it must be started. + * And when you are done using the template, make sure to {@link #stop()} the template. + * <br/> * <p/><b>Important note on usage:</b> See this * <a href="http://camel.apache.org/why-does-camel-use-too-many-threads-with-producertemplate.html">FAQ entry</a> * before using. 
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ConsumerCache.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ConsumerCache.java?rev=1298821&r1=1298820&r2=1298821&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ConsumerCache.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ConsumerCache.java Fri Mar 9 13:17:52 2012 @@ -216,7 +216,8 @@ public class ConsumerCache extends Servi } protected void doStop() throws Exception { - ServiceHelper.stopServices(consumers.values()); + // when stopping we intend to shutdown + ServiceHelper.stopAndShutdownServices(consumers.values()); consumers.clear(); } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumerTemplate.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumerTemplate.java?rev=1298821&r1=1298820&r2=1298821&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumerTemplate.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumerTemplate.java Fri Mar 9 13:17:52 2012 @@ -264,7 +264,8 @@ public class DefaultConsumerTemplate ext } protected void doStop() throws Exception { - ServiceHelper.stopService(consumerCache); + // we should shutdown the services as this is our intention, to not re-use the services anymore + ServiceHelper.stopAndShutdownService(consumerCache); consumerCache = null; } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java?rev=1298821&r1=1298820&r2=1298821&view=diff 
============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java Fri Mar 9 13:17:52 2012 @@ -152,4 +152,8 @@ public class EventDrivenPollingConsumer protected void doStop() throws Exception { ServiceHelper.stopService(consumer); } + + protected void doShutdown() throws Exception { + ServiceHelper.stopAndShutdownService(consumer); + } } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java?rev=1298821&r1=1298820&r2=1298821&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java Fri Mar 9 13:17:52 2012 @@ -394,17 +394,18 @@ public class ProducerCache extends Servi return answer; } - protected void doStop() throws Exception { - ServiceHelper.stopServices(pool); - ServiceHelper.stopServices(producers.values()); - producers.clear(); - } - protected void doStart() throws Exception { ServiceHelper.startServices(producers.values()); ServiceHelper.startServices(pool); } + protected void doStop() throws Exception { + // when stopping we intend to shutdown + ServiceHelper.stopAndShutdownService(pool); + ServiceHelper.stopAndShutdownServices(producers.values()); + producers.clear(); + } + /** * Returns the current size of the cache * Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java?rev=1298821&r1=1298820&r2=1298821&view=diff 
============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java Fri Mar 9 13:17:52 2012 @@ -40,7 +40,8 @@ import org.slf4j.LoggerFactory; public abstract class ScheduledPollConsumer extends DefaultConsumer implements Runnable, SuspendableService, PollingConsumerPollingStrategy { private static final transient Logger LOG = LoggerFactory.getLogger(ScheduledPollConsumer.class); - private final ScheduledExecutorService executor; + private ScheduledExecutorService executor; + private boolean shutdownExecutor; private ScheduledFuture<?> future; // if adding more options then align with ScheduledPollEndpoint#configureScheduledPollConsumerProperties @@ -56,15 +57,12 @@ public abstract class ScheduledPollConsu public ScheduledPollConsumer(Endpoint endpoint, Processor processor) { super(endpoint, processor); - - // we only need one thread in the pool to schedule this task - this.executor = endpoint.getCamelContext().getExecutorServiceManager() - .newScheduledThreadPool(this, endpoint.getEndpointUri(), 1); - ObjectHelper.notNull(executor, "executor"); } public ScheduledPollConsumer(Endpoint endpoint, Processor processor, ScheduledExecutorService executor) { super(endpoint, processor); + // we have been given an existing thread pool, so we should not manage its lifecycle + // so we should keep shutdownExecutor as false this.executor = executor; ObjectHelper.notNull(executor, "executor"); } @@ -298,6 +296,16 @@ public abstract class ScheduledPollConsu @Override protected void doStart() throws Exception { super.doStart(); + + // if no existing executor provided, then create a new thread pool ourselves + if (executor == null) { + // we only need one thread in the pool to schedule this task + this.executor = getEndpoint().getCamelContext().getExecutorServiceManager() + 
.newScheduledThreadPool(this, getEndpoint().getEndpointUri(), 1); + // and we should shutdown the thread pool when no longer needed + this.shutdownExecutor = true; + } + ObjectHelper.notNull(executor, "executor", this); ObjectHelper.notNull(pollStrategy, "pollStrategy", this); @@ -332,6 +340,16 @@ public abstract class ScheduledPollConsu } @Override + protected void doShutdown() throws Exception { + if (shutdownExecutor && executor != null) { + getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(executor); + executor = null; + future = null; + } + super.doShutdown(); + } + + @Override protected void doSuspend() throws Exception { // dont stop/cancel the future task since we just check in the run method } Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/ConsumerTemplateFileShutdownTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/ConsumerTemplateFileShutdownTest.java?rev=1298821&view=auto ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/ConsumerTemplateFileShutdownTest.java (added) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/ConsumerTemplateFileShutdownTest.java Fri Mar 9 13:17:52 2012 @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.file; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; + +/** + * + */ +public class ConsumerTemplateFileShutdownTest extends ContextTestSupport { + + public void testConsumerTemplateFile() throws Exception { + deleteDirectory("target/consumertemplate"); + + template.sendBodyAndHeader("file:target/consumertemplate", "Hello World", Exchange.FILE_NAME, "hello.txt"); + + Exchange exchange = consumer.receive("file:target/consumertemplate?fileName=hello.txt", 5000); + assertNotNull(exchange); + + assertEquals("Hello World", exchange.getIn().getBody(String.class)); + + consumer.doneUoW(exchange); + consumer.stop(); + } + + @Override + public boolean isUseRouteBuilder() { + return false; + } +} Added: camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/ConsumerTemplateFtpShutdownTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/ConsumerTemplateFtpShutdownTest.java?rev=1298821&view=auto ============================================================================== --- camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/ConsumerTemplateFtpShutdownTest.java (added) +++ camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/ConsumerTemplateFtpShutdownTest.java Fri Mar 9 13:17:52 2012 @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.file.remote; + +import org.apache.camel.Exchange; +import org.junit.Test; + +/** + * + */ +public class ConsumerTemplateFtpShutdownTest extends FtpServerTestSupport { + + protected String getFtpUrl() { + return "ftp://admin@localhost:" + getPort() + "/template?password=admin"; + } + + @Test + public void testConsumerTemplateFtp() throws Exception { + template.sendBodyAndHeader(getFtpUrl(), "Hello World", Exchange.FILE_NAME, "hello.txt"); + + Exchange exchange = consumer.receive(getFtpUrl() + "&fileName=hello.txt", 5000); + assertNotNull(exchange); + + consumer.doneUoW(exchange); + consumer.stop(); + } + + @Override + public boolean isUseRouteBuilder() { + return false; + } + +}