Author: remm Date: Fri Apr 21 07:39:26 2006 New Revision: 395901 URL: http://svn.apache.org/viewcvs?rev=395901&view=rev Log: - Add the refactored java.io endpoint that I talked about earlier.
Added: tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/JIoEndpoint.java (with props) Removed: tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/compat/ Added: tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/JIoEndpoint.java URL: http://svn.apache.org/viewcvs/tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/JIoEndpoint.java?rev=395901&view=auto ============================================================================== --- tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/JIoEndpoint.java (added) +++ tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/JIoEndpoint.java Fri Apr 21 07:39:26 2006 @@ -0,0 +1,694 @@ +/* + * Copyright 1999-2004 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tomcat.util.net; + +import java.io.IOException; +import java.net.BindException; +import java.net.InetAddress; +import java.net.ServerSocket; +import java.net.Socket; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.tomcat.util.res.StringManager; +import org.apache.tomcat.util.threads.ThreadWithAttributes; + +/** + * Handle incoming TCP connections. + * + * This class implement a simple server model: one listener thread accepts on a socket and + * creates a new worker thread for each incoming connection. + * + * More advanced Endpoints will reuse the threads, use queues, etc. + * + * @author James Duncan Davidson + * @author Jason Hunter + * @author James Todd + * @author Costin Manolache + * @author Gal Shachor + * @author Yoav Shapira + * @author Remy Maucherat + */ +public class JIoEndpoint { + + + // -------------------------------------------------------------- Constants + + + protected static Log log=LogFactory.getLog(JIoEndpoint.class ); + + protected StringManager sm = + StringManager.getManager("org.apache.tomcat.util.net.res"); + + + // ----------------------------------------------------------------- Fields + + + /** + * The acceptor thread. + */ + protected Thread acceptorThread = null; + + + /** + * Available workers. + */ + protected WorkerStack workers = null; + + + /** + * Running state of the endpoint. + */ + protected volatile boolean running = false; + + + /** + * Will be set to true whenever the endpoint is paused. + */ + protected volatile boolean paused = false; + + + /** + * Track the initialization state of the endpoint. + */ + protected boolean initialized = false; + + + /** + * Current worker threads busy count. + */ + protected int curThreadsBusy = 0; + + + /** + * Current worker threads count. + */ + protected int curThreads = 0; + + + /** + * Sequence number used to generate thread names. + */ + protected int sequence = 0; + + + /** + * Associated server socket. + */ + protected ServerSocket serverSocket = null; + + + // ------------------------------------------------------------- Properties + + + /** + * Maximum amount of worker threads. + */ + protected int maxThreads = 40; + public void setMaxThreads(int maxThreads) { this.maxThreads = maxThreads; } + public int getMaxThreads() { return maxThreads; } + + + /** + * Priority of the acceptor and poller threads. + */ + protected int threadPriority = Thread.NORM_PRIORITY; + public void setThreadPriority(int threadPriority) { this.threadPriority = threadPriority; } + public int getThreadPriority() { return threadPriority; } + + + /** + * Server socket port. + */ + protected int port; + public int getPort() { return port; } + public void setPort(int port ) { this.port=port; } + + + /** + * Address for the server socket. + */ + protected InetAddress address; + public InetAddress getAddress() { return address; } + public void setAddress(InetAddress address) { this.address = address; } + + + /** + * Handling of accepted sockets. + */ + protected Handler handler = null; + public void setHandler(Handler handler ) { this.handler = handler; } + public Handler getHandler() { return handler; } + + + /** + * Allows the server developer to specify the backlog that + * should be used for server sockets. By default, this value + * is 100. + */ + protected int backlog = 100; + public void setBacklog(int backlog) { if (backlog > 0) this.backlog = backlog; } + public int getBacklog() { return backlog; } + + + /** + * Socket TCP no delay. + */ + protected boolean tcpNoDelay = false; + public boolean getTcpNoDelay() { return tcpNoDelay; } + public void setTcpNoDelay(boolean tcpNoDelay) { this.tcpNoDelay = tcpNoDelay; } + + + /** + * Socket linger. + */ + protected int soLinger = 100; + public int getSoLinger() { return soLinger; } + public void setSoLinger(int soLinger) { this.soLinger = soLinger; } + + + /** + * Socket timeout. + */ + protected int soTimeout = -1; + public int getSoTimeout() { return soTimeout; } + public void setSoTimeout(int soTimeout) { this.soTimeout = soTimeout; } + + + /** + * The default is true - the created threads will be + * in daemon mode. If set to false, the control thread + * will not be daemon - and will keep the process alive. + */ + protected boolean daemon = true; + public void setDaemon(boolean b) { daemon = b; } + public boolean getDaemon() { return daemon; } + + + /** + * Name of the thread pool, which will be used for naming child threads. + */ + protected String name = "TP"; + public void setName(String name) { this.name = name; } + public String getName() { return name; } + + + /** + * Server socket factory. + */ + protected ServerSocketFactory serverSocketFactory = null; + public void setServerSocketFactory(ServerSocketFactory factory) { this.serverSocketFactory = factory; } + public ServerSocketFactory getServerSocketFactory() { return serverSocketFactory; } + + + public boolean isRunning() { + return running; + } + + public boolean isPaused() { + return paused; + } + + public int getCurrentThreadCount() { + return curThreads; + } + + public int getCurrentThreadsBusy() { + return curThreads - workers.size(); + } + + + // ------------------------------------------------ Handler Inner Interface + + + /** + * Bare bones interface used for socket processing. Per thread data is to be + * stored in the ThreadWithAttributes extra folders, or alternately in + * thread local fields. + */ + public interface Handler { + public boolean process(Socket socket); + } + + + // --------------------------------------------------- Acceptor Inner Class + + + /** + * Server socket acceptor thread. + */ + protected class Acceptor implements Runnable { + + + /** + * The background thread that listens for incoming TCP/IP connections and + * hands them off to an appropriate processor. + */ + public void run() { + + // Loop until we receive a shutdown command + while (running) { + + // Loop if endpoint is paused + while (paused) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + // Ignore + } + } + + // Allocate a new worker thread + Worker workerThread = getWorkerThread(); + + // Accept the next incoming connection from the server socket + try { + Socket socket = serverSocketFactory.acceptSocket(serverSocket); + serverSocketFactory.initSocket(socket); + // Hand this socket off to an appropriate processor + if (setSocketOptions(socket)) { + workerThread.assign(socket); + } else { + // Close socket right away + try { + socket.close(); + } catch (IOException e) { + } + } + } catch (Throwable t) { + log.error(sm.getString("endpoint.accept.fail"), t); + } + + // The processor will recycle itself when it finishes + + } + + } + + } + + + // ----------------------------------------------------- Worker Inner Class + + + protected class Worker implements Runnable { + + protected Thread thread = null; + protected boolean available = false; + protected Socket socket = null; + + + /** + * Process an incoming TCP/IP connection on the specified socket. Any + * exception that occurs during processing must be logged and swallowed. + * <b>NOTE</b>: This method is called from our Connector's thread. We + * must assign it to our own thread so that multiple simultaneous + * requests can be handled. + * + * @param socket TCP socket to process + */ + synchronized void assign(Socket socket) { + + // Wait for the Processor to get the previous Socket + while (available) { + try { + wait(); + } catch (InterruptedException e) { + } + } + + // Store the newly available Socket and notify our thread + this.socket = socket; + available = true; + notifyAll(); + + } + + + /** + * Await a newly assigned Socket from our Connector, or <code>null</code> + * if we are supposed to shut down. + */ + private synchronized Socket await() { + + // Wait for the Connector to provide a new Socket + while (!available) { + try { + wait(); + } catch (InterruptedException e) { + } + } + + // Notify the Connector that we have received this Socket + Socket socket = this.socket; + available = false; + notifyAll(); + + return (socket); + + } + + + + /** + * The background thread that listens for incoming TCP/IP connections and + * hands them off to an appropriate processor. + */ + public void run() { + + // Process requests until we receive a shutdown signal + while (running) { + + // Wait for the next socket to be assigned + Socket socket = await(); + if (socket == null) + continue; + + // Process the request from this socket + if (!handler.process(socket)) { + // Close socket + try { + socket.close(); + } catch (IOException e) { + } + } + + // Finish up this request + socket = null; + recycleWorkerThread(this); + + } + + } + + + /** + * Start the background processing thread. + */ + public void start() { + thread = new ThreadWithAttributes(JIoEndpoint.this, this); + thread.setName(getName() + "-" + (++curThreads)); + thread.setDaemon(true); + thread.start(); + } + + + } + + + // -------------------- Public methods -------------------- + + public void init() throws IOException, InstantiationException { + if (serverSocketFactory == null) { + serverSocketFactory = ServerSocketFactory.getDefault(); + } + if (serverSocket == null) { + try { + if (address == null) { + serverSocket = serverSocketFactory.createSocket(port, backlog); + } else { + serverSocket = serverSocketFactory.createSocket(port, backlog, address); + } + } catch (BindException be) { + throw new BindException(be.getMessage() + ":" + port); + } + } + //if( serverTimeout >= 0 ) + // serverSocket.setSoTimeout( serverTimeout ); + initialized = true; + } + + public void start() + throws Exception { + // Initialize socket if not done before + if (!initialized) { + init(); + } + if (!running) { + running = true; + paused = false; + + // Start acceptor thread + acceptorThread = new Thread(new Acceptor(), getName() + "-Acceptor"); + acceptorThread.setPriority(threadPriority); + acceptorThread.setDaemon(daemon); + acceptorThread.start(); + } + } + + public void pause() { + if (running && !paused) { + paused = true; + unlockAccept(); + } + } + + public void resume() { + if (running) { + paused = false; + } + } + + public void stop() { + if (running) { + running = false; + unlockAccept(); + acceptorThread = null; + } + } + + /** + * Deallocate APR memory pools, and close server socket. + */ + public void destroy() throws Exception { + if (running) { + stop(); + } + if (serverSocket != null) { + try { + if (serverSocket!=null) + serverSocket.close(); + } catch (Exception e) { + log.error(sm.getString("endpoint.err.close"), e); + } + serverSocket = null; + } + initialized = false ; + } + + + /** + * Unlock the accept by using a local connection. + */ + protected void unlockAccept() { + Socket s = null; + try { + // Need to create a connection to unlock the accept(); + if (address == null) { + s = new Socket("127.0.0.1", port); + } else { + s = new Socket(address, port); + // setting soLinger to a small value will help shutdown the + // connection quicker + s.setSoLinger(true, 0); + } + } catch (Exception e) { + if (log.isDebugEnabled()) { + log.debug(sm.getString("endpoint.debug.unlock", "" + port), e); + } + } finally { + if (s != null) { + try { + s.close(); + } catch (Exception e) { + // Ignore + } + } + } + } + + + /** + * Set the options for the current socket. + */ + protected boolean setSocketOptions(Socket socket) { + // Process the connection + int step = 1; + try { + + // 1: Set socket options: timeout, linger, etc + if (soLinger >= 0) { + socket.setSoLinger(true, soLinger); + } + if (tcpNoDelay) { + socket.setTcpNoDelay(tcpNoDelay); + } + if (soTimeout > 0) { + socket.setSoTimeout(soTimeout); + } + + // 2: SSL handshake + step = 2; + serverSocketFactory.handshake(socket); + + } catch (Throwable t) { + if (log.isDebugEnabled()) { + if (step == 2) { + log.debug(sm.getString("endpoint.err.handshake"), t); + } else { + log.debug(sm.getString("endpoint.err.unexpected"), t); + } + } + // Tell to close the socket + return false; + } + return true; + } + + + /** + * Create (or allocate) and return an available processor for use in + * processing a specific HTTP request, if possible. If the maximum + * allowed processors have already been created and are in use, return + * <code>null</code> instead. + */ + protected Worker createWorkerThread() { + + synchronized (workers) { + if (workers.size() > 0) { + curThreadsBusy++; + return ((Worker) workers.pop()); + } + if ((maxThreads > 0) && (curThreads < maxThreads)) { + curThreadsBusy++; + return (newWorkerThread()); + } else { + if (maxThreads < 0) { + curThreadsBusy++; + return (newWorkerThread()); + } else { + return (null); + } + } + } + + } + + + /** + * Create and return a new processor suitable for processing HTTP + * requests and returning the corresponding responses. + */ + protected Worker newWorkerThread() { + + Worker workerThread = new Worker(); + workerThread.start(); + return (workerThread); + + } + + + /** + * Return a new worker thread, and block while to worker is available. + */ + protected Worker getWorkerThread() { + // Allocate a new worker thread + Worker workerThread = createWorkerThread(); + while (workerThread == null) { + try { + synchronized (workers) { + workers.wait(); + } + } catch (InterruptedException e) { + // Ignore + } + workerThread = createWorkerThread(); + } + return workerThread; + } + + + /** + * Recycle the specified Processor so that it can be used again. + * + * @param workerThread The processor to be recycled + */ + protected void recycleWorkerThread(Worker workerThread) { + synchronized (workers) { + workers.push(workerThread); + curThreadsBusy--; + workers.notify(); + } + } + + + // ------------------------------------------------- WorkerStack Inner Class + + + public class WorkerStack { + + protected Worker[] workers = null; + protected int end = 0; + + public WorkerStack(int size) { + workers = new Worker[size]; + } + + /** + * Put the object into the queue. + * + * @param object the object to be appended to the queue (first element). + */ + public void push(Worker worker) { + workers[end++] = worker; + } + + /** + * Get the first object out of the queue. Return null if the queue + * is empty. + */ + public Worker pop() { + if (end > 0) { + return workers[--end]; + } + return null; + } + + /** + * Get the first object out of the queue, Return null if the queue + * is empty. + */ + public Worker peek() { + return workers[end]; + } + + /** + * Is the queue empty? + */ + public boolean isEmpty() { + return (end == 0); + } + + /** + * How many elements are there in this queue? + */ + public int size() { + return (end); + } + } + +} Propchange: tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/JIoEndpoint.java ------------------------------------------------------------------------------ svn:eol-style = native --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]