Author: davsclaus
Date: Fri Mar  9 13:17:52 2012
New Revision: 1298821

URL: http://svn.apache.org/viewvc?rev=1298821&view=rev
Log:
CAMEL-5072: Shutdown thread pool more eagerly in scheduled poll consumers. Also 
shutdown services when stopping producer/consumer templates as the intent is 
not to use them anymore.

Added:
    
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/ConsumerTemplateFileShutdownTest.java
    
camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/ConsumerTemplateFtpShutdownTest.java
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/ConsumerTemplate.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ConsumerCache.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumerTemplate.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/ConsumerTemplate.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/ConsumerTemplate.java?rev=1298821&r1=1298820&r2=1298821&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/ConsumerTemplate.java 
(original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/ConsumerTemplate.java 
Fri Mar  9 13:17:52 2012
@@ -38,9 +38,13 @@ package org.apache.camel;
  * <li>The <tt>in.body</tt></li>
  * </ul>
  * <p/>
- * <b>Important note on usage:</b> See this
+ * <br/>
+ * Before using the template it must be started.
+ * And when you are done using the template, make sure to {@link #stop()} the 
template.
+ * <br/>
+ * <p/><b>Important note on usage:</b> See this
  * <a 
href="http://camel.apache.org/why-does-camel-use-too-many-threads-with-producertemplate.html">FAQ
 entry</a>
- * before using, it applies to ConsumerTemplate as well.
+ * before using, it applies to this {@link ConsumerTemplate} as well.
  *
  * @version 
  */

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java?rev=1298821&r1=1298820&r2=1298821&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java 
(original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java 
Fri Mar  9 13:17:52 2012
@@ -47,7 +47,11 @@ import org.apache.camel.spi.Synchronizat
  *   <li>Either <tt>IN</tt> or <tt>OUT</tt> body according to the message 
exchange pattern. If the pattern is
  *   Out capable then the <tt>OUT</tt> body is returned, otherwise <tt>IN</tt>.
  * </ul>
- * <br/><br/>
+ * <p/>
+ * <br/>
+ * Before using the template it must be started.
+ * And when you are done using the template, make sure to {@link #stop()} the 
template.
+ * <br/>
  * <p/><b>Important note on usage:</b> See this
  * <a 
href="http://camel.apache.org/why-does-camel-use-too-many-threads-with-producertemplate.html">FAQ
 entry</a>
  * before using.

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ConsumerCache.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ConsumerCache.java?rev=1298821&r1=1298820&r2=1298821&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ConsumerCache.java 
(original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ConsumerCache.java 
Fri Mar  9 13:17:52 2012
@@ -216,7 +216,8 @@ public class ConsumerCache extends Servi
     }
 
     protected void doStop() throws Exception {
-        ServiceHelper.stopServices(consumers.values());
+        // when stopping we intend to shutdown
+        ServiceHelper.stopAndShutdownServices(consumers.values());
         consumers.clear();
     }
 

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumerTemplate.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumerTemplate.java?rev=1298821&r1=1298820&r2=1298821&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumerTemplate.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumerTemplate.java
 Fri Mar  9 13:17:52 2012
@@ -264,7 +264,8 @@ public class DefaultConsumerTemplate ext
     }
 
     protected void doStop() throws Exception {
-        ServiceHelper.stopService(consumerCache);
+        // we should shutdown the services as this is our intention, to not 
re-use the services anymore
+        ServiceHelper.stopAndShutdownService(consumerCache);
         consumerCache = null;
     }
 

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java?rev=1298821&r1=1298820&r2=1298821&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/EventDrivenPollingConsumer.java
 Fri Mar  9 13:17:52 2012
@@ -152,4 +152,8 @@ public class EventDrivenPollingConsumer 
     protected void doStop() throws Exception {
         ServiceHelper.stopService(consumer);
     }
+
+    protected void doShutdown() throws Exception {
+        ServiceHelper.stopAndShutdownService(consumer);
+    }
 }

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java?rev=1298821&r1=1298820&r2=1298821&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java 
(original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java 
Fri Mar  9 13:17:52 2012
@@ -394,17 +394,18 @@ public class ProducerCache extends Servi
         return answer;
     }
 
-    protected void doStop() throws Exception {
-        ServiceHelper.stopServices(pool);
-        ServiceHelper.stopServices(producers.values());
-        producers.clear();
-    }
-
     protected void doStart() throws Exception {
         ServiceHelper.startServices(producers.values());
         ServiceHelper.startServices(pool);
     }
 
+    protected void doStop() throws Exception {
+        // when stopping we intend to shutdown
+        ServiceHelper.stopAndShutdownService(pool);
+        ServiceHelper.stopAndShutdownServices(producers.values());
+        producers.clear();
+    }
+
     /**
      * Returns the current size of the cache
      *

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java?rev=1298821&r1=1298820&r2=1298821&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
 Fri Mar  9 13:17:52 2012
@@ -40,7 +40,8 @@ import org.slf4j.LoggerFactory;
 public abstract class ScheduledPollConsumer extends DefaultConsumer implements 
Runnable, SuspendableService, PollingConsumerPollingStrategy {
     private static final transient Logger LOG = 
LoggerFactory.getLogger(ScheduledPollConsumer.class);
 
-    private final ScheduledExecutorService executor;
+    private ScheduledExecutorService executor;
+    private boolean shutdownExecutor;
     private ScheduledFuture<?> future;
 
     // if adding more options then align with 
ScheduledPollEndpoint#configureScheduledPollConsumerProperties
@@ -56,15 +57,12 @@ public abstract class ScheduledPollConsu
 
     public ScheduledPollConsumer(Endpoint endpoint, Processor processor) {
         super(endpoint, processor);
-
-        // we only need one thread in the pool to schedule this task
-        this.executor = endpoint.getCamelContext().getExecutorServiceManager()
-                            .newScheduledThreadPool(this, 
endpoint.getEndpointUri(), 1);
-        ObjectHelper.notNull(executor, "executor");
     }
 
     public ScheduledPollConsumer(Endpoint endpoint, Processor processor, 
ScheduledExecutorService executor) {
         super(endpoint, processor);
+        // we have been given an existing thread pool, so we should not manage 
its lifecycle
+        // so we should keep shutdownExecutor as false
         this.executor = executor;
         ObjectHelper.notNull(executor, "executor");
     }
@@ -298,6 +296,16 @@ public abstract class ScheduledPollConsu
     @Override
     protected void doStart() throws Exception {
         super.doStart();
+
+        // if no existing executor provided, then create a new thread pool 
ourselves
+        if (executor == null) {
+            // we only need one thread in the pool to schedule this task
+            this.executor = 
getEndpoint().getCamelContext().getExecutorServiceManager()
+                    .newScheduledThreadPool(this, 
getEndpoint().getEndpointUri(), 1);
+            // and we should shutdown the thread pool when no longer needed
+            this.shutdownExecutor = true;
+        }
+
         ObjectHelper.notNull(executor, "executor", this);
         ObjectHelper.notNull(pollStrategy, "pollStrategy", this);
 
@@ -332,6 +340,16 @@ public abstract class ScheduledPollConsu
     }
 
     @Override
+    protected void doShutdown() throws Exception {
+        if (shutdownExecutor && executor != null) {
+            
getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(executor);
+            executor = null;
+            future = null;
+        }
+        super.doShutdown();
+    }
+
+    @Override
     protected void doSuspend() throws Exception {
         // dont stop/cancel the future task since we just check in the run 
method
     }

Added: 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/ConsumerTemplateFileShutdownTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/ConsumerTemplateFileShutdownTest.java?rev=1298821&view=auto
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/ConsumerTemplateFileShutdownTest.java
 (added)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/ConsumerTemplateFileShutdownTest.java
 Fri Mar  9 13:17:52 2012
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+
+/**
+ *
+ */
+public class ConsumerTemplateFileShutdownTest extends ContextTestSupport {
+    
+    public void testConsumerTemplateFile() throws Exception {
+        deleteDirectory("target/consumertemplate");
+        
+        template.sendBodyAndHeader("file:target/consumertemplate", "Hello 
World", Exchange.FILE_NAME, "hello.txt");
+        
+        Exchange exchange = 
consumer.receive("file:target/consumertemplate?fileName=hello.txt", 5000);
+        assertNotNull(exchange);
+        
+        assertEquals("Hello World", exchange.getIn().getBody(String.class));
+
+        consumer.doneUoW(exchange);
+        consumer.stop();
+    }
+
+    @Override
+    public boolean isUseRouteBuilder() {
+        return false;
+    }
+}

Added: 
camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/ConsumerTemplateFtpShutdownTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/ConsumerTemplateFtpShutdownTest.java?rev=1298821&view=auto
==============================================================================
--- 
camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/ConsumerTemplateFtpShutdownTest.java
 (added)
+++ 
camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/ConsumerTemplateFtpShutdownTest.java
 Fri Mar  9 13:17:52 2012
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.remote;
+
+import org.apache.camel.Exchange;
+import org.junit.Test;
+
+/**
+ *
+ */
+public class ConsumerTemplateFtpShutdownTest extends FtpServerTestSupport {
+
+    protected String getFtpUrl() {
+        return "ftp://admin@localhost:" + getPort() + 
"/template?password=admin";
+    }
+
+    @Test
+    public void testConsumerTemplateFtp() throws Exception {
+        template.sendBodyAndHeader(getFtpUrl(), "Hello World", 
Exchange.FILE_NAME, "hello.txt");
+
+        Exchange exchange = consumer.receive(getFtpUrl() + 
"&fileName=hello.txt", 5000);
+        assertNotNull(exchange);
+        
+        consumer.doneUoW(exchange);
+        consumer.stop();
+    }
+
+    @Override
+    public boolean isUseRouteBuilder() {
+        return false;
+    }
+
+}


Reply via email to