Author: fhanik
Date: Mon Aug 24 15:33:48 2009
New Revision: 807284

URL: http://svn.apache.org/viewvc?rev=807284&view=rev
Log:
First round of refactoring connectors.
Remove the worker based thread pools
Enable local or injected executors
Add in a resizable executors interface to be used in future revisions
start abstracting out and using a base class. There was one, deleted, since its 
not used anywhere


Added:
    tomcat/trunk/java/org/apache/tomcat/util/net/AbstractEndpoint.java   (with 
props)
    tomcat/trunk/java/org/apache/tomcat/util/threads/ResizableExecutor.java   
(with props)
Removed:
    tomcat/trunk/java/org/apache/tomcat/util/net/BaseEndpoint.java
Modified:
    tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java
    tomcat/trunk/java/org/apache/tomcat/util/net/JIoEndpoint.java
    tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java

Added: tomcat/trunk/java/org/apache/tomcat/util/net/AbstractEndpoint.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/AbstractEndpoint.java?rev=807284&view=auto
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/AbstractEndpoint.java (added)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/AbstractEndpoint.java Mon Aug 
24 15:33:48 2009
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tomcat.util.net;
+
+import org.apache.tomcat.util.res.StringManager;
+/**
+ * 
+ * @author fhanik
+ * @author Mladen Turk
+ * @author Remy Maucherat
+ */
+public abstract class AbstractEndpoint {
+    
+    // -------------------------------------------------------------- Constants
+    protected StringManager sm = 
StringManager.getManager("org.apache.tomcat.util.net.res");
+
+    /**
+     * The Request attribute key for the cipher suite.
+     */
+    public static final String CIPHER_SUITE_KEY = 
"javax.servlet.request.cipher_suite";
+
+    /**
+     * The Request attribute key for the key size.
+     */
+    public static final String KEY_SIZE_KEY = "javax.servlet.request.key_size";
+
+    /**
+     * The Request attribute key for the client certificate chain.
+     */
+    public static final String CERTIFICATE_KEY = 
"javax.servlet.request.X509Certificate";
+
+    /**
+     * The Request attribute key for the session id.
+     * This one is a Tomcat extension to the Servlet spec.
+     */
+    public static final String SESSION_ID_KEY = 
"javax.servlet.request.ssl_session";
+
+    /**
+     * The request attribute key for the session manager.
+     * This one is a Tomcat extension to the Servlet spec.
+     */
+    public static final String SESSION_MGR = 
"javax.servlet.request.ssl_session_mgr";
+
+}

Propchange: tomcat/trunk/java/org/apache/tomcat/util/net/AbstractEndpoint.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java?rev=807284&r1=807283&r2=807284&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java Mon Aug 24 
15:33:48 2009
@@ -21,6 +21,8 @@
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.juli.logging.Log;
 import org.apache.juli.logging.LogFactory;
@@ -37,6 +39,10 @@
 import org.apache.tomcat.jni.Socket;
 import org.apache.tomcat.jni.Status;
 import org.apache.tomcat.util.res.StringManager;
+import org.apache.tomcat.util.threads.ResizableExecutor;
+import org.apache.tomcat.util.threads.TaskQueue;
+import org.apache.tomcat.util.threads.TaskThreadFactory;
+import org.apache.tomcat.util.threads.ThreadPoolExecutor;
 
 /**
  * APR tailored thread pool, providing the following services:
@@ -53,7 +59,7 @@
  * @author Mladen Turk
  * @author Remy Maucherat
  */
-public class AprEndpoint {
+public class AprEndpoint extends AbstractEndpoint {
 
 
     // -------------------------------------------------------------- Constants
@@ -61,8 +67,6 @@
 
     protected static Log log = LogFactory.getLog(AprEndpoint.class);
 
-    protected static StringManager sm =
-        StringManager.getManager("org.apache.tomcat.util.net.res");
 
 
     /**
@@ -86,24 +90,11 @@
      */
     public static final String SESSION_ID_KEY = 
"javax.servlet.request.ssl_session";
 
-    /**
-     * The request attribute key for the session manager.
-     * This one is a Tomcat extension to the Servlet spec.
-     */
-    public static final String SESSION_MGR =
-        "javax.servlet.request.ssl_session_mgr";
-
 
     // ----------------------------------------------------------------- Fields
 
 
     /**
-     * Available workers.
-     */
-    protected WorkerStack workers = null;
-
-
-    /**
      * Running state of the endpoint.
      */
     protected volatile boolean running = false;
@@ -163,6 +154,10 @@
     protected long sslContext = 0;
 
     
+    /**
+     * Are we using an internal executor
+     */
+    protected volatile boolean internalExecutor = false;
     // ------------------------------------------------------------- Properties
 
 
@@ -188,10 +183,8 @@
     protected int maxThreads = 200;
     public void setMaxThreads(int maxThreads) {
         this.maxThreads = maxThreads;
-        if (running) {
-            synchronized(workers) {
-                workers.resize(maxThreads);
-            }
+        if (running && executor instanceof ResizableExecutor) {
+            ((ResizableExecutor)executor).resizePool(getMinSpareThreads(), 
getMaxThreads());
         }
     }
     public int getMaxThreads() { return maxThreads; }
@@ -545,9 +538,15 @@
      */
     public int getCurrentThreadCount() {
         if (executor!=null) {
-            return -1;
+            if (executor instanceof ThreadPoolExecutor) {
+                return ((ThreadPoolExecutor)executor).getPoolSize();
+            } else if (executor instanceof ResizableExecutor) {
+                return ((ResizableExecutor)executor).getPoolSize();
+            } else {
+                return -1;
+            }
         } else {
-            return curThreads;
+            return -2;
         }
     }
 
@@ -558,9 +557,15 @@
      */
     public int getCurrentThreadsBusy() {
         if (executor!=null) {
-            return -1;
+            if (executor instanceof ThreadPoolExecutor) {
+                return ((ThreadPoolExecutor)executor).getActiveCount();
+            } else if (executor instanceof ResizableExecutor) {
+                return ((ResizableExecutor)executor).getActiveCount();
+            } else {
+                return -1;
+            }
         } else {
-            return workers!=null?curThreads - workers.size():0;
+            return -2;
         }
     }
     
@@ -744,7 +749,11 @@
 
             // Create worker collection
             if (executor == null) {
-                workers = new WorkerStack(maxThreads);
+                internalExecutor = true;
+                TaskQueue taskqueue = new TaskQueue();
+                TaskThreadFactory tf = new TaskThreadFactory(getName() + 
"-exec-", daemon, getThreadPriority());
+                executor = new ThreadPoolExecutor(getMinSpareThreads(), 
getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf);
+                taskqueue.setParent( (ThreadPoolExecutor) executor);
             }
 
             // Start poller threads
@@ -838,6 +847,16 @@
                 sendfiles = null;
             }
         }
+        if ( executor!=null && internalExecutor ) {
+            if ( executor instanceof ThreadPoolExecutor ) {
+                //this is our internal one, so we need to shut it down
+                ThreadPoolExecutor tpe = (ThreadPoolExecutor) executor;
+                tpe.shutdownNow();
+                TaskQueue queue = (TaskQueue) tpe.getQueue();
+                queue.setParent(null);
+            }
+            executor = null;
+        }
     }
 
 
@@ -946,86 +965,6 @@
     }
 
 
-    /**
-     * Create (or allocate) and return an available processor for use in
-     * processing a specific HTTP request, if possible.  If the maximum
-     * allowed processors have already been created and are in use, return
-     * <code>null</code> instead.
-     */
-    protected Worker createWorkerThread() {
-
-        synchronized (workers) {
-            if (workers.size() > 0) {
-                curThreadsBusy++;
-                return (workers.pop());
-            }
-            if ((maxThreads > 0) && (curThreads < maxThreads)) {
-                curThreadsBusy++;
-                if (curThreadsBusy == maxThreads) {
-                    log.info(sm.getString("endpoint.info.maxThreads",
-                            Integer.toString(maxThreads), address,
-                            Integer.toString(port)));
-                }
-                return (newWorkerThread());
-            } else {
-                if (maxThreads < 0) {
-                    curThreadsBusy++;
-                    return (newWorkerThread());
-                } else {
-                    return (null);
-                }
-            }
-        }
-
-    }
-
-
-    /**
-     * Create and return a new processor suitable for processing HTTP
-     * requests and returning the corresponding responses.
-     */
-    protected Worker newWorkerThread() {
-
-        Worker workerThread = new Worker();
-        workerThread.start();
-        return (workerThread);
-
-    }
-
-
-    /**
-     * Return a new worker thread, and block while to worker is available.
-     */
-    protected Worker getWorkerThread() {
-        // Allocate a new worker thread
-        Worker workerThread = createWorkerThread();
-        while (workerThread == null) {
-            try {
-                synchronized (workers) {
-                    workers.wait();
-                }
-            } catch (InterruptedException e) {
-                // Ignore
-            }
-            workerThread = createWorkerThread();
-        }
-        return workerThread;
-    }
-
-
-    /**
-     * Recycle the specified Processor so that it can be used again.
-     *
-     * @param workerThread The processor to be recycled
-     */
-    protected void recycleWorkerThread(Worker workerThread) {
-        synchronized (workers) {
-            workers.push(workerThread);
-            curThreadsBusy--;
-            workers.notify();
-        }
-    }
-
     
     /**
      * Allocate a new poller of the specified size.
@@ -1050,11 +989,10 @@
      */
     protected boolean processSocketWithOptions(long socket) {
         try {
-            if (executor == null) {
-                getWorkerThread().assignWithOptions(socket);
-            } else {
-                executor.execute(new SocketWithOptionsProcessor(socket));
-            }
+            executor.execute(new SocketWithOptionsProcessor(socket));
+        } catch (RejectedExecutionException x) {
+            log.warn("Socket processing request was rejected for:"+socket,x);
+            return false;
         } catch (Throwable t) {
             // This means we got an OOM or similar creating a thread, or that
             // the pool and its queue are full
@@ -1070,11 +1008,10 @@
      */
     protected boolean processSocket(long socket) {
         try {
-            if (executor == null) {
-                getWorkerThread().assign(socket);
-            } else {
-                executor.execute(new SocketProcessor(socket));
-            }
+            executor.execute(new SocketProcessor(socket));
+        } catch (RejectedExecutionException x) {
+            log.warn("Socket processing request was rejected for:"+socket,x);
+            return false;
         } catch (Throwable t) {
             // This means we got an OOM or similar creating a thread, or that
             // the pool and its queue are full
@@ -1090,11 +1027,10 @@
      */
     protected boolean processSocket(long socket, SocketStatus status) {
         try {
-            if (executor == null) {
-                getWorkerThread().assign(socket, status);
-            } else {
-                executor.execute(new SocketEventProcessor(socket, status));
-            }
+            executor.execute(new SocketEventProcessor(socket, status));
+        } catch (RejectedExecutionException x) {
+            log.warn("Socket processing request was rejected for:"+socket,x);
+            return false;
         } catch (Throwable t) {
             // This means we got an OOM or similar creating a thread, or that
             // the pool and its queue are full
@@ -1389,178 +1325,6 @@
     // ----------------------------------------------------- Worker Inner Class
 
 
-    /**
-     * Server processor class.
-     */
-    protected class Worker implements Runnable {
-
-
-        protected Thread thread = null;
-        protected boolean available = false;
-        protected long socket = 0;
-        protected SocketStatus status = null;
-        protected boolean options = false;
-
-
-        /**
-         * Process an incoming TCP/IP connection on the specified socket.  Any
-         * exception that occurs during processing must be logged and 
swallowed.
-         * <b>NOTE</b>:  This method is called from our Connector's thread.  We
-         * must assign it to our own thread so that multiple simultaneous
-         * requests can be handled.
-         *
-         * @param socket TCP socket to process
-         */
-        protected synchronized void assignWithOptions(long socket) {
-
-            // Wait for the Processor to get the previous Socket
-            while (available) {
-                try {
-                    wait();
-                } catch (InterruptedException e) {
-                }
-            }
-
-            // Store the newly available Socket and notify our thread
-            this.socket = socket;
-            status = null;
-            options = true;
-            available = true;
-            notifyAll();
-
-        }
-
-
-        /**
-         * Process an incoming TCP/IP connection on the specified socket.  Any
-         * exception that occurs during processing must be logged and 
swallowed.
-         * <b>NOTE</b>:  This method is called from our Connector's thread.  We
-         * must assign it to our own thread so that multiple simultaneous
-         * requests can be handled.
-         *
-         * @param socket TCP socket to process
-         */
-        protected synchronized void assign(long socket) {
-
-            // Wait for the Processor to get the previous Socket
-            while (available) {
-                try {
-                    wait();
-                } catch (InterruptedException e) {
-                }
-            }
-
-            // Store the newly available Socket and notify our thread
-            this.socket = socket;
-            status = null;
-            options = false;
-            available = true;
-            notifyAll();
-
-        }
-
-
-        protected synchronized void assign(long socket, SocketStatus status) {
-
-            // Wait for the Processor to get the previous Socket
-            while (available) {
-                try {
-                    wait();
-                } catch (InterruptedException e) {
-                }
-            }
-
-            // Store the newly available Socket and notify our thread
-            this.socket = socket;
-            this.status = status;
-            options = false;
-            available = true;
-            notifyAll();
-
-        }
-
-
-        /**
-         * Await a newly assigned Socket from our Connector, or 
<code>null</code>
-         * if we are supposed to shut down.
-         */
-        protected synchronized long await() {
-
-            // Wait for the Connector to provide a new Socket
-            while (!available) {
-                try {
-                    wait();
-                } catch (InterruptedException e) {
-                }
-            }
-
-            // Notify the Connector that we have received this Socket
-            long socket = this.socket;
-            available = false;
-            notifyAll();
-
-            return (socket);
-
-        }
-
-
-        /**
-         * The background thread that listens for incoming TCP/IP connections 
and
-         * hands them off to an appropriate processor.
-         */
-        public void run() {
-
-            // Process requests until we receive a shutdown signal
-            while (running) {
-
-                // Wait for the next socket to be assigned
-                long socket = await();
-                if (socket == 0)
-                    continue;
-
-                if (!deferAccept && options) {
-                    if (setSocketOptions(socket)) {
-                        getPoller().add(socket);
-                    } else {
-                        // Close socket and pool
-                        Socket.destroy(socket);
-                        socket = 0;
-                    }
-                } else {
-
-                    // Process the request from this socket
-                    if ((status != null) && (handler.event(socket, status) == 
Handler.SocketState.CLOSED)) {
-                        // Close socket and pool
-                        Socket.destroy(socket);
-                        socket = 0;
-                    } else if ((status == null) && ((options && 
!setSocketOptions(socket)) 
-                            || handler.process(socket) == 
Handler.SocketState.CLOSED)) {
-                        // Close socket and pool
-                        Socket.destroy(socket);
-                        socket = 0;
-                    }
-                }
-
-                // Finish up this request
-                recycleWorkerThread(this);
-
-            }
-
-        }
-
-
-        /**
-         * Start the background processing thread.
-         */
-        public void start() {
-            thread = new Thread(this);
-            thread.setName(getName() + "-" + (++curThreads));
-            thread.setDaemon(true);
-            thread.start();
-        }
-
-
-    }
 
 
     // ----------------------------------------------- SendfileData Inner Class
@@ -1887,83 +1651,6 @@
     }
 
 
-    // ------------------------------------------------- WorkerStack Inner 
Class
-
-
-    public class WorkerStack {
-        
-        protected Worker[] workers = null;
-        protected int end = 0;
-        
-        public WorkerStack(int size) {
-            workers = new Worker[size];
-        }
-        
-        /** 
-         * Put the object into the queue. If the queue is full (for example if
-         * the queue has been reduced in size) the object will be dropped.
-         * 
-         * @param   object  the object to be appended to the queue (first
-         *                  element).
-         */
-        public void push(Worker worker) {
-            if (end < workers.length) {
-                workers[end++] = worker;
-            } else {
-                curThreads--;
-            }
-        }
-        
-        /**
-         * Get the first object out of the queue. Return null if the queue
-         * is empty. 
-         */
-        public Worker pop() {
-            if (end > 0) {
-                return workers[--end];
-            }
-            return null;
-        }
-        
-        /**
-         * Get the first object out of the queue, Return null if the queue
-         * is empty.
-         */
-        public Worker peek() {
-            return workers[end];
-        }
-        
-        /**
-         * Is the queue empty?
-         */
-        public boolean isEmpty() {
-            return (end == 0);
-        }
-        
-        /**
-         * How many elements are there in this queue?
-         */
-        public int size() {
-            return (end);
-        }
-        
-        /**
-         * Resize the queue. If there are too many objects in the queue for the
-         * new size, drop the excess.
-         * 
-         * @param newSize
-         */
-        public void resize(int newSize) {
-            Worker[] newWorkers = new Worker[newSize];
-            int len = workers.length;
-            if (newSize < len) {
-                len = newSize;
-            }
-            System.arraycopy(workers, 0, newWorkers, 0, len);
-            workers = newWorkers;
-        }
-    }
-
 
     // ---------------------------------------------- SocketProcessor Inner 
Class
 

Modified: tomcat/trunk/java/org/apache/tomcat/util/net/JIoEndpoint.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/JIoEndpoint.java?rev=807284&r1=807283&r2=807284&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/JIoEndpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/JIoEndpoint.java Mon Aug 24 
15:33:48 2009
@@ -23,11 +23,17 @@
 import java.net.ServerSocket;
 import java.net.Socket;
 import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.juli.logging.Log;
 import org.apache.juli.logging.LogFactory;
 import org.apache.tomcat.util.IntrospectionUtils;
 import org.apache.tomcat.util.res.StringManager;
+import org.apache.tomcat.util.threads.ResizableExecutor;
+import org.apache.tomcat.util.threads.TaskQueue;
+import org.apache.tomcat.util.threads.TaskThreadFactory;
+import org.apache.tomcat.util.threads.ThreadPoolExecutor;
 
 /**
  * Handle incoming TCP connections.
@@ -45,50 +51,17 @@
  * @author Yoav Shapira
  * @author Remy Maucherat
  */
-public class JIoEndpoint {
+public class JIoEndpoint extends AbstractEndpoint {
 
 
     // -------------------------------------------------------------- Constants
 
-
     protected static Log log = LogFactory.getLog(JIoEndpoint.class);
 
-    protected StringManager sm = 
-        StringManager.getManager("org.apache.tomcat.util.net.res");
-
-
-    /**
-     * The Request attribute key for the cipher suite.
-     */
-    public static final String CIPHER_SUITE_KEY = 
"javax.servlet.request.cipher_suite";
-
-    /**
-     * The Request attribute key for the key size.
-     */
-    public static final String KEY_SIZE_KEY = "javax.servlet.request.key_size";
-
-    /**
-     * The Request attribute key for the client certificate chain.
-     */
-    public static final String CERTIFICATE_KEY = 
"javax.servlet.request.X509Certificate";
-
-    /**
-     * The Request attribute key for the session id.
-     * This one is a Tomcat extension to the Servlet spec.
-     */
-    public static final String SESSION_ID_KEY = 
"javax.servlet.request.ssl_session";
-
-
     // ----------------------------------------------------------------- Fields
 
 
     /**
-     * Available workers.
-     */
-    protected WorkerStack workers = null;
-
-
-    /**
      * Running state of the endpoint.
      */
     protected volatile boolean running = false;
@@ -134,6 +107,10 @@
      */
     protected SocketProperties socketProperties = new SocketProperties();
 
+    /**
+     * Are we using an internal executor
+     */
+    protected volatile boolean internalExecutor = false;
 
     // ------------------------------------------------------------- Properties
 
@@ -177,13 +154,19 @@
     public void setMaxThreads(int maxThreads) {
         this.maxThreads = maxThreads;
         if (running) {
-            synchronized(workers) {
-                workers.resize(maxThreads);
-            }
+            //TODO Dynamic resize
+            log.error("Resizing executor dynamically is not possible at this 
time.");
         }
     }
     public int getMaxThreads() { return maxThreads; }
 
+    public int minSpareThreads = 10;
+    public int getMinSpareThreads() {
+        return Math.min(minSpareThreads,getMaxThreads());
+    }
+    public void setMinSpareThreads(int minSpareThreads) {
+        this.minSpareThreads = minSpareThreads;
+    }
 
     /**
      * Priority of the acceptor and poller threads.
@@ -304,9 +287,15 @@
      */
     public int getCurrentThreadCount() {
         if (executor!=null) {
-            return -1;
+            if (executor instanceof ThreadPoolExecutor) {
+                return ((ThreadPoolExecutor)executor).getPoolSize();
+            } else if (executor instanceof ResizableExecutor) {
+                return ((ResizableExecutor)executor).getPoolSize();
+            } else {
+                return -1;
+            }
         } else {
-            return curThreads;
+            return -2;
         }
     }
 
@@ -317,9 +306,15 @@
      */
     public int getCurrentThreadsBusy() {
         if (executor!=null) {
-            return -1;
+            if (executor instanceof ThreadPoolExecutor) {
+                return ((ThreadPoolExecutor)executor).getActiveCount();
+            } else if (executor instanceof ResizableExecutor) {
+                return ((ResizableExecutor)executor).getActiveCount();
+            } else {
+                return -1;
+            }
         } else {
-            return workers!=null?curThreads - workers.size():0;
+            return -2;
         }
     }
     
@@ -426,113 +421,6 @@
     }
     
     
-    // ----------------------------------------------------- Worker Inner Class
-
-
-    protected class Worker implements Runnable {
-
-        protected Thread thread = null;
-        protected boolean available = false;
-        protected Socket socket = null;
-
-        
-        /**
-         * Process an incoming TCP/IP connection on the specified socket.  Any
-         * exception that occurs during processing must be logged and 
swallowed.
-         * <b>NOTE</b>:  This method is called from our Connector's thread.  We
-         * must assign it to our own thread so that multiple simultaneous
-         * requests can be handled.
-         *
-         * @param socket TCP socket to process
-         */
-        synchronized void assign(Socket socket) {
-
-            // Wait for the Processor to get the previous Socket
-            while (available) {
-                try {
-                    wait();
-                } catch (InterruptedException e) {
-                }
-            }
-
-            // Store the newly available Socket and notify our thread
-            this.socket = socket;
-            available = true;
-            notifyAll();
-
-        }
-
-        
-        /**
-         * Await a newly assigned Socket from our Connector, or 
<code>null</code>
-         * if we are supposed to shut down.
-         */
-        private synchronized Socket await() {
-
-            // Wait for the Connector to provide a new Socket
-            while (!available) {
-                try {
-                    wait();
-                } catch (InterruptedException e) {
-                }
-            }
-
-            // Notify the Connector that we have received this Socket
-            Socket socket = this.socket;
-            available = false;
-            notifyAll();
-
-            return (socket);
-
-        }
-
-
-
-        /**
-         * The background thread that listens for incoming TCP/IP connections 
and
-         * hands them off to an appropriate processor.
-         */
-        public void run() {
-
-            // Process requests until we receive a shutdown signal
-            while (running) {
-
-                // Wait for the next socket to be assigned
-                Socket socket = await();
-                if (socket == null)
-                    continue;
-
-                // Process the request from this socket
-                if (!setSocketOptions(socket) || !handler.process(socket)) {
-                    // Close socket
-                    try {
-                        socket.close();
-                    } catch (IOException e) {
-                    }
-                }
-
-                // Finish up this request
-                socket = null;
-                recycleWorkerThread(this);
-
-            }
-
-        }
-
-
-        /**
-         * Start the background processing thread.
-         */
-        public void start() {
-            thread = new Thread(this);
-            thread.setName(getName() + "-" + (++curThreads));
-            thread.setDaemon(true);
-            thread.start();
-        }
-
-
-    }
-
 
     // -------------------- Public methods --------------------
 
@@ -583,7 +471,11 @@
 
             // Create worker collection
             if (executor == null) {
-                workers = new WorkerStack(maxThreads);
+                internalExecutor = true;
+                TaskQueue taskqueue = new TaskQueue();
+                TaskThreadFactory tf = new TaskThreadFactory(getName() + 
"-exec-", daemon, getThreadPriority());
+                executor = new ThreadPoolExecutor(getMinSpareThreads(), 
getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf);
+                taskqueue.setParent( (ThreadPoolExecutor) executor);
             }
 
             // Start acceptor threads
@@ -614,6 +506,16 @@
             running = false;
             unlockAccept();
         }
+        if ( executor!=null && internalExecutor ) {
+            if ( executor instanceof ThreadPoolExecutor ) {
+                //this is our internal one, so we need to shut it down
+                ThreadPoolExecutor tpe = (ThreadPoolExecutor) executor;
+                tpe.shutdownNow();
+                TaskQueue queue = (TaskQueue) tpe.getQueue();
+                queue.setParent(null);
+            }
+            executor = null;
+        }
     }
 
     /**
@@ -696,97 +598,16 @@
 
     
     /**
-     * Create (or allocate) and return an available processor for use in
-     * processing a specific HTTP request, if possible.  If the maximum
-     * allowed processors have already been created and are in use, return
-     * <code>null</code> instead.
-     */
-    protected Worker createWorkerThread() {
-
-        synchronized (workers) {
-            if (workers.size() > 0) {
-                curThreadsBusy++;
-                return workers.pop();
-            }
-            if ((maxThreads > 0) && (curThreads < maxThreads)) {
-                curThreadsBusy++;
-                if (curThreadsBusy == maxThreads) {
-                    log.info(sm.getString("endpoint.info.maxThreads",
-                            Integer.toString(maxThreads), address,
-                            Integer.toString(port)));
-                }
-                return (newWorkerThread());
-            } else {
-                if (maxThreads < 0) {
-                    curThreadsBusy++;
-                    return (newWorkerThread());
-                } else {
-                    return (null);
-                }
-            }
-        }
-
-    }
-
-
-    /**
-     * Create and return a new processor suitable for processing HTTP
-     * requests and returning the corresponding responses.
-     */
-    protected Worker newWorkerThread() {
-
-        Worker workerThread = new Worker();
-        workerThread.start();
-        return (workerThread);
-
-    }
-
-
-    /**
-     * Return a new worker thread, and block while to worker is available.
-     */
-    protected Worker getWorkerThread() {
-        // Allocate a new worker thread
-        Worker workerThread = createWorkerThread();
-        while (workerThread == null) {
-            try {
-                synchronized (workers) {
-                    workers.wait();
-                }
-            } catch (InterruptedException e) {
-                // Ignore
-            }
-            workerThread = createWorkerThread();
-        }
-        return workerThread;
-    }
-
-
-    /**
-     * Recycle the specified Processor so that it can be used again.
-     *
-     * @param workerThread The processor to be recycled
-     */
-    protected void recycleWorkerThread(Worker workerThread) {
-        synchronized (workers) {
-            workers.push(workerThread);
-            curThreadsBusy--;
-            workers.notify();
-        }
-    }
-
-
-    /**
      * Process given socket.
      */
     protected boolean processSocket(Socket socket) {
         try {
-            if (executor == null) {
-                getWorkerThread().assign(socket);
-            } else {
-                executor.execute(new SocketProcessor(socket));
-            }
+            executor.execute(new SocketProcessor(socket));
+        } catch (RejectedExecutionException x) {
+            log.warn("Socket processing request was rejected for:"+socket,x);
+            return false;
         } catch (Throwable t) {
+            
             // This means we got an OOM or similar creating a thread, or that
             // the pool and its queue are full
             log.error(sm.getString("endpoint.process.fail"), t);
@@ -796,81 +617,4 @@
     }
     
 
-    // ------------------------------------------------- WorkerStack Inner 
Class
-
-
-    public class WorkerStack {
-        
-        protected Worker[] workers = null;
-        protected int end = 0;
-        
-        public WorkerStack(int size) {
-            workers = new Worker[size];
-        }
-        
-        /** 
-         * Put the object into the queue. If the queue is full (for example if
-         * the queue has been reduced in size) the object will be dropped.
-         * 
-         * @param   object  the object to be appended to the queue (first
-         *                  element).
-         */
-        public void push(Worker worker) {
-            if (end < workers.length) {
-                workers[end++] = worker;
-            } else {
-                curThreads--;
-            }
-        }
-        
-        /**
-         * Get the first object out of the queue. Return null if the queue
-         * is empty. 
-         */
-        public Worker pop() {
-            if (end > 0) {
-                return workers[--end];
-            }
-            return null;
-        }
-        
-        /**
-         * Get the first object out of the queue, Return null if the queue
-         * is empty.
-         */
-        public Worker peek() {
-            return workers[end];
-        }
-        
-        /**
-         * Is the queue empty?
-         */
-        public boolean isEmpty() {
-            return (end == 0);
-        }
-        
-        /**
-         * How many elements are there in this queue?
-         */
-        public int size() {
-            return (end);
-        }
-        
-        /**
-         * Resize the queue. If there are too many objects in the queue for the
-         * new size, drop the excess.
-         * 
-         * @param newSize
-         */
-        public void resize(int newSize) {
-            Worker[] newWorkers = new Worker[newSize];
-            int len = workers.length;
-            if (newSize < len) {
-                len = newSize;
-            }
-            System.arraycopy(workers, 0, newWorkers, 0, len);
-            workers = newWorkers;
-        }
-    }
-
 }

Modified: tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java?rev=807284&r1=807283&r2=807284&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java Mon Aug 24 
15:33:48 2009
@@ -58,6 +58,7 @@
 import org.apache.tomcat.util.net.SecureNioChannel.ApplicationBufferHandler;
 import org.apache.tomcat.util.net.jsse.NioX509KeyManager;
 import org.apache.tomcat.util.res.StringManager;
+import org.apache.tomcat.util.threads.ResizableExecutor;
 import org.apache.tomcat.util.threads.TaskQueue;
 import org.apache.tomcat.util.threads.TaskThreadFactory;
 import org.apache.tomcat.util.threads.ThreadPoolExecutor;
@@ -77,7 +78,7 @@
  * @author Remy Maucherat
  * @author Filip Hanik
  */
-public class NioEndpoint {
+public class NioEndpoint extends AbstractEndpoint {
 
 
     // -------------------------------------------------------------- Constants
@@ -85,30 +86,6 @@
 
     protected static Log log = LogFactory.getLog(NioEndpoint.class);
 
-    protected static StringManager sm =
-        StringManager.getManager("org.apache.tomcat.util.net.res");
-
-
-    /**
-     * The Request attribute key for the cipher suite.
-     */
-    public static final String CIPHER_SUITE_KEY = 
"javax.servlet.request.cipher_suite";
-
-    /**
-     * The Request attribute key for the key size.
-     */
-    public static final String KEY_SIZE_KEY = "javax.servlet.request.key_size";
-
-    /**
-     * The Request attribute key for the client certificate chain.
-     */
-    public static final String CERTIFICATE_KEY = 
"javax.servlet.request.X509Certificate";
-
-    /**
-     * The Request attribute key for the session id.
-     * This one is a Tomcat extension to the Servlet spec.
-     */
-    public static final String SESSION_ID_KEY = 
"javax.servlet.request.ssl_session";
 
     public static final int OP_REGISTER = 0x100; //register interest op
     public static final int OP_CALLBACK = 0x200; //callback interest op
@@ -333,7 +310,7 @@
     /**
      * Are we using an internal executor
      */
-    protected boolean internalExecutor = true;
+    protected volatile boolean internalExecutor = false;
     
     protected boolean useExecutor = true;
     /**
@@ -518,13 +495,16 @@
     /**
      * Dummy maxSpareThreads property.
      */
-    public int getMaxSpareThreads() { return Math.min(getMaxThreads(),5); }
+    public int getMaxSpareThreads() { return 
Math.min(getMaxThreads(),getMinSpareThreads()); }
 
 
-    /**
-     * Dummy minSpareThreads property.
-     */
-    public int getMinSpareThreads() { return Math.min(getMaxThreads(),5); }
+    public int minSpareThreads = 10;
+    public int getMinSpareThreads() {
+        return Math.min(minSpareThreads,getMaxThreads());
+    }
+    public void setMinSpareThreads(int minSpareThreads) {
+        this.minSpareThreads = minSpareThreads;
+    }
     
     /**
      * Generic properties, introspected
@@ -733,6 +713,8 @@
         if (executor!=null) {
             if (executor instanceof ThreadPoolExecutor) {
                 return ((ThreadPoolExecutor)executor).getPoolSize();
+            } else if (executor instanceof ResizableExecutor) {
+                return ((ResizableExecutor)executor).getPoolSize();
             } else {
                 return -1;
             }
@@ -750,6 +732,8 @@
         if (executor!=null) {
             if (executor instanceof ThreadPoolExecutor) {
                 return ((ThreadPoolExecutor)executor).getActiveCount();
+            } else if (executor instanceof ResizableExecutor) {
+                return ((ResizableExecutor)executor).getActiveCount();
             } else {
                 return -1;
             }
@@ -1142,9 +1126,7 @@
             if ( dispatch && executor!=null ) executor.execute(sc);
             else sc.run();
         } catch (RejectedExecutionException rx) {
-            if (log.isDebugEnabled()) {
-                log.debug("Unable to process socket, executor rejected the 
task.",rx);
-            }
+            log.warn("Socket processing request was rejected for:"+socket,rx);
             return false;
         } catch (Throwable t) {
             // This means we got an OOM or similar creating a thread, or that

Added: tomcat/trunk/java/org/apache/tomcat/util/threads/ResizableExecutor.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/threads/ResizableExecutor.java?rev=807284&view=auto
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/threads/ResizableExecutor.java 
(added)
+++ tomcat/trunk/java/org/apache/tomcat/util/threads/ResizableExecutor.java Mon 
Aug 24 15:33:48 2009
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tomcat.util.threads;
+
+import java.util.concurrent.Executor;
+
+public interface ResizableExecutor extends Executor {
+    /**
+     * {...@link java.util.concurrent.ThreadPoolExecutor#getPoolSize()}
+     * @return  {...@link 
java.util.concurrent.ThreadPoolExecutor#getPoolSize()}
+     */
+    public int getPoolSize();
+    
+    /**
+     * {...@link java.util.concurrent.ThreadPoolExecutor#getActiveCount()}
+     * @return {...@link 
java.util.concurrent.ThreadPoolExecutor#getActiveCount()}
+     */
+    public int getActiveCount();
+    
+    public boolean resizePool(int corePoolSize, int maximumPoolSize);
+    
+    public boolean resizeQueue(int capacity);
+    
+}

Propchange: 
tomcat/trunk/java/org/apache/tomcat/util/threads/ResizableExecutor.java
------------------------------------------------------------------------------
    svn:eol-style = native



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org
For additional commands, e-mail: dev-h...@tomcat.apache.org

Reply via email to