Hi all,

This patch moves the StackThreadPool implementation to a new file and implements the Executor interface. Now the JIoEndpoint only uses the Executor interface, but the problem is that now getCurrentThreadCount()
and getCurrentThreadsBusy() don't work anymore. Any idea?


Regards,

- Vicenç


Index: java/org/apache/tomcat/util/net/JIoEndpoint.java
===================================================================
--- java/org/apache/tomcat/util/net/JIoEndpoint.java    (revision 405803)
+++ java/org/apache/tomcat/util/net/JIoEndpoint.java    (working copy)
@@ -81,12 +81,6 @@
 
 
     /**
-     * Available workers.
-     */
-    protected WorkerStack workers = null;
-
-
-    /**
      * Running state of the endpoint.
      */
     protected volatile boolean running = false;
@@ -105,18 +99,6 @@
 
 
     /**
-     * Current worker threads busy count.
-     */
-    protected int curThreadsBusy = 0;
-
-
-    /**
-     * Current worker threads count.
-     */
-    protected int curThreads = 0;
-
-
-    /**
      * Sequence number used to generate thread names.
      */
     protected int sequence = 0;
@@ -256,11 +238,11 @@
     }
     
     public int getCurrentThreadCount() {
-        return curThreads;
+        return 0; // XXX curThreads;
     }
     
     public int getCurrentThreadsBusy() {
-        return curThreads - workers.size();
+        return 0; // XXX curThreads - workers.size();
     }
     
 
@@ -363,115 +345,6 @@
         
     }
     
-    
-    // ----------------------------------------------------- Worker Inner Class
-
-
-    protected class Worker implements Runnable {
-
-        protected Thread thread = null;
-        protected boolean available = false;
-        protected Socket socket = null;
-
-        
-        /**
-         * Process an incoming TCP/IP connection on the specified socket.  Any
-         * exception that occurs during processing must be logged and 
swallowed.
-         * <b>NOTE</b>:  This method is called from our Connector's thread.  We
-         * must assign it to our own thread so that multiple simultaneous
-         * requests can be handled.
-         *
-         * @param socket TCP socket to process
-         */
-        synchronized void assign(Socket socket) {
-
-            // Wait for the Processor to get the previous Socket
-            while (available) {
-                try {
-                    wait();
-                } catch (InterruptedException e) {
-                }
-            }
-
-            // Store the newly available Socket and notify our thread
-            this.socket = socket;
-            available = true;
-            notifyAll();
-
-        }
-
-        
-        /**
-         * Await a newly assigned Socket from our Connector, or 
<code>null</code>
-         * if we are supposed to shut down.
-         */
-        private synchronized Socket await() {
-
-            // Wait for the Connector to provide a new Socket
-            while (!available) {
-                try {
-                    wait();
-                } catch (InterruptedException e) {
-                }
-            }
-
-            // Notify the Connector that we have received this Socket
-            Socket socket = this.socket;
-            available = false;
-            notifyAll();
-
-            return (socket);
-
-        }
-
-
-
-        /**
-         * The background thread that listens for incoming TCP/IP connections 
and
-         * hands them off to an appropriate processor.
-         */
-        public void run() {
-
-            // Process requests until we receive a shutdown signal
-            while (running) {
-
-                // Wait for the next socket to be assigned
-                Socket socket = await();
-                if (socket == null)
-                    continue;
-
-                // Process the request from this socket
-                if (!handler.process(socket)) {
-                    // Close socket
-                    try {
-                        socket.close();
-                    } catch (IOException e) {
-                    }
-                }
-
-                // Finish up this request
-                socket = null;
-                recycleWorkerThread(this);
-
-            }
-
-        }
-
-
-        /**
-         * Start the background processing thread.
-         */
-        public void start() {
-            thread = new Thread(this);
-            thread.setName(getName() + "-" + (++curThreads));
-            thread.setDaemon(true);
-            thread.start();
-        }
-
-
-    }
-
-
     // -------------------- Public methods --------------------
 
     public void init()
@@ -515,10 +388,8 @@
             running = true;
             paused = false;
 
-            // Create worker collection
-            if (executor == null) {
-                workers = new WorkerStack(maxThreads);
-            }
+            if (executor == null)
+                executor = new SimpleThreadPoolExecutor(maxThreads);
 
             // Start acceptor threads
             for (int i = 0; i < acceptorThreadCount; i++) {
@@ -638,93 +509,12 @@
         return true;
     }
 
-    
     /**
-     * Create (or allocate) and return an available processor for use in
-     * processing a specific HTTP request, if possible.  If the maximum
-     * allowed processors have already been created and are in use, return
-     * <code>null</code> instead.
-     */
-    protected Worker createWorkerThread() {
-
-        synchronized (workers) {
-            if (workers.size() > 0) {
-                curThreadsBusy++;
-                return workers.pop();
-            }
-            if ((maxThreads > 0) && (curThreads < maxThreads)) {
-                curThreadsBusy++;
-                return (newWorkerThread());
-            } else {
-                if (maxThreads < 0) {
-                    curThreadsBusy++;
-                    return (newWorkerThread());
-                } else {
-                    return (null);
-                }
-            }
-        }
-
-    }
-
-
-    /**
-     * Create and return a new processor suitable for processing HTTP
-     * requests and returning the corresponding responses.
-     */
-    protected Worker newWorkerThread() {
-
-        Worker workerThread = new Worker();
-        workerThread.start();
-        return (workerThread);
-
-    }
-
-
-    /**
-     * Return a new worker thread, and block while to worker is available.
-     */
-    protected Worker getWorkerThread() {
-        // Allocate a new worker thread
-        Worker workerThread = createWorkerThread();
-        while (workerThread == null) {
-            try {
-                synchronized (workers) {
-                    workers.wait();
-                }
-            } catch (InterruptedException e) {
-                // Ignore
-            }
-            workerThread = createWorkerThread();
-        }
-        return workerThread;
-    }
-
-
-    /**
-     * Recycle the specified Processor so that it can be used again.
-     *
-     * @param workerThread The processor to be recycled
-     */
-    protected void recycleWorkerThread(Worker workerThread) {
-        synchronized (workers) {
-            workers.push(workerThread);
-            curThreadsBusy--;
-            workers.notify();
-        }
-    }
-
-
-    /**
      * Process given socket.
      */
     protected boolean processSocket(Socket socket) {
         try {
-            if (executor == null) {
-                getWorkerThread().assign(socket);
-            } else {
-                executor.execute(new SocketProcessor(socket));
-            }
+            executor.execute(new SocketProcessor(socket));
         } catch (Throwable t) {
             // This means we got an OOM or similar creating a thread, or that
             // the pool and its queue are full
@@ -734,60 +524,4 @@
         return true;
     }
     
-
-    // ------------------------------------------------- WorkerStack Inner 
Class
-
-
-    public class WorkerStack {
-        
-        protected Worker[] workers = null;
-        protected int end = 0;
-        
-        public WorkerStack(int size) {
-            workers = new Worker[size];
-        }
-        
-        /** 
-         * Put the object into the queue.
-         * 
-         * @param   object      the object to be appended to the queue (first 
element). 
-         */
-        public void push(Worker worker) {
-            workers[end++] = worker;
-        }
-        
-        /**
-         * Get the first object out of the queue. Return null if the queue
-         * is empty. 
-         */
-        public Worker pop() {
-            if (end > 0) {
-                return workers[--end];
-            }
-            return null;
-        }
-        
-        /**
-         * Get the first object out of the queue, Return null if the queue
-         * is empty.
-         */
-        public Worker peek() {
-            return workers[end];
-        }
-        
-        /**
-         * Is the queue empty?
-         */
-        public boolean isEmpty() {
-            return (end == 0);
-        }
-        
-        /**
-         * How many elements are there in this queue?
-         */
-        public int size() {
-            return (end);
-        }
-    }
-
 }
Index: java/org/apache/tomcat/util/net/SimpleThreadPoolExecutor.java
===================================================================
--- java/org/apache/tomcat/util/net/SimpleThreadPoolExecutor.java       
(revision 0)
+++ java/org/apache/tomcat/util/net/SimpleThreadPoolExecutor.java       
(revision 0)
@@ -0,0 +1,317 @@
+/*
+ *  Copyright 1999-2006 The Apache Software Foundation
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.tomcat.util.net;
+
+import java.io.IOException;
+import java.net.BindException;
+import java.net.InetAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.concurrent.Executor;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tomcat.util.res.StringManager;
+
+public class SimpleThreadPoolExecutor implements Executor {
+
+    private boolean running = true;
+    
+    /**
+     * Available workers.
+     */
+    protected WorkerStack workers = null;
+
+    /**
+     * Current worker threads busy count.
+     */
+    protected int curThreadsBusy = 0;
+
+
+    /**
+     * Current worker threads count.
+     */
+    protected int curThreads = 0;
+
+
+    /**
+     * Sequence number used to generate thread names.
+     */
+    protected int sequence = 0;
+
+
+    /**
+     * Maximum amount of worker threads.
+     */
+    protected int maxThreads = 40;
+    public void setMaxThreads(int maxThreads) { /* XXX */ }
+    public int getMaxThreads() { return maxThreads; }
+
+    /**
+     * Name of the thread pool, which will be used for naming child threads.
+     */
+    protected String name = "TP";
+    public void setName(String name) { this.name = name; }
+    public String getName() { return name; }
+    
+    public int getCurrentThreadCount() {
+        return curThreads;
+    }
+    
+    public int getCurrentThreadsBusy() {
+        return curThreads - workers.size();
+    }
+
+    SimpleThreadPoolExecutor(int maxThreads){
+        this.maxThreads = maxThreads;
+        workers = new WorkerStack(maxThreads);
+    }
+
+    public void execute(Runnable job){
+        getWorkerThread().assign(job);
+    }
+    
+    // ----------------------------------------------------- Worker Inner Class
+
+
+    protected class Worker implements Runnable {
+
+        protected Thread thread = null;
+        protected boolean available = false;
+        protected Runnable job = null;
+
+        
+        /**
+         * Process an incoming TCP/IP connection on the specified socket.  Any
+         * exception that occurs during processing must be logged and 
swallowed.
+         * <b>NOTE</b>:  This method is called from our Connector's thread.  We
+         * must assign it to our own thread so that multiple simultaneous
+         * requests can be handled.
+         *
+         * @param socket TCP socket to process
+         */
+        synchronized void assign(Runnable job) {
+
+            // Wait for the Processor to get the previous Socket
+            while (available) {
+                try {
+                    wait();
+                } catch (InterruptedException e) {
+                }
+            }
+
+            // Store the newly available Socket and notify our thread
+            this.job = job;
+            available = true;
+            notifyAll();
+
+        }
+
+        
+        /**
+         * Await a newly assigned Socket from our Connector, or 
<code>null</code>
+         * if we are supposed to shut down.
+         */
+        private synchronized Runnable await() {
+
+            // Wait for the Connector to provide a new Socket
+            while (!available) {
+                try {
+                    wait();
+                } catch (InterruptedException e) {
+                }
+            }
+
+            // Notify the Connector that we have received this Socket
+            Runnable job = this.job;
+            available = false;
+            notifyAll();
+
+            return (job);
+
+        }
+
+        public void shutdown(){
+            running = false;
+        }
+
+        /**
+         * The background thread that listens for incoming TCP/IP connections 
and
+         * hands them off to an appropriate processor.
+         */
+        public void run() {
+
+            // Process requests until we receive a shutdown signal
+            while (running) {
+
+                // Wait for the next socket to be assigned
+                Runnable job = await();
+                if (job == null)
+                    continue;
+
+                job.run();
+                
+                job = null;
+                
+                recycleWorkerThread(this);
+            }
+
+        }
+
+
+        /**
+         * Start the background processing thread.
+         */
+        public void start() {
+            thread = new Thread(this);
+            thread.setName(getName() + "-" + (++curThreads));
+            thread.setDaemon(true);
+            thread.start();
+        }
+
+
+    }
+
+    
+    /**
+     * Create (or allocate) and return an available processor for use in
+     * processing a specific HTTP request, if possible.  If the maximum
+     * allowed processors have already been created and are in use, return
+     * <code>null</code> instead.
+     */
+    protected Worker createWorkerThread() {
+
+        synchronized (workers) {
+            if (workers.size() > 0) {
+                curThreadsBusy++;
+                return workers.pop();
+            }
+            if ((maxThreads > 0) && (curThreads < maxThreads)) {
+                curThreadsBusy++;
+                return (newWorkerThread());
+            } else {
+                if (maxThreads < 0) {
+                    curThreadsBusy++;
+                    return (newWorkerThread());
+                } else {
+                    return (null);
+                }
+            }
+        }
+
+    }
+
+    /**
+     * Create and return a new processor suitable for processing HTTP
+     * requests and returning the corresponding responses.
+     */
+    protected Worker newWorkerThread() {
+
+        Worker workerThread = new Worker();
+        workerThread.start();
+        return (workerThread);
+
+    }
+
+    /**
+     * Return a new worker thread, and block while to worker is available.
+     */
+    protected Worker getWorkerThread() {
+        // Allocate a new worker thread
+        Worker workerThread = createWorkerThread();
+        while (workerThread == null) {
+            try {
+                synchronized (workers) {
+                    workers.wait();
+                }
+            } catch (InterruptedException e) {
+                // Ignore
+            }
+            workerThread = createWorkerThread();
+        }
+        return workerThread;
+    }
+
+
+    /**
+     * Recycle the specified Processor so that it can be used again.
+     *
+     * @param workerThread The processor to be recycled
+     */
+    protected void recycleWorkerThread(Worker workerThread) {
+        synchronized (workers) {
+            workers.push(workerThread);
+            curThreadsBusy--;
+            workers.notify();
+        }
+    }
+
+    // ------------------------------------------------- WorkerStack Inner 
Class
+
+    public class WorkerStack {
+        
+        protected Worker[] workers = null;
+        protected int end = 0;
+        
+        public WorkerStack(int size) {
+            workers = new Worker[size];
+        }
+        
+        /** 
+         * Put the object into the queue.
+         * 
+         * @param   object      the object to be appended to the queue (first 
element). 
+         */
+        public void push(Worker worker) {
+            workers[end++] = worker;
+        }
+        
+        /**
+         * Get the first object out of the queue. Return null if the queue
+         * is empty. 
+         */
+        public Worker pop() {
+            if (end > 0) {
+                return workers[--end];
+            }
+            return null;
+        }
+        
+        /**
+         * Get the first object out of the queue, Return null if the queue
+         * is empty.
+         */
+        public Worker peek() {
+            return workers[end];
+        }
+        
+        /**
+         * Is the queue empty?
+         */
+        public boolean isEmpty() {
+            return (end == 0);
+        }
+        
+        /**
+         * How many elements are there in this queue?
+         */
+        public int size() {
+            return (end);
+        }
+    }
+
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to