Author: davsclaus Date: Sun Nov 14 09:20:10 2010 New Revision: 1034962 URL: http://svn.apache.org/viewvc?rev=1034962&view=rev Log: CAMEL-3337: Fixed creating synchronous thread pool.
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/util/concurrent/ExecutorServiceHelperTest.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java?rev=1034962&r1=1034961&r2=1034962&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java Sun Nov 14 09:20:10 2010 @@ -254,6 +254,16 @@ public class DefaultExecutorServiceStrat return answer; } + public ExecutorService newSynchronousThreadPool(Object source, String name) { + ExecutorService answer = ExecutorServiceHelper.newSynchronousThreadPool(threadNamePattern, name); + onThreadPoolCreated(answer); + + if (LOG.isDebugEnabled()) { + LOG.debug("Created new synchronous thread pool for source: " + source + " with name: " + name + ". 
-> " + answer); + } + return answer; + } + public ExecutorService newThreadPool(Object source, String name, int corePoolSize, int maxPoolSize) { ExecutorService answer = ExecutorServiceHelper.newThreadPool(threadNamePattern, name, corePoolSize, maxPoolSize); onThreadPoolCreated(answer); @@ -265,6 +275,17 @@ public class DefaultExecutorServiceStrat return answer; } + public ExecutorService newThreadPool(Object source, String name, int corePoolSize, int maxPoolSize, int maxQueueSize) { + ExecutorService answer = ExecutorServiceHelper.newThreadPool(threadNamePattern, name, corePoolSize, maxPoolSize, maxQueueSize); + onThreadPoolCreated(answer); + + if (LOG.isDebugEnabled()) { + LOG.debug("Created new thread pool for source: " + source + " with name: " + name + ". [poolSize=" + corePoolSize + + ", maxPoolSize=" + maxPoolSize + ", maxQueueSize=" + maxQueueSize + "] -> " + answer); + } + return answer; + } + public ExecutorService newThreadPool(Object source, String name, int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit timeUnit, int maxQueueSize, RejectedExecutionHandler rejectedExecutionHandler, boolean daemon) { Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java?rev=1034962&r1=1034961&r2=1034962&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java Sun Nov 14 09:20:10 2010 @@ -194,6 +194,15 @@ public interface ExecutorServiceStrategy ExecutorService newSingleThreadExecutor(Object source, String name); /** + * Creates a new synchronous thread pool, which executes the task in the caller thread (no task queue). 
+ * + * @param source the source object, usually it should be <tt>this</tt> passed in as parameter + * @param name name which is appended to the thread name + * @return the created thread pool + */ + ExecutorService newSynchronousThreadPool(Object source, String name); + + /** * Creates a new custom thread pool. * <p/> * Will by default use 60 seconds for keep alive time for idle threads. @@ -209,6 +218,21 @@ public interface ExecutorServiceStrategy /** * Creates a new custom thread pool. + * <p/> + * Will by default use 60 seconds for keep alive time for idle threads. + * And use {@link java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy CallerRunsPolicy} as rejection handler + * + * @param source the source object, usually it should be <tt>this</tt> passed in as parameter + * @param name name which is appended to the thread name + * @param corePoolSize the core pool size + * @param maxPoolSize the maximum pool size + * @param maxQueueSize the maximum number of tasks in the queue, use <tt>Integer.MAX_VALUE</tt> or <tt>-1</tt> to indicate unbounded + * @return the created thread pool + */ + ExecutorService newThreadPool(Object source, String name, int corePoolSize, int maxPoolSize, int maxQueueSize); + + /** + * Creates a new custom thread pool. 
 * * @param source the source object, usually it should be <tt>this</tt> passed in as parameter * @param name name which is appended to the thread name Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java?rev=1034962&r1=1034961&r2=1034962&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java Sun Nov 14 09:20:10 2010 @@ -112,7 +112,9 @@ public final class ExecutorServiceHelper } /** - * Creates a new fixed thread pool + * Creates a new fixed thread pool. + * <p/> + * Beware that the task queue is unbounded * * @param poolSize the fixed pool size * @param pattern pattern of the thread name @@ -169,6 +171,20 @@ public final class ExecutorServiceHelper } /** + * Creates a new synchronous thread pool which executes the task in the caller thread (no task queue) + * <p/> + * Uses a {@link java.util.concurrent.SynchronousQueue} queue as task queue. + * + * @param pattern pattern of the thread name + * @param name ${name} in the pattern name + * @return the created pool + */ + public static ExecutorService newSynchronousThreadPool(final String pattern, final String name) { + return ExecutorServiceHelper.newThreadPool(pattern, name, 0, 0, 60, + TimeUnit.SECONDS, 0, new ThreadPoolExecutor.CallerRunsPolicy(), true); + } + + /** * Creates a new custom thread pool using 60 seconds as keep alive and with an unbounded queue. * * @param pattern pattern of the thread name @@ -183,6 +199,21 @@ public final class ExecutorServiceHelper } /** + * Creates a new custom thread pool using 60 seconds as keep alive and with bounded queue. 
+ * + * @param pattern pattern of the thread name + * @param name ${name} in the pattern name + * @param corePoolSize the core size + * @param maxPoolSize the maximum pool size + * @param maxQueueSize the maximum number of tasks in the queue, use <tt>Integer.MAX_VALUE</tt> or <tt>-1</tt> to indicate unbounded + * @return the created pool + */ + public static ExecutorService newThreadPool(final String pattern, final String name, int corePoolSize, int maxPoolSize, int maxQueueSize) { + return ExecutorServiceHelper.newThreadPool(pattern, name, corePoolSize, maxPoolSize, 60, + TimeUnit.SECONDS, maxQueueSize, new ThreadPoolExecutor.CallerRunsPolicy(), true); + } + + /** * Creates a new custom thread pool * * @param pattern pattern of the thread name @@ -209,8 +240,11 @@ public final class ExecutorServiceHelper BlockingQueue<Runnable> queue; if (corePoolSize == 0 && maxQueueSize <= 0) { - // use a synchronous so we can act like the cached thread pool + // use a synchronous queue queue = new SynchronousQueue<Runnable>(); + // and force 1 as pool size to be able to create the thread pool by the JDK + corePoolSize = 1; + maxPoolSize = 1; } else if (maxQueueSize <= 0) { // unbounded task queue queue = new LinkedBlockingQueue<Runnable>(); Added: camel/trunk/camel-core/src/test/java/org/apache/camel/util/concurrent/ExecutorServiceHelperTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/util/concurrent/ExecutorServiceHelperTest.java?rev=1034962&view=auto ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/util/concurrent/ExecutorServiceHelperTest.java (added) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/util/concurrent/ExecutorServiceHelperTest.java Sun Nov 14 09:20:10 2010 @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.util.concurrent; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import junit.framework.TestCase; + +/** + * @version $Revision$ + */ +public class ExecutorServiceHelperTest extends TestCase { + + public void testGetThreadName() { + String name = ExecutorServiceHelper.getThreadName("Camel Thread ${counter} - ${name}", "foo"); + + assertTrue(name.startsWith("Camel Thread")); + assertTrue(name.endsWith("foo")); + } + + public void testNewScheduledThreadPool() { + ScheduledExecutorService pool = ExecutorServiceHelper.newScheduledThreadPool(1, "MyPool ${name}", "foo", true); + assertNotNull(pool); + } + + public void testNewThreadPool() { + ExecutorService pool = ExecutorServiceHelper.newThreadPool("MyPool ${name}", "foo", 1, 1); + assertNotNull(pool); + } + + public void testNewThreadPool2() { + ExecutorService pool = ExecutorServiceHelper.newThreadPool("MyPool ${name}", "foo", 1, 1, 20); + assertNotNull(pool); + } + + public void testNewThreadPool3() { + ExecutorService pool = ExecutorServiceHelper.newThreadPool("MyPool ${name}", "foo", 1, 1, + 30, TimeUnit.SECONDS, 20, null, true); + assertNotNull(pool); + } + + public void testNewCachedThreadPool() { + 
ExecutorService pool = ExecutorServiceHelper.newCachedThreadPool("MyPool ${name}", "foo", true); + assertNotNull(pool); + } + + public void testNewFixedThreadPool() { + ExecutorService pool = ExecutorServiceHelper.newFixedThreadPool(1, "MyPool ${name}", "foo", true); + assertNotNull(pool); + } + + public void testNewSynchronousThreadPool() { + ExecutorService pool = ExecutorServiceHelper.newSynchronousThreadPool("MyPool ${name}", "foo"); + assertNotNull(pool); + } + + public void testNewSingleThreadPool() { + ExecutorService pool = ExecutorServiceHelper.newSingleThreadExecutor("MyPool ${name}", "foo", true); + assertNotNull(pool); + } + +}