Author: davsclaus
Date: Fri Mar  5 11:27:36 2010
New Revision: 919382

URL: http://svn.apache.org/viewvc?rev=919382&view=rev
Log:
CAMEL-1588: Prefer to use CachedExecutorService instead of a fixed size pool. 
The cached can grow/shrink and is recommended as the best general purpose pool.

Modified:
    
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java
    
camel/trunk/camel-core/src/test/java/org/apache/camel/util/DefaultTimeoutMapTest.java

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java?rev=919382&r1=919381&r2=919382&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
 Fri Mar  5 11:27:36 2010
@@ -46,7 +46,6 @@
  * @version $Revision$
  */
 public class DefaultProducerTemplate extends ServiceSupport implements 
ProducerTemplate {
-    private static final int DEFAULT_THREADPOOL_SIZE = 10;
     private final CamelContext context;
     private final ProducerCache producerCache;
     private Endpoint defaultEndpoint;
@@ -55,7 +54,7 @@
     public DefaultProducerTemplate(CamelContext context) {
         this.context = context;
         this.producerCache = new ProducerCache(context);
-        this.executor = 
ExecutorServiceHelper.newScheduledThreadPool(DEFAULT_THREADPOOL_SIZE, 
"ProducerTemplate", true);
+        this.executor = 
ExecutorServiceHelper.newCachedThreadPool("ProducerTemplate", true);
     }
 
     public DefaultProducerTemplate(CamelContext context, ExecutorService 
executor) {
@@ -684,7 +683,7 @@
         super.start();
         ServiceHelper.startService(producerCache);
         if (executor == null || executor.isShutdown()) {
-            executor = 
ExecutorServiceHelper.newScheduledThreadPool(DEFAULT_THREADPOOL_SIZE, 
"ProducerTemplate", true);
+            executor = 
ExecutorServiceHelper.newCachedThreadPool("ProducerTemplate", true);
         }
     }
 

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java?rev=919382&r1=919381&r2=919382&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
 Fri Mar  5 11:27:36 2010
@@ -123,7 +123,7 @@
         }
         if (executorService == null) {
             // fall back and use default
-            executorService = ExecutorServiceHelper.newScheduledThreadPool(10, 
"RecipientList", true);
+            executorService = 
ExecutorServiceHelper.newCachedThreadPool("RecipientList", true);
         }
         return executorService;
     }

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java?rev=919382&r1=919381&r2=919382&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java
 Fri Mar  5 11:27:36 2010
@@ -105,14 +105,18 @@
     }        
     
     private ExecutorService createExecutorService(RouteContext routeContext) {
-        if (executorServiceRef != null) {
+        if (executorService == null && executorServiceRef != null) {
             executorService = routeContext.lookup(executorServiceRef, 
ExecutorService.class);
+            if (executorService == null) {
+                throw new IllegalArgumentException("ExecutorServiceRef " + 
executorServiceRef + " not found in registry.");
+            }
         }
         if (executorService == null) {
-            executorService = ExecutorServiceHelper.newScheduledThreadPool(10, 
"Split", true);
+            // fall back and use default
+            executorService = 
ExecutorServiceHelper.newCachedThreadPool("Split", true);
         }
         return executorService;
-    }         
+    }
     
     // Fluent API
     // 
-------------------------------------------------------------------------

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java?rev=919382&r1=919381&r2=919382&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java
 Fri Mar  5 11:27:36 2010
@@ -50,11 +50,14 @@
 
     @Override
     public Processor createProcessor(RouteContext routeContext) throws 
Exception {
-        if (executorServiceRef != null) {
+        if (executorService == null && executorServiceRef != null) {
             executorService = routeContext.lookup(executorServiceRef, 
ExecutorService.class);
+            if (executorService == null) {
+                throw new IllegalArgumentException("ExecutorServiceRef " + 
executorServiceRef + " not found in registry.");
+            }
         }
         if (executorService == null && poolSize != null) {
-            executorService = 
ExecutorServiceHelper.newScheduledThreadPool(poolSize, "Threads", true);
+            executorService = ExecutorServiceHelper.newThreadPool("Threads", 
poolSize, poolSize);
         }
         Processor childProcessor = routeContext.createProcessor(this);
 

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java?rev=919382&r1=919381&r2=919382&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java 
(original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java 
Fri Mar  5 11:27:36 2010
@@ -97,7 +97,7 @@
             }
         }
         if (executorService == null && poolSize != null) {
-            executorService = 
ExecutorServiceHelper.newScheduledThreadPool(poolSize, "ToAsync[" + getLabel() 
+ "]", true);
+            executorService = ExecutorServiceHelper.newThreadPool("ToAsync[" + 
getLabel() + "]", poolSize, poolSize);
         }
 
         // create the child processor which is the async route

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=919382&r1=919381&r2=919382&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
 Fri Mar  5 11:27:36 2010
@@ -61,7 +61,6 @@
  */
 public class MulticastProcessor extends ServiceSupport implements Processor, 
Navigate<Processor>, Traceable {
 
-    private static final int DEFAULT_THREADPOOL_SIZE = 10;
     private static final transient Log LOG = 
LogFactory.getLog(MulticastProcessor.class);
 
     /**
@@ -125,11 +124,8 @@
         this.streaming = streaming;
         this.stopOnException = stopOnException;
 
-        if (isParallelProcessing()) {
-            if (this.executorService == null) {
-                // setup default executor as parallel processing requires an 
executor
-                this.executorService = 
ExecutorServiceHelper.newScheduledThreadPool(DEFAULT_THREADPOOL_SIZE, 
"Multicast", true);
-            }
+        if (isParallelProcessing() && getExecutorService() == null) {
+            this.executorService = 
ExecutorServiceHelper.newCachedThreadPool("Multicast", true);
         }
     }
 

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java?rev=919382&r1=919381&r2=919382&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java
 Fri Mar  5 11:27:36 2010
@@ -36,7 +36,6 @@
  */
 public class OnCompletionProcessor extends ServiceSupport implements 
Processor, Traceable {
 
-    private static final int DEFAULT_THREADPOOL_SIZE = 10;
     private static final transient Log LOG = 
LogFactory.getLog(OnCompletionProcessor.class);
     private ExecutorService executorService;
     private Processor processor;
@@ -176,8 +175,8 @@
         return executorService;
     }
 
-    private ExecutorService createExecutorService() {
-        return 
ExecutorServiceHelper.newScheduledThreadPool(DEFAULT_THREADPOOL_SIZE, 
this.toString(), true);
+    protected ExecutorService createExecutorService() {
+        return ExecutorServiceHelper.newCachedThreadPool(this.toString(), 
true);
     }
 
     public void setExecutorService(ExecutorService executorService) {

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java?rev=919382&r1=919381&r2=919382&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java
 Fri Mar  5 11:27:36 2010
@@ -123,7 +123,7 @@
     }
 
     /**
-     * The producer is <b>not</b> capable of async processing so lets simulate 
this by transfering the task
+     * The producer is <b>not</b> capable of async processing so lets simulate 
this by transferring the task
      * to another {...@link ExecutorService} for async processing.
      *
      * @param producer the producer
@@ -165,7 +165,7 @@
 
     public ExecutorService getExecutorService() {
         if (executorService == null) {
-            executorService = 
createExecutorService("SendAsyncProcessor-Consumer");
+            executorService = 
ExecutorServiceHelper.newThreadPool("SendAsyncProcessor-Consumer", poolSize, 
poolSize);
         }
         return executorService;
     }
@@ -263,10 +263,6 @@
         }
     }
 
-    protected ExecutorService createExecutorService(String name) {
-        return ExecutorServiceHelper.newScheduledThreadPool(poolSize, name, 
true);
-    }
-
     protected void doStart() throws Exception {
         super.doStart();
 

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java?rev=919382&r1=919381&r2=919382&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java
 Fri Mar  5 11:27:36 2010
@@ -39,7 +39,6 @@
  */
 public class ThreadsProcessor extends DelegateProcessor implements Processor {
 
-    protected static final int DEFAULT_THREADPOOL_SIZE = 10;
     protected ExecutorService executorService;
     protected WaitForTaskToComplete waitForTaskToComplete;
 
@@ -105,7 +104,7 @@
     }
 
     protected ExecutorService createExecutorService() {
-        return 
ExecutorServiceHelper.newScheduledThreadPool(DEFAULT_THREADPOOL_SIZE, 
"Threads", true);
+        return ExecutorServiceHelper.newCachedThreadPool("Threads", true);
     }
 
     protected void doStop() throws Exception {

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java?rev=919382&r1=919381&r2=919382&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
 Fri Mar  5 11:27:36 2010
@@ -38,7 +38,6 @@
  */
 public class WireTapProcessor extends SendProcessor {
 
-    private static final int DEFAULT_THREADPOOL_SIZE = 10;
     private ExecutorService executorService;
 
     // expression or processor used for populating a new exchange to send
@@ -161,8 +160,8 @@
         return executorService;
     }
 
-    private ExecutorService createExecutorService() {
-        return 
ExecutorServiceHelper.newScheduledThreadPool(DEFAULT_THREADPOOL_SIZE, 
this.toString(), true);
+    protected ExecutorService createExecutorService() {
+        return ExecutorServiceHelper.newCachedThreadPool(this.toString(), 
true);
     }
 
     public void setExecutorService(ExecutorService executorService) {

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=919382&r1=919381&r2=919382&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
 Fri Mar  5 11:27:36 2010
@@ -470,17 +470,18 @@
 
         if (executorService == null) {
             if (isParallelProcessing()) {
-                // we are running in parallel so create a default thread pool
-                executorService = ExecutorServiceHelper.newFixedThreadPool(10, 
"Aggregator", true);
+                // we are running in parallel so create a cached thread pool 
which grows/shrinks automatic
+                executorService = 
ExecutorServiceHelper.newCachedThreadPool("Aggregator", true);
             } else {
                 // use a single threaded if we are not running in parallel
-                executorService = ExecutorServiceHelper.newFixedThreadPool(1, 
"Aggregator", true);
+                executorService = 
ExecutorServiceHelper.newSingleThreadExecutor("Aggregator", true);
             }
         }
 
         // start timeout service if its in use
         if (getCompletionTimeout() > 0 || getCompletionTimeoutExpression() != 
null) {
             ScheduledExecutorService scheduler = 
ExecutorServiceHelper.newScheduledThreadPool(1, "AggregateTimeoutChecker", 
true);
+            // check for timed out aggregated messages once every second
             timeoutMap = new AggregationTimeoutMap(scheduler, 1000L);
             ServiceHelper.startService(timeoutMap);
         }

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java?rev=919382&r1=919381&r2=919382&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java
 Fri Mar  5 11:27:36 2010
@@ -18,8 +18,11 @@
 
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /**
@@ -37,6 +40,9 @@
 
     /**
      * Creates a new thread name with the given prefix
+     *
+     * @param name the prefix
+     * @return the thread name, which is unique
      */
     public static String getThreadName(String name) {
         return "Camel thread " + nextThreadCounter() + ": " + name;
@@ -46,6 +52,14 @@
         return threadCounter.getAndIncrement();
     }
 
+    /**
+     * Creates a new scheduled thread pool which can schedule threads.
+     *
+     * @param poolSize the core pool size
+     * @param name     part of the thread name
+     * @param daemon   whether the threads is daemon or not
+     * @return the created pool
+     */
     public static ScheduledExecutorService newScheduledThreadPool(final int 
poolSize, final String name, final boolean daemon) {
         return Executors.newScheduledThreadPool(poolSize, new ThreadFactory() {
             public Thread newThread(Runnable r) {
@@ -76,6 +90,13 @@
         });
     }
 
+    /**
+     * Creates a new cached thread pool which should be the most commonly used.
+     *
+     * @param name    part of the thread name
+     * @param daemon  whether the threads is daemon or not
+     * @return the created pool
+     */
     public static ExecutorService newCachedThreadPool(final String name, final 
boolean daemon) {
         return Executors.newCachedThreadPool(new ThreadFactory() {
             public Thread newThread(Runnable r) {
@@ -86,4 +107,41 @@
         });
     }
 
+    /**
+     * Creates a new custom thread pool using 60 seconds as keep alive
+     *
+     * @param name          part of the thread name
+     * @param corePoolSize  the core size
+     * @param maxPoolSize   the maximum pool size
+     * @return the created pool
+     */
+    public static ExecutorService newThreadPool(final String name, int 
corePoolSize, int maxPoolSize) {
+        return ExecutorServiceHelper.newThreadPool(name, corePoolSize, 
maxPoolSize, 60, TimeUnit.SECONDS, true);
+    }
+
+    /**
+     * Creates a new custom thread pool
+     *
+     * @param name          part of the thread name
+     * @param corePoolSize  the core size
+     * @param maxPoolSize   the maximum pool size
+     * @param keepAliveTime keep alive
+     * @param timeUnit      keep alive time unit
+     * @param daemon        whether the threads is daemon or not
+     * @return the created pool
+     */
+    public static ExecutorService newThreadPool(final String name, int 
corePoolSize, int maxPoolSize,
+                                                long keepAliveTime, TimeUnit 
timeUnit, final boolean daemon) {
+        ThreadPoolExecutor answer = new ThreadPoolExecutor(corePoolSize, 
maxPoolSize,
+                                                           keepAliveTime, 
timeUnit, new LinkedBlockingQueue<Runnable>());
+        answer.setThreadFactory(new ThreadFactory() {
+            public Thread newThread(Runnable r) {
+                Thread answer = new Thread(r, getThreadName(name));
+                answer.setDaemon(daemon);
+                return answer;
+            }
+        });
+        return answer;
+    }
+
 }

Modified: 
camel/trunk/camel-core/src/test/java/org/apache/camel/util/DefaultTimeoutMapTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/util/DefaultTimeoutMapTest.java?rev=919382&r1=919381&r2=919382&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/util/DefaultTimeoutMapTest.java
 (original)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/util/DefaultTimeoutMapTest.java
 Fri Mar  5 11:27:36 2010
@@ -104,7 +104,7 @@
     }
 
     public void testExecutor() throws Exception {
-        ScheduledExecutorService e = 
ExecutorServiceHelper.newScheduledThreadPool(1, "foo", true);
+        ScheduledExecutorService e = 
ExecutorServiceHelper.newScheduledThreadPool(2, "foo", true);
 
         DefaultTimeoutMap map = new DefaultTimeoutMap(e, 500);
         assertEquals(500, map.getPurgePollTime());


Reply via email to