Few questions:

- why is this needed ( i.e. what problem with the ThreadPoolExecutor is it
solving ) ?

- who is cleaning worker threads ( after a peak ) ?

- it would be good to have some comments on the stack - what happens on
push() if end == workers.length for
example, or why this won't happen

- try/catch may be good in worker.run, or you may miss recycle and cleanup

- why not just add executor interface to the existing ( and relatively well
tested ) thread pool ??

- hooks in TPExecutor are nice and may be useful...

I think it would be ok to add this to sandbox for example, but not very sure
about adding it to the main tree.
Doesn't look very solid....


Costin

On 5/30/06, [EMAIL PROTECTED] <[EMAIL PROTECTED]> wrote:

Author: remm
Date: Tue May 30 02:58:41 2006
New Revision: 410234

URL: http://svn.apache.org/viewvc?rev=410234&view=rev
Log:
- Add a brain dead executor.
- Submitted by Vincenc Beltran Querol.

Added:
    
tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/SimpleThreadPoolExecutor.java
(with props)

Added:
tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/SimpleThreadPoolExecutor.java
URL:
http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/SimpleThreadPoolExecutor.java?rev=410234&view=auto

==============================================================================
---
tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/SimpleThreadPoolExecutor.java
(added)
+++
tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/SimpleThreadPoolExecutor.java
Tue May 30 02:58:41 2006
@@ -0,0 +1,312 @@
+/*
+ *  Copyright 1999-2006 The Apache Software Foundation
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.tomcat.util.net;
+
+import java.util.concurrent.Executor;
+
+public class SimpleThreadPoolExecutor implements Executor {
+
+
+    private boolean running = true;
+
+    /**
+     * Available workers.
+     */
+    protected WorkerStack workers = null;
+
+    /**
+     * Current worker threads busy count.
+     */
+    protected int curThreadsBusy = 0;
+
+
+    /**
+     * Current worker threads count.
+     */
+    protected int curThreads = 0;
+
+
+    /**
+     * Sequence number used to generate thread names.
+     */
+    protected int sequence = 0;
+
+
+    /**
+     * Maximum amount of worker threads.
+     */
+    protected int maxThreads = 40;
+    public void setMaxThreads(int maxThreads) { this.maxThreads =
maxThreads; }
+    public int getMaxThreads() { return maxThreads; }
+
+    /**
+     * Name of the thread pool, which will be used for naming child
threads.
+     */
+    protected String name = "TP";
+    public void setName(String name) { this.name = name; }
+    public String getName() { return name; }
+
+    public int getCurrentThreadCount() {
+        return curThreads;
+    }
+
+    public int getCurrentThreadsBusy() {
+        return curThreads - workers.size();
+    }
+
+    public SimpleThreadPoolExecutor(String name, int maxThreads) {
+        this.name = name;
+        this.maxThreads = maxThreads;
+        workers = new WorkerStack(maxThreads);
+    }
+
+    public void execute(Runnable job){
+        getWorkerThread().assign(job);
+    }
+
+    // ----------------------------------------------------- Worker Inner
Class
+
+
+    protected class Worker implements Runnable {
+
+        protected Thread thread = null;
+        protected boolean available = false;
+        protected Runnable job = null;
+
+
+        /**
+         * Process an incoming TCP/IP connection on the specified
socket.  Any
+         * exception that occurs during processing must be logged and
swallowed.
+         * <b>NOTE</b>:  This method is called from our Connector's
thread.  We
+         * must assign it to our own thread so that multiple simultaneous
+         * requests can be handled.
+         *
+         * @param socket TCP socket to process
+         */
+        synchronized void assign(Runnable job) {
+
+            // Wait for the Processor to get the previous Socket
+            while (available) {
+                try {
+                    wait();
+                } catch (InterruptedException e) {
+                }
+            }
+
+            // Store the newly available Socket and notify our thread
+            this.job = job;
+            available = true;
+            notifyAll();
+
+        }
+
+
+        /**
+         * Await a newly assigned Socket from our Connector, or
<code>null</code>
+         * if we are supposed to shut down.
+         */
+        private synchronized Runnable await() {
+
+            // Wait for the Connector to provide a new Socket
+            while (!available) {
+                try {
+                    wait();
+                } catch (InterruptedException e) {
+                }
+            }
+
+            // Notify the Connector that we have received this Socket
+            Runnable job = this.job;
+            available = false;
+            notifyAll();
+
+            return (job);
+
+        }
+
+        public void shutdown(){
+            running = false;
+        }
+
+        /**
+         * The background thread that listens for incoming TCP/IP
connections and
+         * hands them off to an appropriate processor.
+         */
+        public void run() {
+
+            // Process requests until we receive a shutdown signal
+            while (running) {
+
+                // Wait for the next socket to be assigned
+                Runnable job = await();
+                if (job == null)
+                    continue;
+
+                job.run();
+
+                job = null;
+
+                recycleWorkerThread(this);
+            }
+
+        }
+
+
+        /**
+         * Start the background processing thread.
+         */
+        public void start() {
+            thread = new Thread(this);
+            thread.setName(getName() + "-" + (++curThreads));
+            thread.setDaemon(true);
+            thread.start();
+        }
+
+
+    }
+
+
+    /**
+     * Create (or allocate) and return an available processor for use in
+     * processing a specific HTTP request, if possible.  If the maximum
+     * allowed processors have already been created and are in use,
return
+     * <code>null</code> instead.
+     */
+    protected Worker createWorkerThread() {
+
+        synchronized (workers) {
+            if (workers.size() > 0) {
+                curThreadsBusy++;
+                return workers.pop();
+            }
+            if ((maxThreads > 0) && (curThreads < maxThreads)) {
+                curThreadsBusy++;
+                return (newWorkerThread());
+            } else {
+                if (maxThreads < 0) {
+                    curThreadsBusy++;
+                    return (newWorkerThread());
+                } else {
+                    return (null);
+                }
+            }
+        }
+
+    }
+
+    /**
+     * Create and return a new processor suitable for processing HTTP
+     * requests and returning the corresponding responses.
+     */
+    protected Worker newWorkerThread() {
+
+        Worker workerThread = new Worker();
+        workerThread.start();
+        return (workerThread);
+
+    }
+
+    /**
+     * Return a new worker thread, and block while to worker is
available.
+     */
+    protected Worker getWorkerThread() {
+        // Allocate a new worker thread
+        Worker workerThread = createWorkerThread();
+        while (workerThread == null) {
+            try {
+                synchronized (workers) {
+                    workers.wait();
+                }
+            } catch (InterruptedException e) {
+                // Ignore
+            }
+            workerThread = createWorkerThread();
+        }
+        return workerThread;
+    }
+
+
+    /**
+     * Recycle the specified Processor so that it can be used again.
+     *
+     * @param workerThread The processor to be recycled
+     */
+    protected void recycleWorkerThread(Worker workerThread) {
+        synchronized (workers) {
+            workers.push(workerThread);
+            curThreadsBusy--;
+            workers.notify();
+        }
+    }
+
+    // ------------------------------------------------- WorkerStack
Inner Class
+
+
+    public class WorkerStack {
+
+        protected Worker[] workers = null;
+        protected int end = 0;
+
+        public WorkerStack(int size) {
+            workers = new Worker[size];
+        }
+
+        /**
+         * Put the object into the queue.
+         *
+         * @param   object      the object to be appended to the queue
(first element).
+         */
+        public void push(Worker worker) {
+            workers[end++] = worker;
+        }
+
+        /**
+         * Get the first object out of the queue. Return null if the
queue
+         * is empty.
+         */
+        public Worker pop() {
+            if (end > 0) {
+                return workers[--end];
+            }
+            return null;
+        }
+
+        /**
+         * Get the first object out of the queue, Return null if the
queue
+         * is empty.
+         */
+        public Worker peek() {
+            return workers[end];
+        }
+
+        /**
+         * Is the queue empty?
+         */
+        public boolean isEmpty() {
+            return (end == 0);
+        }
+
+        /**
+         * How many elements are there in this queue?
+         */
+        public int size() {
+            return (end);
+        }
+    }
+
+
+}

Propchange:
tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/SimpleThreadPoolExecutor.java

------------------------------------------------------------------------------
    svn:eol-style = native



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]


Reply via email to