Author: davsclaus
Date: Sun Nov 14 09:20:10 2010
New Revision: 1034962

URL: http://svn.apache.org/viewvc?rev=1034962&view=rev
Log:
CAMEL-3337: Fixed creating synchronous thread pool.

Added:
    
camel/trunk/camel-core/src/test/java/org/apache/camel/util/concurrent/ExecutorServiceHelperTest.java
Modified:
    
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java?rev=1034962&r1=1034961&r2=1034962&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExecutorServiceStrategy.java
 Sun Nov 14 09:20:10 2010
@@ -254,6 +254,16 @@ public class DefaultExecutorServiceStrat
         return answer;
     }
 
+    public ExecutorService newSynchronousThreadPool(Object source, String 
name) {
+        ExecutorService answer = 
ExecutorServiceHelper.newSynchronousThreadPool(threadNamePattern, name);
+        onThreadPoolCreated(answer);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Created new synchronous thread pool for source: " + 
source + " with name: " + name + ". -> " + answer);
+        }
+        return answer;
+    }
+
     public ExecutorService newThreadPool(Object source, String name, int 
corePoolSize, int maxPoolSize) {
         ExecutorService answer = 
ExecutorServiceHelper.newThreadPool(threadNamePattern, name, corePoolSize, 
maxPoolSize);
         onThreadPoolCreated(answer);
@@ -265,6 +275,17 @@ public class DefaultExecutorServiceStrat
         return answer;
     }
 
+    public ExecutorService newThreadPool(Object source, String name, int 
corePoolSize, int maxPoolSize, int maxQueueSize) {
+        ExecutorService answer = 
ExecutorServiceHelper.newThreadPool(threadNamePattern, name, corePoolSize, 
maxPoolSize, maxQueueSize);
+        onThreadPoolCreated(answer);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Created new thread pool for source: " + source + " with 
name: " + name + ". [poolSize=" + corePoolSize
+                    + ", maxPoolSize=" + maxPoolSize + ", maxQueueSize=" + 
maxQueueSize + "] -> " + answer);
+        }
+        return answer;
+    }
+
     public ExecutorService newThreadPool(Object source, String name, int 
corePoolSize, int maxPoolSize, long keepAliveTime,
                                          TimeUnit timeUnit, int maxQueueSize, 
RejectedExecutionHandler rejectedExecutionHandler,
                                          boolean daemon) {

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java?rev=1034962&r1=1034961&r2=1034962&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java
 Sun Nov 14 09:20:10 2010
@@ -194,6 +194,15 @@ public interface ExecutorServiceStrategy
     ExecutorService newSingleThreadExecutor(Object source, String name);
 
     /**
+     * Creates a new synchronous thread pool, which executes the task in the 
caller thread (no task queue).
+     *
+     * @param source      the source object, usually it should be 
<tt>this</tt> passed in as parameter
+     * @param name        name which is appended to the thread name
+     * @return the created thread pool
+     */
+    ExecutorService newSynchronousThreadPool(Object source, String name);
+
+    /**
      * Creates a new custom thread pool.
      * <p/>
      * Will by default use 60 seconds for keep alive time for idle threads.
@@ -209,6 +218,21 @@ public interface ExecutorServiceStrategy
 
     /**
      * Creates a new custom thread pool.
+     * <p/>
+     * Will by default use 60 seconds for keep alive time for idle threads.
+     * And use {@link 
java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy CallerRunsPolicy} as 
rejection handler
+     *
+     * @param source        the source object, usually it should be 
<tt>this</tt> passed in as parameter
+     * @param name          name which is appended to the thread name
+     * @param corePoolSize  the core pool size
+     * @param maxPoolSize   the maximum pool size
+     * @param maxQueueSize  the maximum number of tasks in the queue, use 
<tt>Integer.MAX_VALUE</tt> or <tt>-1</tt> to indicate unbounded
+     * @return the created thread pool
+     */
+    ExecutorService newThreadPool(Object source, String name, int 
corePoolSize, int maxPoolSize, int maxQueueSize);
+
+    /**
+     * Creates a new custom thread pool.
      *
      * @param source                     the source object, usually it should 
be <tt>this</tt> passed in as parameter
      * @param name                       name which is appended to the thread 
name

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java?rev=1034962&r1=1034961&r2=1034962&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java
 Sun Nov 14 09:20:10 2010
@@ -112,7 +112,9 @@ public final class ExecutorServiceHelper
     }
 
     /**
-     * Creates a new fixed thread pool
+     * Creates a new fixed thread pool.
+     * <p/>
+     * Beware that the task queue is unbounded
      *
      * @param poolSize the fixed pool size
      * @param pattern  pattern of the thread name
@@ -169,6 +171,20 @@ public final class ExecutorServiceHelper
     }
 
     /**
+     * Creates a new synchronous thread pool which executes the task in the 
caller thread (no task queue).
+     * <p/>
+     * Uses a {@link java.util.concurrent.SynchronousQueue} queue as task 
queue.
+     *
+     * @param pattern      pattern of the thread name
+     * @param name         ${name} in the pattern name
+     * @return the created pool
+     */
+    public static ExecutorService newSynchronousThreadPool(final String 
pattern, final String name) {
+        return ExecutorServiceHelper.newThreadPool(pattern, name, 0, 0, 60,
+                TimeUnit.SECONDS, 0, new 
ThreadPoolExecutor.CallerRunsPolicy(), true);
+    }
+
+    /**
      * Creates a new custom thread pool using 60 seconds as keep alive and 
with an unbounded queue.
      *
      * @param pattern      pattern of the thread name
@@ -183,6 +199,21 @@ public final class ExecutorServiceHelper
     }
 
     /**
+     * Creates a new custom thread pool using 60 seconds as keep alive and 
with bounded queue.
+     *
+     * @param pattern      pattern of the thread name
+     * @param name         ${name} in the pattern name
+     * @param corePoolSize the core size
+     * @param maxPoolSize  the maximum pool size
+     * @param maxQueueSize the maximum number of tasks in the queue, use 
<tt>Integer.MAX_VALUE</tt> or <tt>-1</tt> to indicate unbounded
+     * @return the created pool
+     */
+    public static ExecutorService newThreadPool(final String pattern, final 
String name, int corePoolSize, int maxPoolSize, int maxQueueSize) {
+        return ExecutorServiceHelper.newThreadPool(pattern, name, 
corePoolSize, maxPoolSize, 60,
+                TimeUnit.SECONDS, maxQueueSize, new 
ThreadPoolExecutor.CallerRunsPolicy(), true);
+    }
+
+    /**
      * Creates a new custom thread pool
      *
      * @param pattern                  pattern of the thread name
@@ -209,8 +240,11 @@ public final class ExecutorServiceHelper
 
         BlockingQueue<Runnable> queue;
         if (corePoolSize == 0 && maxQueueSize <= 0) {
-            // use a synchronous so we can act like the cached thread pool
+            // use a synchronous queue
             queue = new SynchronousQueue<Runnable>();
+            // and force 1 as pool size to be able to create the thread pool 
by the JDK
+            corePoolSize = 1;
+            maxPoolSize = 1;
         } else if (maxQueueSize <= 0) {
             // unbounded task queue
             queue = new LinkedBlockingQueue<Runnable>();

Added: 
camel/trunk/camel-core/src/test/java/org/apache/camel/util/concurrent/ExecutorServiceHelperTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/util/concurrent/ExecutorServiceHelperTest.java?rev=1034962&view=auto
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/util/concurrent/ExecutorServiceHelperTest.java
 (added)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/util/concurrent/ExecutorServiceHelperTest.java
 Sun Nov 14 09:20:10 2010
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util.concurrent;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.TestCase;
+
+/**
+ * @version $Revision$
+ */
+public class ExecutorServiceHelperTest extends TestCase {
+
+    public void testGetThreadName() {
+        String name = ExecutorServiceHelper.getThreadName("Camel Thread 
${counter} - ${name}", "foo");
+
+        assertTrue(name.startsWith("Camel Thread"));
+        assertTrue(name.endsWith("foo"));
+    }
+
+    public void testNewScheduledThreadPool() {
+        ScheduledExecutorService pool = 
ExecutorServiceHelper.newScheduledThreadPool(1, "MyPool ${name}", "foo", true);
+        assertNotNull(pool);
+    }
+
+    public void testNewThreadPool() {
+        ExecutorService pool = ExecutorServiceHelper.newThreadPool("MyPool 
${name}", "foo", 1, 1);
+        assertNotNull(pool);
+    }
+
+    public void testNewThreadPool2() {
+        ExecutorService pool = ExecutorServiceHelper.newThreadPool("MyPool 
${name}", "foo", 1, 1, 20);
+        assertNotNull(pool);
+    }
+
+    public void testNewThreadPool3() {
+        ExecutorService pool = ExecutorServiceHelper.newThreadPool("MyPool 
${name}", "foo", 1, 1,
+                30, TimeUnit.SECONDS, 20, null, true);
+        assertNotNull(pool);
+    }
+
+    public void testNewCachedThreadPool() {
+        ExecutorService pool = 
ExecutorServiceHelper.newCachedThreadPool("MyPool ${name}", "foo", true);
+        assertNotNull(pool);
+    }
+
+    public void testNewFixedThreadPool() {
+        ExecutorService pool = ExecutorServiceHelper.newFixedThreadPool(1, 
"MyPool ${name}", "foo", true);
+        assertNotNull(pool);
+    }
+
+    public void testNewSynchronousThreadPool() {
+        ExecutorService pool = 
ExecutorServiceHelper.newSynchronousThreadPool("MyPool ${name}", "foo");
+        assertNotNull(pool);
+    }
+
+    public void testNewSingleThreadPool() {
+        ExecutorService pool = 
ExecutorServiceHelper.newSingleThreadExecutor("MyPool ${name}", "foo", true);
+        assertNotNull(pool);
+    }
+
+}


Reply via email to