A few questions:
- Why is this needed (i.e., what problem with ThreadPoolExecutor is it solving)?
- Who cleans up the worker threads (after a peak)?
- It would be good to have some comments on the stack: for example, what happens on push() if end == workers.length, or why this cannot happen.
- A try/catch would be good in Worker.run(); otherwise you may miss the recycle and cleanup.
- Why not just add the Executor interface to the existing (and relatively well-tested) thread pool?
- The hooks in ThreadPoolExecutor are nice and may be useful.

I think it would be OK to add this to the sandbox, for example, but I am not so sure about adding it to the main tree. It doesn't look very solid.

Costin

On 5/30/06, [EMAIL PROTECTED] <[EMAIL PROTECTED]> wrote:
/*
 *  Copyright 1999-2006 The Apache Software Foundation
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */

import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;

/**
 * Small {@link Executor} that hands each job to a pooled daemon thread.
 * Derived from the executor submitted by Vincenc Beltran Querol and committed
 * to tomcat/tc6.0.x/trunk as revision 410234.
 *
 * <p>Idle workers are kept on a {@link WorkerStack}. {@link #execute(Runnable)}
 * takes the most recently recycled worker, or starts a new one while fewer
 * than {@code maxThreads} exist, and otherwise blocks the caller until a
 * worker is recycled. A negative {@code maxThreads} means "no limit". A limit
 * of {@code 0} allows no workers, so {@code execute} blocks until the limit is
 * raised or the pool is shut down.
 *
 * <p>Locking: the pool counters and the idle stack are guarded by the monitor
 * of {@link #workers}. The hand-over of a job to one worker is guarded by that
 * worker's own monitor. The two monitors are never held at the same time.
 * Idle threads are not reclaimed before {@link #shutdown()} is called.
 */
public class SimpleThreadPoolExecutor implements Executor {


    /**
     * Cleared by {@link #shutdown()}. Volatile because worker threads read it
     * while holding only their own monitor.
     */
    private volatile boolean running = true;

    /**
     * Available (idle) workers. Its monitor guards all pool bookkeeping.
     */
    protected WorkerStack workers = null;

    /**
     * Current worker threads busy count.
     */
    protected int curThreadsBusy = 0;


    /**
     * Current worker threads count.
     */
    protected int curThreads = 0;


    /**
     * Sequence number used to generate thread names.
     */
    protected int sequence = 0;


    /**
     * Maximum amount of worker threads; negative means unbounded.
     */
    protected int maxThreads = 40;

    /**
     * Changes the worker limit and wakes blocked submitters, because a higher
     * limit may allow them to start a new worker.
     *
     * @param maxThreads new limit; negative for unbounded
     */
    public void setMaxThreads(int maxThreads) {
        synchronized (workers) {
            this.maxThreads = maxThreads;
            workers.notifyAll();
        }
    }

    public int getMaxThreads() { return maxThreads; }

    /**
     * Name of the thread pool, which will be used for naming child threads.
     */
    protected String name = "TP";
    public void setName(String name) { this.name = name; }
    public String getName() { return name; }

    /**
     * @return number of worker threads that currently exist
     */
    public int getCurrentThreadCount() {
        synchronized (workers) {
            return curThreads;
        }
    }

    /**
     * @return number of workers currently handed out to a job
     */
    public int getCurrentThreadsBusy() {
        synchronized (workers) {
            return curThreadsBusy;
        }
    }

    /**
     * Creates an empty pool; threads are started lazily by
     * {@link #execute(Runnable)}.
     *
     * @param name       prefix for worker thread names
     * @param maxThreads worker limit; negative for unbounded
     */
    public SimpleThreadPoolExecutor(String name, int maxThreads) {
        this.name = name;
        this.maxThreads = maxThreads;
        // The stack grows on demand, so maxThreads is only an initial capacity hint.
        workers = new WorkerStack(maxThreads);
    }

    /**
     * Runs the job on a pooled thread, blocking the caller while every
     * permitted worker is busy.
     *
     * @param job the task to run
     * @throws NullPointerException       if {@code job} is null
     * @throws RejectedExecutionException if the pool has been shut down
     */
    public void execute(Runnable job) {
        if (job == null) {
            // A null job used to park the worker without ever recycling it.
            throw new NullPointerException("job");
        }
        Worker worker = getWorkerThread();
        try {
            worker.assign(job);
        } catch (RejectedExecutionException e) {
            // The worker was counted as busy when it was handed out; undo that.
            synchronized (workers) {
                curThreadsBusy--;
            }
            throw e;
        }
    }

    /**
     * Stops the pool. Idle workers are woken so their threads can end, busy
     * workers end after their current job, and submitters that are blocked in
     * or later call {@link #execute(Runnable)} get a
     * {@link RejectedExecutionException}. Jobs already assigned still run.
     */
    public void shutdown() {
        Worker[] idle;
        synchronized (workers) {
            running = false;
            idle = new Worker[workers.size()];
            for (int i = 0; i < idle.length; i++) {
                idle[i] = workers.pop();
            }
            workers.notifyAll();
        }
        // Notify outside the pool lock so both monitors are never held together.
        for (int i = 0; i < idle.length; i++) {
            synchronized (idle[i]) {
                idle[i].notifyAll();
            }
        }
    }

    // ----------------------------------------------------- Worker Inner Class


    /**
     * One pooled thread plus the single-slot mailbox used to pass it a job.
     */
    protected class Worker implements Runnable {

        protected Thread thread = null;
        /** True while {@link #job} holds a job the worker has not yet taken. */
        protected boolean available = false;
        protected Runnable job = null;


        /**
         * Hands a job to this worker's thread. Called on the submitting
         * thread; waits while a previously assigned job is still untaken.
         *
         * @param job the job to run
         * @throws RejectedExecutionException if the pool has been shut down
         */
        synchronized void assign(Runnable job) {
            boolean interrupted = false;
            try {
                while (available && running) {
                    try {
                        wait();
                    } catch (InterruptedException e) {
                        interrupted = true;
                    }
                }
                if (!running) {
                    // Wake the worker so it notices the shutdown and ends.
                    notifyAll();
                    throw new RejectedExecutionException(
                            getName() + " has been shut down");
                }
                this.job = job;
                available = true;
                notifyAll();
            } finally {
                if (interrupted) {
                    // Keep the submitter's interrupt visible to its own code.
                    Thread.currentThread().interrupt();
                }
            }
        }


        /**
         * Waits for the next assigned job.
         *
         * @return the job, or <code>null</code> if the pool was shut down
         *         while no job was pending
         */
        private synchronized Runnable await() {
            while (!available) {
                if (!running) {
                    return null;
                }
                try {
                    wait();
                } catch (InterruptedException ignored) {
                    // Workers are stopped through the running flag, not by
                    // interruption, so a stray interrupt is simply dropped.
                }
            }
            Runnable next = this.job;
            this.job = null;
            available = false;
            notifyAll();
            return next;
        }

        /**
         * Shuts down the whole pool; see
         * {@link SimpleThreadPoolExecutor#shutdown()}.
         */
        public void shutdown() {
            SimpleThreadPoolExecutor.this.shutdown();
        }

        /**
         * Worker loop: take a job, run it, return to the idle stack. A job
         * that throws is reported and the worker keeps serving.
         */
        public void run() {
            try {
                Runnable next;
                while ((next = await()) != null) {
                    try {
                        next.run();
                    } catch (Throwable t) {
                        reportFailure(t);
                    } finally {
                        // Always recycle, or the pool would shrink for good.
                        recycleWorkerThread(this);
                    }
                }
            } finally {
                synchronized (workers) {
                    curThreads--;
                    workers.notifyAll();
                }
            }
        }

        /**
         * Passes a job failure to the thread's uncaught-exception handler
         * without letting the thread die.
         */
        private void reportFailure(Throwable failure) {
            Thread current = Thread.currentThread();
            try {
                current.getUncaughtExceptionHandler()
                        .uncaughtException(current, failure);
            } catch (Throwable ignored) {
                // A failing handler must not take the worker down with it.
            }
        }


        /**
         * Start the background processing thread.
         */
        public void start() {
            synchronized (workers) {
                thread = new Thread(this);
                thread.setName(getName() + "-" + (++sequence));
                thread.setDaemon(true);
                thread.start();
                // Counted only once the thread really exists.
                curThreads++;
            }
        }


    }


    /**
     * Takes an idle worker, or starts a new one if the limit allows.
     *
     * @return a worker marked busy, or <code>null</code> if the limit is
     *         reached or the pool has been shut down
     */
    protected Worker createWorkerThread() {

        synchronized (workers) {
            if (!running) {
                return null;
            }
            Worker worker = workers.pop();
            if (worker == null) {
                boolean unbounded = maxThreads < 0;
                if (!unbounded && curThreads >= maxThreads) {
                    return null;
                }
                worker = newWorkerThread();
            }
            curThreadsBusy++;
            return worker;
        }

    }

    /**
     * Creates a worker and starts its thread.
     */
    protected Worker newWorkerThread() {

        Worker workerThread = new Worker();
        workerThread.start();
        return (workerThread);

    }

    /**
     * Returns a worker, blocking while none is available.
     *
     * @throws RejectedExecutionException if the pool has been shut down
     */
    protected Worker getWorkerThread() {
        boolean interrupted = false;
        try {
            // Check and wait under one lock so a recycle notification
            // cannot slip in between the two.
            synchronized (workers) {
                for (;;) {
                    if (!running) {
                        throw new RejectedExecutionException(
                                name + " has been shut down");
                    }
                    Worker worker = createWorkerThread();
                    if (worker != null) {
                        return worker;
                    }
                    try {
                        workers.wait();
                    } catch (InterruptedException e) {
                        interrupted = true;
                    }
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }


    /**
     * Returns a worker to the idle stack and wakes one blocked submitter.
     * After shutdown the worker is not stacked, since its thread is ending.
     *
     * @param workerThread The worker to be recycled
     */
    protected void recycleWorkerThread(Worker workerThread) {
        synchronized (workers) {
            if (running) {
                workers.push(workerThread);
            }
            curThreadsBusy--;
            workers.notify();
        }
    }

    // ------------------------------------------------- WorkerStack Inner Class


    /**
     * Array-backed LIFO stack of workers that grows when full. Not
     * synchronized; callers lock the stack instance.
     */
    public class WorkerStack {

        protected Worker[] workers = null;
        protected int end = 0;

        /**
         * @param size initial capacity; negative values are treated as 0
         */
        public WorkerStack(int size) {
            workers = new Worker[Math.max(size, 0)];
        }

        /**
         * Pushes a worker on top of the stack, growing the array if needed.
         *
         * @param worker the worker to push
         */
        public void push(Worker worker) {
            if (end == workers.length) {
                Worker[] grown = new Worker[Math.max(4, workers.length * 2)];
                System.arraycopy(workers, 0, grown, 0, end);
                workers = grown;
            }
            workers[end++] = worker;
        }

        /**
         * Removes and returns the top worker, or null if the stack is empty.
         */
        public Worker pop() {
            if (end == 0) {
                return null;
            }
            Worker top = workers[--end];
            workers[end] = null;
            return top;
        }

        /**
         * Returns the top worker without removing it, or null if the stack
         * is empty.
         */
        public Worker peek() {
            if (end == 0) {
                return null;
            }
            return workers[end - 1];
        }

        /**
         * Is the stack empty?
         */
        public boolean isEmpty() {
            return (end == 0);
        }

        /**
         * How many workers are on the stack?
         */
        public int size() {
            return (end);
        }
    }


}