Author: davsclaus Date: Fri Mar 5 11:27:36 2010 New Revision: 919382 URL: http://svn.apache.org/viewvc?rev=919382&view=rev Log: CAMEL-1588: Prefer to use CachedExecutorService instead of a fixed size pool. The cached can grow/shrink and is recommended as the best general purpose pool.
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java camel/trunk/camel-core/src/test/java/org/apache/camel/util/DefaultTimeoutMapTest.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java?rev=919382&r1=919381&r2=919382&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java Fri Mar 5 11:27:36 2010 @@ -46,7 +46,6 @@ * @version $Revision$ */ public class DefaultProducerTemplate extends ServiceSupport implements ProducerTemplate { - private static final int DEFAULT_THREADPOOL_SIZE = 10; private final CamelContext context; private final ProducerCache producerCache; private Endpoint defaultEndpoint; @@ -55,7 +54,7 @@ public DefaultProducerTemplate(CamelContext context) { this.context = context; this.producerCache = new ProducerCache(context); - this.executor = ExecutorServiceHelper.newScheduledThreadPool(DEFAULT_THREADPOOL_SIZE, "ProducerTemplate", true); + this.executor = ExecutorServiceHelper.newCachedThreadPool("ProducerTemplate", true); } public DefaultProducerTemplate(CamelContext context, ExecutorService executor) { @@ -684,7 +683,7 @@ super.start(); ServiceHelper.startService(producerCache); if (executor == null || executor.isShutdown()) { - executor = ExecutorServiceHelper.newScheduledThreadPool(DEFAULT_THREADPOOL_SIZE, "ProducerTemplate", true); + executor = ExecutorServiceHelper.newCachedThreadPool("ProducerTemplate", true); } } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java?rev=919382&r1=919381&r2=919382&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java Fri Mar 5 11:27:36 2010 @@ -123,7 +123,7 @@ } if (executorService == null) { // fall back and use default - executorService = ExecutorServiceHelper.newScheduledThreadPool(10, "RecipientList", true); + executorService = ExecutorServiceHelper.newCachedThreadPool("RecipientList", true); } return executorService; } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java?rev=919382&r1=919381&r2=919382&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java Fri Mar 5 11:27:36 2010 @@ -105,14 +105,18 @@ } private ExecutorService createExecutorService(RouteContext routeContext) { - if (executorServiceRef != null) { + if (executorService == null && executorServiceRef != null) { executorService = routeContext.lookup(executorServiceRef, ExecutorService.class); + if (executorService == null) { + throw new IllegalArgumentException("ExecutorServiceRef " + executorServiceRef + " not found in registry."); + } } if (executorService == null) { - executorService = ExecutorServiceHelper.newScheduledThreadPool(10, "Split", true); + // fall back and use default + executorService = ExecutorServiceHelper.newCachedThreadPool("Split", true); } return executorService; - } + } // Fluent API // ------------------------------------------------------------------------- Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java?rev=919382&r1=919381&r2=919382&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadsDefinition.java Fri Mar 5 11:27:36 2010 @@ -50,11 +50,14 @@ @Override public Processor createProcessor(RouteContext routeContext) throws Exception { - if (executorServiceRef != null) { + if (executorService == null && executorServiceRef != null) { executorService = routeContext.lookup(executorServiceRef, ExecutorService.class); + if (executorService == null) { + throw new IllegalArgumentException("ExecutorServiceRef " + executorServiceRef + " not found in registry."); + } } if (executorService == null && poolSize != null) { - executorService = ExecutorServiceHelper.newScheduledThreadPool(poolSize, "Threads", true); + executorService = ExecutorServiceHelper.newThreadPool("Threads", poolSize, poolSize); } Processor childProcessor = routeContext.createProcessor(this); Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java?rev=919382&r1=919381&r2=919382&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ToDefinition.java Fri Mar 5 11:27:36 2010 @@ -97,7 +97,7 @@ } } if (executorService == null && poolSize != null) { - executorService = ExecutorServiceHelper.newScheduledThreadPool(poolSize, "ToAsync[" + getLabel() + "]", true); + executorService = ExecutorServiceHelper.newThreadPool("ToAsync[" + getLabel() + "]", poolSize, poolSize); } // create the child processor which is the async route Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=919382&r1=919381&r2=919382&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java Fri Mar 5 11:27:36 2010 @@ -61,7 +61,6 @@ */ public class MulticastProcessor extends ServiceSupport implements Processor, Navigate<Processor>, Traceable { - private static final int DEFAULT_THREADPOOL_SIZE = 10; private static final transient Log LOG = LogFactory.getLog(MulticastProcessor.class); /** @@ -125,11 +124,8 @@ this.streaming = streaming; this.stopOnException = stopOnException; - if (isParallelProcessing()) { - if (this.executorService == null) { - // setup default executor as parallel processing requires an executor - this.executorService = ExecutorServiceHelper.newScheduledThreadPool(DEFAULT_THREADPOOL_SIZE, "Multicast", true); - } + if (isParallelProcessing() && getExecutorService() == null) { + this.executorService = ExecutorServiceHelper.newCachedThreadPool("Multicast", true); } } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java?rev=919382&r1=919381&r2=919382&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java Fri Mar 5 11:27:36 2010 @@ -36,7 +36,6 @@ */ public class OnCompletionProcessor extends ServiceSupport implements Processor, Traceable { - private static final int DEFAULT_THREADPOOL_SIZE = 10; private static final transient Log LOG = LogFactory.getLog(OnCompletionProcessor.class); private ExecutorService executorService; private Processor processor; @@ -176,8 +175,8 @@ return executorService; } - private ExecutorService createExecutorService() { - return ExecutorServiceHelper.newScheduledThreadPool(DEFAULT_THREADPOOL_SIZE, this.toString(), true); + protected ExecutorService createExecutorService() { + return ExecutorServiceHelper.newCachedThreadPool(this.toString(), true); } public void setExecutorService(ExecutorService executorService) { Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java?rev=919382&r1=919381&r2=919382&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java Fri Mar 5 11:27:36 2010 @@ -123,7 +123,7 @@ } /** - * The producer is <b>not</b> capable of async processing so lets simulate this by transfering the task + * The producer is <b>not</b> capable of async processing so lets simulate this by transferring the task * to another {...@link ExecutorService} for async processing. * * @param producer the producer @@ -165,7 +165,7 @@ public ExecutorService getExecutorService() { if (executorService == null) { - executorService = createExecutorService("SendAsyncProcessor-Consumer"); + executorService = ExecutorServiceHelper.newThreadPool("SendAsyncProcessor-Consumer", poolSize, poolSize); } return executorService; } @@ -263,10 +263,6 @@ } } - protected ExecutorService createExecutorService(String name) { - return ExecutorServiceHelper.newScheduledThreadPool(poolSize, name, true); - } - protected void doStart() throws Exception { super.doStart(); Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java?rev=919382&r1=919381&r2=919382&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java Fri Mar 5 11:27:36 2010 @@ -39,7 +39,6 @@ */ public class ThreadsProcessor extends DelegateProcessor implements Processor { - protected static final int DEFAULT_THREADPOOL_SIZE = 10; protected ExecutorService executorService; protected WaitForTaskToComplete waitForTaskToComplete; @@ -105,7 +104,7 @@ } protected ExecutorService createExecutorService() { - return ExecutorServiceHelper.newScheduledThreadPool(DEFAULT_THREADPOOL_SIZE, "Threads", true); + return ExecutorServiceHelper.newCachedThreadPool("Threads", true); } protected void doStop() throws Exception { Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java?rev=919382&r1=919381&r2=919382&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java Fri Mar 5 11:27:36 2010 @@ -38,7 +38,6 @@ */ public class WireTapProcessor extends SendProcessor { - private static final int DEFAULT_THREADPOOL_SIZE = 10; private ExecutorService executorService; // expression or processor used for populating a new exchange to send @@ -161,8 +160,8 @@ return executorService; } - private ExecutorService createExecutorService() { - return ExecutorServiceHelper.newScheduledThreadPool(DEFAULT_THREADPOOL_SIZE, this.toString(), true); + protected ExecutorService createExecutorService() { + return ExecutorServiceHelper.newCachedThreadPool(this.toString(), true); } public void setExecutorService(ExecutorService executorService) { Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=919382&r1=919381&r2=919382&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Fri Mar 5 11:27:36 2010 @@ -470,17 +470,18 @@ if (executorService == null) { if (isParallelProcessing()) { - // we are running in parallel so create a default thread pool - executorService = ExecutorServiceHelper.newFixedThreadPool(10, "Aggregator", true); + // we are running in parallel so create a cached thread pool which grows/shrinks automatic + executorService = ExecutorServiceHelper.newCachedThreadPool("Aggregator", true); } else { // use a single threaded if we are not running in parallel - executorService = ExecutorServiceHelper.newFixedThreadPool(1, "Aggregator", true); + executorService = ExecutorServiceHelper.newSingleThreadExecutor("Aggregator", true); } } // start timeout service if its in use if (getCompletionTimeout() > 0 || getCompletionTimeoutExpression() != null) { ScheduledExecutorService scheduler = ExecutorServiceHelper.newScheduledThreadPool(1, "AggregateTimeoutChecker", true); + // check for timed out aggregated messages once every second timeoutMap = new AggregationTimeoutMap(scheduler, 1000L); ServiceHelper.startService(timeoutMap); } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java?rev=919382&r1=919381&r2=919382&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java Fri Mar 5 11:27:36 2010 @@ -18,8 +18,11 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; /** @@ -37,6 +40,9 @@ /** * Creates a new thread name with the given prefix + * + * @param name the prefix + * @return the thread name, which is unique */ public static String getThreadName(String name) { return "Camel thread " + nextThreadCounter() + ": " + name; @@ -46,6 +52,14 @@ return threadCounter.getAndIncrement(); } + /** + * Creates a new scheduled thread pool which can schedule threads. + * + * @param poolSize the core pool size + * @param name part of the thread name + * @param daemon whether the threads is daemon or not + * @return the created pool + */ public static ScheduledExecutorService newScheduledThreadPool(final int poolSize, final String name, final boolean daemon) { return Executors.newScheduledThreadPool(poolSize, new ThreadFactory() { public Thread newThread(Runnable r) { @@ -76,6 +90,13 @@ }); } + /** + * Creates a new cached thread pool which should be the most commonly used. + * + * @param name part of the thread name + * @param daemon whether the threads is daemon or not + * @return the created pool + */ public static ExecutorService newCachedThreadPool(final String name, final boolean daemon) { return Executors.newCachedThreadPool(new ThreadFactory() { public Thread newThread(Runnable r) { @@ -86,4 +107,41 @@ }); } + /** + * Creates a new custom thread pool using 60 seconds as keep alive + * + * @param name part of the thread name + * @param corePoolSize the core size + * @param maxPoolSize the maximum pool size + * @return the created pool + */ + public static ExecutorService newThreadPool(final String name, int corePoolSize, int maxPoolSize) { + return ExecutorServiceHelper.newThreadPool(name, corePoolSize, maxPoolSize, 60, TimeUnit.SECONDS, true); + } + + /** + * Creates a new custom thread pool + * + * @param name part of the thread name + * @param corePoolSize the core size + * @param maxPoolSize the maximum pool size + * @param keepAliveTime keep alive + * @param timeUnit keep alive time unit + * @param daemon whether the threads is daemon or not + * @return the created pool + */ + public static ExecutorService newThreadPool(final String name, int corePoolSize, int maxPoolSize, + long keepAliveTime, TimeUnit timeUnit, final boolean daemon) { + ThreadPoolExecutor answer = new ThreadPoolExecutor(corePoolSize, maxPoolSize, + keepAliveTime, timeUnit, new LinkedBlockingQueue<Runnable>()); + answer.setThreadFactory(new ThreadFactory() { + public Thread newThread(Runnable r) { + Thread answer = new Thread(r, getThreadName(name)); + answer.setDaemon(daemon); + return answer; + } + }); + return answer; + } + } Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/util/DefaultTimeoutMapTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/util/DefaultTimeoutMapTest.java?rev=919382&r1=919381&r2=919382&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/util/DefaultTimeoutMapTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/util/DefaultTimeoutMapTest.java Fri Mar 5 11:27:36 2010 @@ -104,7 +104,7 @@ } public void testExecutor() throws Exception { - ScheduledExecutorService e = ExecutorServiceHelper.newScheduledThreadPool(1, "foo", true); + ScheduledExecutorService e = ExecutorServiceHelper.newScheduledThreadPool(2, "foo", true); DefaultTimeoutMap map = new DefaultTimeoutMap(e, 500); assertEquals(500, map.getPurgePollTime());