Hi all,
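This patch moves the StackThreadPool implementation (the Worker and
WorkerStack code) out of JIoEndpoint into a new file,
SimpleThreadPoolExecutor.java, and makes it implement the Executor interface.
JIoEndpoint now uses only the Executor interface. The problem is that
getCurrentThreadCount() and getCurrentThreadsBusy() no longer work, because
the thread counters now live inside the executor; for now both methods
simply return 0. Any ideas?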
Regards,
- Vicenç
Index: java/org/apache/tomcat/util/net/JIoEndpoint.java
===================================================================
--- java/org/apache/tomcat/util/net/JIoEndpoint.java (revision 405803)
+++ java/org/apache/tomcat/util/net/JIoEndpoint.java (working copy)
@@ -81,12 +81,6 @@
/**
- * Available workers.
- */
- protected WorkerStack workers = null;
-
-
- /**
* Running state of the endpoint.
*/
protected volatile boolean running = false;
@@ -105,18 +99,6 @@
/**
- * Current worker threads busy count.
- */
- protected int curThreadsBusy = 0;
-
-
- /**
- * Current worker threads count.
- */
- protected int curThreads = 0;
-
-
- /**
* Sequence number used to generate thread names.
*/
protected int sequence = 0;
@@ -256,11 +238,11 @@
}
public int getCurrentThreadCount() {
- return curThreads;
+ return 0; // XXX curThreads;
}
public int getCurrentThreadsBusy() {
- return curThreads - workers.size();
+ return 0; // XXX curThreads - workers.size();
}
@@ -363,115 +345,6 @@
}
-
- // ----------------------------------------------------- Worker Inner Class
-
-
- protected class Worker implements Runnable {
-
- protected Thread thread = null;
- protected boolean available = false;
- protected Socket socket = null;
-
-
- /**
- * Process an incoming TCP/IP connection on the specified socket. Any
- * exception that occurs during processing must be logged and swallowed.
- * <b>NOTE</b>: This method is called from our Connector's thread. We
- * must assign it to our own thread so that multiple simultaneous
- * requests can be handled.
- *
- * @param socket TCP socket to process
- */
- synchronized void assign(Socket socket) {
-
- // Wait for the Processor to get the previous Socket
- while (available) {
- try {
- wait();
- } catch (InterruptedException e) {
- }
- }
-
- // Store the newly available Socket and notify our thread
- this.socket = socket;
- available = true;
- notifyAll();
-
- }
-
-
- /**
- * Await a newly assigned Socket from our Connector, or <code>null</code>
- * if we are supposed to shut down.
- */
- private synchronized Socket await() {
-
- // Wait for the Connector to provide a new Socket
- while (!available) {
- try {
- wait();
- } catch (InterruptedException e) {
- }
- }
-
- // Notify the Connector that we have received this Socket
- Socket socket = this.socket;
- available = false;
- notifyAll();
-
- return (socket);
-
- }
-
-
-
- /**
- * The background thread that listens for incoming TCP/IP connections and
- * hands them off to an appropriate processor.
- */
- public void run() {
-
- // Process requests until we receive a shutdown signal
- while (running) {
-
- // Wait for the next socket to be assigned
- Socket socket = await();
- if (socket == null)
- continue;
-
- // Process the request from this socket
- if (!handler.process(socket)) {
- // Close socket
- try {
- socket.close();
- } catch (IOException e) {
- }
- }
-
- // Finish up this request
- socket = null;
- recycleWorkerThread(this);
-
- }
-
- }
-
-
- /**
- * Start the background processing thread.
- */
- public void start() {
- thread = new Thread(this);
- thread.setName(getName() + "-" + (++curThreads));
- thread.setDaemon(true);
- thread.start();
- }
-
-
- }
-
-
// -------------------- Public methods --------------------
public void init()
@@ -515,10 +388,8 @@
running = true;
paused = false;
- // Create worker collection
- if (executor == null) {
- workers = new WorkerStack(maxThreads);
- }
+ if (executor == null)
+ executor = new SimpleThreadPoolExecutor(maxThreads);
// Start acceptor threads
for (int i = 0; i < acceptorThreadCount; i++) {
@@ -638,93 +509,12 @@
return true;
}
-
/**
- * Create (or allocate) and return an available processor for use in
- * processing a specific HTTP request, if possible. If the maximum
- * allowed processors have already been created and are in use, return
- * <code>null</code> instead.
- */
- protected Worker createWorkerThread() {
-
- synchronized (workers) {
- if (workers.size() > 0) {
- curThreadsBusy++;
- return workers.pop();
- }
- if ((maxThreads > 0) && (curThreads < maxThreads)) {
- curThreadsBusy++;
- return (newWorkerThread());
- } else {
- if (maxThreads < 0) {
- curThreadsBusy++;
- return (newWorkerThread());
- } else {
- return (null);
- }
- }
- }
-
- }
-
-
- /**
- * Create and return a new processor suitable for processing HTTP
- * requests and returning the corresponding responses.
- */
- protected Worker newWorkerThread() {
-
- Worker workerThread = new Worker();
- workerThread.start();
- return (workerThread);
-
- }
-
-
- /**
- * Return a new worker thread, and block while to worker is available.
- */
- protected Worker getWorkerThread() {
- // Allocate a new worker thread
- Worker workerThread = createWorkerThread();
- while (workerThread == null) {
- try {
- synchronized (workers) {
- workers.wait();
- }
- } catch (InterruptedException e) {
- // Ignore
- }
- workerThread = createWorkerThread();
- }
- return workerThread;
- }
-
-
- /**
- * Recycle the specified Processor so that it can be used again.
- *
- * @param workerThread The processor to be recycled
- */
- protected void recycleWorkerThread(Worker workerThread) {
- synchronized (workers) {
- workers.push(workerThread);
- curThreadsBusy--;
- workers.notify();
- }
- }
-
-
- /**
* Process given socket.
*/
protected boolean processSocket(Socket socket) {
try {
- if (executor == null) {
- getWorkerThread().assign(socket);
- } else {
- executor.execute(new SocketProcessor(socket));
- }
+ executor.execute(new SocketProcessor(socket));
} catch (Throwable t) {
// This means we got an OOM or similar creating a thread, or that
// the pool and its queue are full
@@ -734,60 +524,4 @@
return true;
}
-
- // ------------------------------------------------- WorkerStack Inner Class
-
-
- public class WorkerStack {
-
- protected Worker[] workers = null;
- protected int end = 0;
-
- public WorkerStack(int size) {
- workers = new Worker[size];
- }
-
- /**
- * Put the object into the queue.
- *
- * @param object the object to be appended to the queue (first element).
- */
- public void push(Worker worker) {
- workers[end++] = worker;
- }
-
- /**
- * Get the first object out of the queue. Return null if the queue
- * is empty.
- */
- public Worker pop() {
- if (end > 0) {
- return workers[--end];
- }
- return null;
- }
-
- /**
- * Get the first object out of the queue, Return null if the queue
- * is empty.
- */
- public Worker peek() {
- return workers[end];
- }
-
- /**
- * Is the queue empty?
- */
- public boolean isEmpty() {
- return (end == 0);
- }
-
- /**
- * How many elements are there in this queue?
- */
- public int size() {
- return (end);
- }
- }
-
}
Index: java/org/apache/tomcat/util/net/SimpleThreadPoolExecutor.java
===================================================================
--- java/org/apache/tomcat/util/net/SimpleThreadPoolExecutor.java (revision 0)
+++ java/org/apache/tomcat/util/net/SimpleThreadPoolExecutor.java (revision 0)
@@ -0,0 +1,317 @@
+/*
+ * Copyright 1999-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tomcat.util.net;
+
+import java.io.IOException;
+import java.net.BindException;
+import java.net.InetAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.concurrent.Executor;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tomcat.util.res.StringManager;
+
+public class SimpleThreadPoolExecutor implements Executor {
+
+ private boolean running = true;
+
+ /**
+ * Available workers.
+ */
+ protected WorkerStack workers = null;
+
+ /**
+ * Current worker threads busy count.
+ */
+ protected int curThreadsBusy = 0;
+
+
+ /**
+ * Current worker threads count.
+ */
+ protected int curThreads = 0;
+
+
+ /**
+ * Sequence number used to generate thread names.
+ */
+ protected int sequence = 0;
+
+
+ /**
+ * Maximum amount of worker threads.
+ */
+ protected int maxThreads = 40;
+ public void setMaxThreads(int maxThreads) { /* XXX */ }
+ public int getMaxThreads() { return maxThreads; }
+
+ /**
+ * Name of the thread pool, which will be used for naming child threads.
+ */
+ protected String name = "TP";
+ public void setName(String name) { this.name = name; }
+ public String getName() { return name; }
+
+ public int getCurrentThreadCount() {
+ return curThreads;
+ }
+
+ public int getCurrentThreadsBusy() {
+ return curThreads - workers.size();
+ }
+
+ SimpleThreadPoolExecutor(int maxThreads){
+ this.maxThreads = maxThreads;
+ workers = new WorkerStack(maxThreads);
+ }
+
+ public void execute(Runnable job){
+ getWorkerThread().assign(job);
+ }
+
+ // ----------------------------------------------------- Worker Inner Class
+
+
+ protected class Worker implements Runnable {
+
+ protected Thread thread = null;
+ protected boolean available = false;
+ protected Runnable job = null;
+
+
+ /**
+ * Process an incoming TCP/IP connection on the specified socket. Any
+ * exception that occurs during processing must be logged and swallowed.
+ * <b>NOTE</b>: This method is called from our Connector's thread. We
+ * must assign it to our own thread so that multiple simultaneous
+ * requests can be handled.
+ *
+ * @param job the task to run on this worker's thread
+ */
+ synchronized void assign(Runnable job) {
+
+ // Wait for the Processor to get the previous Socket
+ while (available) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ }
+ }
+
+ // Store the newly available Socket and notify our thread
+ this.job = job;
+ available = true;
+ notifyAll();
+
+ }
+
+
+ /**
+ * Await a newly assigned Socket from our Connector, or <code>null</code>
+ * if we are supposed to shut down.
+ */
+ private synchronized Runnable await() {
+
+ // Wait for the Connector to provide a new Socket
+ while (!available) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ }
+ }
+
+ // Notify the Connector that we have received this Socket
+ Runnable job = this.job;
+ available = false;
+ notifyAll();
+
+ return (job);
+
+ }
+
+ public void shutdown(){
+ running = false;
+ }
+
+ /**
+ * The background thread that listens for incoming TCP/IP connections and
+ * hands them off to an appropriate processor.
+ */
+ public void run() {
+
+ // Process requests until we receive a shutdown signal
+ while (running) {
+
+ // Wait for the next socket to be assigned
+ Runnable job = await();
+ if (job == null)
+ continue;
+
+ job.run();
+
+ job = null;
+
+ recycleWorkerThread(this);
+ }
+
+ }
+
+
+ /**
+ * Start the background processing thread.
+ */
+ public void start() {
+ thread = new Thread(this);
+ thread.setName(getName() + "-" + (++curThreads));
+ thread.setDaemon(true);
+ thread.start();
+ }
+
+
+ }
+
+
+ /**
+ * Create (or allocate) and return an available processor for use in
+ * processing a specific HTTP request, if possible. If the maximum
+ * allowed processors have already been created and are in use, return
+ * <code>null</code> instead.
+ */
+ protected Worker createWorkerThread() {
+
+ synchronized (workers) {
+ if (workers.size() > 0) {
+ curThreadsBusy++;
+ return workers.pop();
+ }
+ if ((maxThreads > 0) && (curThreads < maxThreads)) {
+ curThreadsBusy++;
+ return (newWorkerThread());
+ } else {
+ if (maxThreads < 0) {
+ curThreadsBusy++;
+ return (newWorkerThread());
+ } else {
+ return (null);
+ }
+ }
+ }
+
+ }
+
+ /**
+ * Create and return a new processor suitable for processing HTTP
+ * requests and returning the corresponding responses.
+ */
+ protected Worker newWorkerThread() {
+
+ Worker workerThread = new Worker();
+ workerThread.start();
+ return (workerThread);
+
+ }
+
+ /**
+ * Return a worker thread, blocking until one becomes available.
+ */
+ protected Worker getWorkerThread() {
+ // Allocate a new worker thread
+ Worker workerThread = createWorkerThread();
+ while (workerThread == null) {
+ try {
+ synchronized (workers) {
+ workers.wait();
+ }
+ } catch (InterruptedException e) {
+ // Ignore
+ }
+ workerThread = createWorkerThread();
+ }
+ return workerThread;
+ }
+
+
+ /**
+ * Recycle the specified Processor so that it can be used again.
+ *
+ * @param workerThread The processor to be recycled
+ */
+ protected void recycleWorkerThread(Worker workerThread) {
+ synchronized (workers) {
+ workers.push(workerThread);
+ curThreadsBusy--;
+ workers.notify();
+ }
+ }
+
+ // ------------------------------------------------- WorkerStack Inner Class
+
+ public class WorkerStack {
+
+ protected Worker[] workers = null;
+ protected int end = 0;
+
+ public WorkerStack(int size) {
+ workers = new Worker[size];
+ }
+
+ /**
+ * Put the object into the queue.
+ *
+ * @param object the object to be appended to the queue (first element).
+ */
+ public void push(Worker worker) {
+ workers[end++] = worker;
+ }
+
+ /**
+ * Get the first object out of the queue. Return null if the queue
+ * is empty.
+ */
+ public Worker pop() {
+ if (end > 0) {
+ return workers[--end];
+ }
+ return null;
+ }
+
+ /**
+ * Get the first object out of the queue, Return null if the queue
+ * is empty.
+ */
+ public Worker peek() {
+ return workers[end];
+ }
+
+ /**
+ * Is the queue empty?
+ */
+ public boolean isEmpty() {
+ return (end == 0);
+ }
+
+ /**
+ * How many elements are there in this queue?
+ */
+ public int size() {
+ return (end);
+ }
+ }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]