Author: fhanik
Date: Tue Dec  9 12:56:59 2008
New Revision: 724886

URL: http://svn.apache.org/viewvc?rev=724886&view=rev
Log:
Refactored the thread pooling used with an executor. This gets rid of 
duplicate code in the NIO connector as well as in the 
org.apache.catalina.core.StandardThreadExecutor class.
I provided a ThreadPoolExecutor that is a small extension of 
java.util.concurrent.ThreadPoolExecutor.
The connector method setExecutor still takes a java.util.concurrent.Executor as 
an argument to provide the most flexibility.

Added:
    tomcat/trunk/java/org/apache/tomcat/util/threads/TaskQueue.java   (with 
props)
    tomcat/trunk/java/org/apache/tomcat/util/threads/TaskThreadFactory.java   
(with props)
    tomcat/trunk/java/org/apache/tomcat/util/threads/ThreadPoolExecutor.java   
(with props)
Modified:
    tomcat/trunk/java/org/apache/catalina/core/StandardThreadExecutor.java
    tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java

Modified: tomcat/trunk/java/org/apache/catalina/core/StandardThreadExecutor.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/core/StandardThreadExecutor.java?rev=724886&r1=724885&r2=724886&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/core/StandardThreadExecutor.java 
(original)
+++ tomcat/trunk/java/org/apache/catalina/core/StandardThreadExecutor.java Tue 
Dec  9 12:56:59 2008
@@ -17,18 +17,16 @@
 
 package org.apache.catalina.core;
 
-import java.util.Collection;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.catalina.Executor;
 import org.apache.catalina.LifecycleException;
 import org.apache.catalina.LifecycleListener;
 import org.apache.catalina.util.LifecycleSupport;
-import java.util.concurrent.RejectedExecutionException;
+import org.apache.tomcat.util.threads.TaskQueue;
+import org.apache.tomcat.util.threads.TaskThreadFactory;
+import org.apache.tomcat.util.threads.ThreadPoolExecutor;
 
 public class StandardThreadExecutor implements Executor {
     
@@ -90,7 +88,7 @@
     public void start() throws LifecycleException {
         lifecycle.fireLifecycleEvent(BEFORE_START_EVENT, null);
         TaskQueue taskqueue = new TaskQueue(maxQueueSize);
-        TaskThreadFactory tf = new TaskThreadFactory(namePrefix);
+        TaskThreadFactory tf = new 
TaskThreadFactory(namePrefix,daemon,getThreadPriority());
         lifecycle.fireLifecycleEvent(START_EVENT, null);
         executor = new ThreadPoolExecutor(getMinSpareThreads(), 
getMaxThreads(), maxIdleTime, TimeUnit.MILLISECONDS,taskqueue, tf);
         taskqueue.setParent( (ThreadPoolExecutor) executor);
@@ -107,17 +105,10 @@
     
     public void execute(Runnable command, long timeout, TimeUnit unit) {
         if ( executor != null ) {
-            try {
-                executor.execute(command);
-            } catch (RejectedExecutionException rx) {
-                //there could have been contention around the queue
-                try {
-                    if ( !( (TaskQueue) 
executor.getQueue()).force(command,timeout,unit) ) throw new 
RejectedExecutionException("Work queue full.");
-                }catch (InterruptedException x) {
-                    throw new RejectedExecutionException("Interrupted.",x);
-                }
-            }
-        } else throw new IllegalStateException("StandardThreadPool not 
started.");
+            executor.execute(command,timeout,unit);
+        } else { 
+            throw new IllegalStateException("StandardThreadExecutor not 
started.");
+        }
     }
     
     
@@ -258,71 +249,4 @@
     public int getQueueSize() {
         return (executor != null) ? executor.getQueue().size() : -1;
     }
-
-    // ---------------------------------------------- TaskQueue Inner Class
-    class TaskQueue extends LinkedBlockingQueue<Runnable> {
-        ThreadPoolExecutor parent = null;
-
-        public TaskQueue() {
-            super();
-        }
-
-        public TaskQueue(int capacity) {
-            super(capacity);
-        }
-
-        public TaskQueue(Collection<? extends Runnable> c) {
-            super(c);
-        }
-
-        public void setParent(ThreadPoolExecutor tp) {
-            parent = tp;
-        }
-        
-        public boolean force(Runnable o) {
-            if ( parent.isShutdown() ) throw new 
RejectedExecutionException("Executor not running, can't force a command into 
the queue");
-            return super.offer(o); //forces the item onto the queue, to be 
used if the task is rejected
-        }
-
-        public boolean force(Runnable o, long timeout, TimeUnit unit) throws 
InterruptedException {
-            if ( parent.isShutdown() ) throw new 
RejectedExecutionException("Executor not running, can't force a command into 
the queue");
-            return super.offer(o,timeout,unit); //forces the item onto the 
queue, to be used if the task is rejected
-        }
-
-        public boolean offer(Runnable o) {
-            //we can't do any checks
-            if (parent==null) return super.offer(o);
-            //we are maxed out on threads, simply queue the object
-            if (parent.getPoolSize() == parent.getMaximumPoolSize()) return 
super.offer(o);
-            //we have idle threads, just add it to the queue
-            //this is an approximation, so it could use some tuning
-            if (parent.getActiveCount()<(parent.getPoolSize())) return 
super.offer(o);
-            //if we have less threads than maximum force creation of a new 
thread
-            if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
-            //if we reached here, we need to add it to the queue
-            return super.offer(o);
-        }
-    }
-
-    // ---------------------------------------------- ThreadFactory Inner Class
-    class TaskThreadFactory implements ThreadFactory {
-        final ThreadGroup group;
-        final AtomicInteger threadNumber = new AtomicInteger(1);
-        final String namePrefix;
-
-        TaskThreadFactory(String namePrefix) {
-            SecurityManager s = System.getSecurityManager();
-            group = (s != null) ? s.getThreadGroup() : 
Thread.currentThread().getThreadGroup();
-            this.namePrefix = namePrefix;
-        }
-
-        public Thread newThread(Runnable r) {
-            Thread t = new Thread(group, r, namePrefix + 
threadNumber.getAndIncrement());
-            t.setDaemon(daemon);
-            t.setPriority(getThreadPriority());
-            return t;
-        }
-    }
-
-
 }

Modified: tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java?rev=724886&r1=724885&r2=724886&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java Tue Dec  9 
12:56:59 2008
@@ -60,6 +60,8 @@
 import org.apache.tomcat.util.net.SecureNioChannel.ApplicationBufferHandler;
 import org.apache.tomcat.util.net.jsse.NioX509KeyManager;
 import org.apache.tomcat.util.res.StringManager;
+import org.apache.tomcat.util.threads.TaskQueue;
+import org.apache.tomcat.util.threads.TaskThreadFactory;
 
 /**
  * NIO tailored thread pool, providing the following services:
@@ -339,8 +341,15 @@
      * External Executor based thread pool.
      */
     protected Executor executor = null;
-    public void setExecutor(Executor executor) { this.executor = executor; }
+    public void setExecutor(Executor executor) { 
+        this.executor = executor;
+        this.internalExecutor = (executor==null);
+    }
     public Executor getExecutor() { return executor; }
+    /**
+     * Are we using an internal executor
+     */
+    protected boolean internalExecutor = true;
     
     protected boolean useExecutor = true;
     /**
@@ -356,12 +365,8 @@
     protected int maxThreads = 200;
     public void setMaxThreads(int maxThreads) {
         this.maxThreads = maxThreads;
-        if (running) {
-            if (executor!=null) {
-                if (executor instanceof ThreadPoolExecutor) {
-                    
((ThreadPoolExecutor)executor).setMaximumPoolSize(maxThreads);
-                }
-            }
+        if (running && executor!=null && executor instanceof 
ThreadPoolExecutor) {
+            ((ThreadPoolExecutor)executor).setMaximumPoolSize(maxThreads);
         }
     }
     public int getMaxThreads() { return maxThreads; }
@@ -872,10 +877,11 @@
             
             // Create worker collection
             if ( executor == null ) {
+                internalExecutor = true;
                 TaskQueue taskqueue = new TaskQueue();
-                TaskThreadFactory tf = new TaskThreadFactory(getName() + 
"-exec-");
+                TaskThreadFactory tf = new TaskThreadFactory(getName() + 
"-exec-", daemon, getThreadPriority());
                 executor = new ThreadPoolExecutor(getMinSpareThreads(), 
getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf);
-                taskqueue.setParent( (ThreadPoolExecutor) executor, this);
+                taskqueue.setParent( (ThreadPoolExecutor) executor);
             }
 
             // Start poller threads
@@ -938,13 +944,13 @@
         keyCache.clear();
         nioChannels.clear();
         processorCache.clear();
-        if ( executor!=null ) {
+        if ( executor!=null && internalExecutor ) {
             if ( executor instanceof ThreadPoolExecutor ) {
                 //this is our internal one, so we need to shut it down
                 ThreadPoolExecutor tpe = (ThreadPoolExecutor) executor;
                 tpe.shutdown();
                 TaskQueue queue = (TaskQueue) tpe.getQueue();
-                queue.setParent(null,null);
+                queue.setParent(null);
             }
             executor = null;
         }
@@ -1955,68 +1961,8 @@
         }
 
     }
-    
-    // ---------------------------------------------- TaskQueue Inner Class
-    public static class TaskQueue extends LinkedBlockingQueue<Runnable> {
-        ThreadPoolExecutor parent = null;
-        NioEndpoint endpoint = null;
-        
-        public TaskQueue() {
-            super();
-        }
 
-        public TaskQueue(int initialCapacity) {
-            super(initialCapacity);
-        }
- 
-        public TaskQueue(Collection<? extends Runnable> c) {
-            super(c);
-        }
-
-        
-        public void setParent(ThreadPoolExecutor tp, NioEndpoint ep) {
-            parent = tp;
-            this.endpoint = ep;
-        }
-        
-        public boolean offer(Runnable o) {
-            //we can't do any checks
-            if (parent==null) return super.offer(o);
-            //we are maxed out on threads, simply queue the object
-            if (parent.getPoolSize() == parent.getMaximumPoolSize()) return 
super.offer(o);
-            //we have idle threads, just add it to the queue
-            //this is an approximation, so it could use some tuning
-            if (endpoint.activeSocketProcessors.get()<(parent.getPoolSize())) 
return super.offer(o);
-            //if we have less threads than maximum force creation of a new 
thread
-            if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
-            //if we reached here, we need to add it to the queue
-            return super.offer(o);
-        }
-    }
-
-    // ---------------------------------------------- ThreadFactory Inner Class
-    class TaskThreadFactory implements ThreadFactory {
-        final ThreadGroup group;
-        final AtomicInteger threadNumber = new AtomicInteger(1);
-        final String namePrefix;
-
-        TaskThreadFactory(String namePrefix) {
-            SecurityManager s = System.getSecurityManager();
-            group = (s != null)? s.getThreadGroup() : 
Thread.currentThread().getThreadGroup();
-            this.namePrefix = namePrefix;
-        }
-
-        public Thread newThread(Runnable r) {
-            Thread t = new Thread(group, r, namePrefix + 
threadNumber.getAndIncrement());
-            t.setDaemon(daemon);
-            t.setPriority(getThreadPriority());
-            return t;
-        }
-    }
-    
     // ----------------------------------------------- SendfileData Inner Class
-
-
     /**
      * SendfileData class.
      */
@@ -2029,5 +1975,4 @@
         // KeepAlive flag
         public boolean keepAlive;
     }
-
 }

Added: tomcat/trunk/java/org/apache/tomcat/util/threads/TaskQueue.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/threads/TaskQueue.java?rev=724886&view=auto
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/threads/TaskQueue.java (added)
+++ tomcat/trunk/java/org/apache/tomcat/util/threads/TaskQueue.java Tue Dec  9 
12:56:59 2008
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tomcat.util.threads;
+
+import java.util.Collection;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+/**
+ * A task queue specifically designed to run with a thread pool executor.
+ * The task queue is optimised to properly utilize threads within 
+ * a thread pool executor. If you use a normal queue, the executor will spawn 
threads
+ * when there are idle threads and you won't be able to force items onto the 
queue itself 
+ * @author fhanik
+ *
+ */
+public class TaskQueue extends LinkedBlockingQueue<Runnable> {
+    ThreadPoolExecutor parent = null;
+
+    public TaskQueue() {
+        super();
+    }
+
+    public TaskQueue(int capacity) {
+        super(capacity);
+    }
+
+    public TaskQueue(Collection<? extends Runnable> c) {
+        super(c);
+    }
+
+    public void setParent(ThreadPoolExecutor tp) {
+        parent = tp;
+    }
+    
+    public boolean force(Runnable o) {
+        if ( parent.isShutdown() ) throw new 
RejectedExecutionException("Executor not running, can't force a command into 
the queue");
+        return super.offer(o); //forces the item onto the queue, to be used if 
the task is rejected
+    }
+
+    public boolean force(Runnable o, long timeout, TimeUnit unit) throws 
InterruptedException {
+        if ( parent.isShutdown() ) throw new 
RejectedExecutionException("Executor not running, can't force a command into 
the queue");
+        return super.offer(o,timeout,unit); //forces the item onto the queue, 
to be used if the task is rejected
+    }
+
+    public boolean offer(Runnable o) {
+        //we can't do any checks
+        if (parent==null) return super.offer(o);
+        //we are maxed out on threads, simply queue the object
+        if (parent.getPoolSize() == parent.getMaximumPoolSize()) return 
super.offer(o);
+        //we have idle threads, just add it to the queue
+        if (parent.getActiveCount()<(parent.getPoolSize())) return 
super.offer(o);
+        //if we have less threads than maximum force creation of a new thread
+        if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
+        //if we reached here, we need to add it to the queue
+        return super.offer(o);
+    }
+}

Propchange: tomcat/trunk/java/org/apache/tomcat/util/threads/TaskQueue.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: tomcat/trunk/java/org/apache/tomcat/util/threads/TaskThreadFactory.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/threads/TaskThreadFactory.java?rev=724886&view=auto
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/threads/TaskThreadFactory.java 
(added)
+++ tomcat/trunk/java/org/apache/tomcat/util/threads/TaskThreadFactory.java Tue 
Dec  9 12:56:59 2008
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tomcat.util.threads;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+/**
+ * Simple task thread factory to use to create threads for an executor 
implementation.
+ * @author fhanik
+ *
+ */
+public class TaskThreadFactory implements ThreadFactory {
+    final ThreadGroup group;
+    final AtomicInteger threadNumber = new AtomicInteger(1);
+    final String namePrefix;
+    final boolean daemon;
+    final int threadPriority;
+    public TaskThreadFactory(String namePrefix, boolean daemon, int priority) {
+        SecurityManager s = System.getSecurityManager();
+        group = (s != null) ? s.getThreadGroup() : 
Thread.currentThread().getThreadGroup();
+        this.namePrefix = namePrefix;
+        this.daemon = daemon;
+        this.threadPriority = priority;
+    }
+
+    public Thread newThread(Runnable r) {
+        Thread t = new Thread(group, r, namePrefix + 
threadNumber.getAndIncrement());
+        t.setDaemon(daemon);
+        t.setPriority(threadPriority);
+        return t;
+    }
+
+}

Propchange: 
tomcat/trunk/java/org/apache/tomcat/util/threads/TaskThreadFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: tomcat/trunk/java/org/apache/tomcat/util/threads/ThreadPoolExecutor.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/threads/ThreadPoolExecutor.java?rev=724886&view=auto
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/threads/ThreadPoolExecutor.java 
(added)
+++ tomcat/trunk/java/org/apache/tomcat/util/threads/ThreadPoolExecutor.java 
Tue Dec  9 12:56:59 2008
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tomcat.util.threads;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+/**
+ * Same as a java.util.concurrent.ThreadPoolExecutor but implements a much 
more efficient
+ * getActiveCount method, to be used to properly handle the work queue
+ * @author fhanik
+ *
+ */
+public class ThreadPoolExecutor extends 
java.util.concurrent.ThreadPoolExecutor {
+    
+    final AtomicInteger activeCount = new AtomicInteger(0);
+    
+    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long 
keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, 
RejectedExecutionHandler handler) {
+        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, 
handler);
+    }
+
+    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long 
keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory 
threadFactory,
+            RejectedExecutionHandler handler) {
+        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, 
threadFactory, handler);
+    }
+
+    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long 
keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory 
threadFactory) {
+        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, 
threadFactory);
+    }
+
+    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long 
keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
+        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
+    }
+
+    @Override
+    protected void afterExecute(Runnable r, Throwable t) {
+        activeCount.decrementAndGet();
+    }
+
+    @Override
+    protected void beforeExecute(Thread t, Runnable r) {
+        activeCount.incrementAndGet();
+    }
+
+    @Override
+    public int getActiveCount() {
+        return activeCount.get();
+    }
+    
+    public void execute(Runnable command, long timeout, TimeUnit unit) {
+        
+    }
+    
+    
+
+    
+    
+}

Propchange: 
tomcat/trunk/java/org/apache/tomcat/util/threads/ThreadPoolExecutor.java
------------------------------------------------------------------------------
    svn:eol-style = native



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to