Author: remm
Date: Fri Apr 21 07:39:26 2006
New Revision: 395901

URL: http://svn.apache.org/viewcvs?rev=395901&view=rev
Log:
- Add the refactored java.io endpoint that I talked about earlier.

Added:
    tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/JIoEndpoint.java   
(with props)
Removed:
    tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/compat/

Added: tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/JIoEndpoint.java
URL: 
http://svn.apache.org/viewcvs/tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/JIoEndpoint.java?rev=395901&view=auto
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/JIoEndpoint.java 
(added)
+++ tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/JIoEndpoint.java Fri 
Apr 21 07:39:26 2006
@@ -0,0 +1,694 @@
+/*
+ *  Copyright 1999-2004 The Apache Software Foundation
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.tomcat.util.net;
+
+import java.io.IOException;
+import java.net.BindException;
+import java.net.InetAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tomcat.util.res.StringManager;
+import org.apache.tomcat.util.threads.ThreadWithAttributes;
+
+/**
+ * Handle incoming TCP connections.
+ *
+ * This class implement a simple server model: one listener thread accepts on 
a socket and
+ * creates a new worker thread for each incoming connection.
+ *
+ * More advanced Endpoints will reuse the threads, use queues, etc.
+ *
+ * @author James Duncan Davidson
+ * @author Jason Hunter
+ * @author James Todd
+ * @author Costin Manolache
+ * @author Gal Shachor
+ * @author Yoav Shapira
+ * @author Remy Maucherat
+ */
+public class JIoEndpoint {
+
+
+    // -------------------------------------------------------------- Constants
+
+
+    protected static Log log=LogFactory.getLog(JIoEndpoint.class );
+
+    protected StringManager sm = 
+        StringManager.getManager("org.apache.tomcat.util.net.res");
+
+
+    // ----------------------------------------------------------------- Fields
+
+
+    /**
+     * The acceptor thread.
+     */
+    protected Thread acceptorThread = null;
+
+
+    /**
+     * Available workers.
+     */
+    protected WorkerStack workers = null;
+
+
+    /**
+     * Running state of the endpoint.
+     */
+    protected volatile boolean running = false;
+
+
+    /**
+     * Will be set to true whenever the endpoint is paused.
+     */
+    protected volatile boolean paused = false;
+
+
+    /**
+     * Track the initialization state of the endpoint.
+     */
+    protected boolean initialized = false;
+
+
+    /**
+     * Current worker threads busy count.
+     */
+    protected int curThreadsBusy = 0;
+
+
+    /**
+     * Current worker threads count.
+     */
+    protected int curThreads = 0;
+
+
+    /**
+     * Sequence number used to generate thread names.
+     */
+    protected int sequence = 0;
+
+
+    /**
+     * Associated server socket.
+     */
+    protected ServerSocket serverSocket = null;
+
+
+    // ------------------------------------------------------------- Properties
+
+
+    /**
+     * Maximum amount of worker threads.
+     */
+    protected int maxThreads = 40;
+    public void setMaxThreads(int maxThreads) { this.maxThreads = maxThreads; }
+    public int getMaxThreads() { return maxThreads; }
+
+
+    /**
+     * Priority of the acceptor and poller threads.
+     */
+    protected int threadPriority = Thread.NORM_PRIORITY;
+    public void setThreadPriority(int threadPriority) { this.threadPriority = 
threadPriority; }
+    public int getThreadPriority() { return threadPriority; }
+
+    
+    /**
+     * Server socket port.
+     */
+    protected int port;
+    public int getPort() { return port; }
+    public void setPort(int port ) { this.port=port; }
+
+
+    /**
+     * Address for the server socket.
+     */
+    protected InetAddress address;
+    public InetAddress getAddress() { return address; }
+    public void setAddress(InetAddress address) { this.address = address; }
+
+
+    /**
+     * Handling of accepted sockets.
+     */
+    protected Handler handler = null;
+    public void setHandler(Handler handler ) { this.handler = handler; }
+    public Handler getHandler() { return handler; }
+
+
+    /**
+     * Allows the server developer to specify the backlog that
+     * should be used for server sockets. By default, this value
+     * is 100.
+     */
+    protected int backlog = 100;
+    public void setBacklog(int backlog) { if (backlog > 0) this.backlog = 
backlog; }
+    public int getBacklog() { return backlog; }
+
+
+    /**
+     * Socket TCP no delay.
+     */
+    protected boolean tcpNoDelay = false;
+    public boolean getTcpNoDelay() { return tcpNoDelay; }
+    public void setTcpNoDelay(boolean tcpNoDelay) { this.tcpNoDelay = 
tcpNoDelay; }
+
+
+    /**
+     * Socket linger.
+     */
+    protected int soLinger = 100;
+    public int getSoLinger() { return soLinger; }
+    public void setSoLinger(int soLinger) { this.soLinger = soLinger; }
+
+
+    /**
+     * Socket timeout.
+     */
+    protected int soTimeout = -1;
+    public int getSoTimeout() { return soTimeout; }
+    public void setSoTimeout(int soTimeout) { this.soTimeout = soTimeout; }
+
+
+    /**
+     * The default is true - the created threads will be
+     *  in daemon mode. If set to false, the control thread
+     *  will not be daemon - and will keep the process alive.
+     */
+    protected boolean daemon = true;
+    public void setDaemon(boolean b) { daemon = b; }
+    public boolean getDaemon() { return daemon; }
+
+
+    /**
+     * Name of the thread pool, which will be used for naming child threads.
+     */
+    protected String name = "TP";
+    public void setName(String name) { this.name = name; }
+    public String getName() { return name; }
+
+
+    /**
+     * Server socket factory.
+     */
+    protected ServerSocketFactory serverSocketFactory = null;
+    public void setServerSocketFactory(ServerSocketFactory factory) { 
this.serverSocketFactory = factory; }
+    public ServerSocketFactory getServerSocketFactory() { return 
serverSocketFactory; }
+
+
+    public boolean isRunning() {
+        return running;
+    }
+    
+    public boolean isPaused() {
+        return paused;
+    }
+    
+    public int getCurrentThreadCount() {
+        return curThreads;
+    }
+    
+    public int getCurrentThreadsBusy() {
+        return curThreads - workers.size();
+    }
+    
+
+    // ------------------------------------------------ Handler Inner Interface
+
+
+    /**
+     * Bare bones interface used for socket processing. Per thread data is to 
be
+     * stored in the ThreadWithAttributes extra folders, or alternately in
+     * thread local fields.
+     */
+    public interface Handler {
+        public boolean process(Socket socket);
+    }
+
+
+    // --------------------------------------------------- Acceptor Inner Class
+
+
+    /**
+     * Server socket acceptor thread.
+     */
+    protected class Acceptor implements Runnable {
+
+
+        /**
+         * The background thread that listens for incoming TCP/IP connections 
and
+         * hands them off to an appropriate processor.
+         */
+        public void run() {
+
+            // Loop until we receive a shutdown command
+            while (running) {
+
+                // Loop if endpoint is paused
+                while (paused) {
+                    try {
+                        Thread.sleep(1000);
+                    } catch (InterruptedException e) {
+                        // Ignore
+                    }
+                }
+
+                // Allocate a new worker thread
+                Worker workerThread = getWorkerThread();
+
+                // Accept the next incoming connection from the server socket
+                try {
+                    Socket socket = 
serverSocketFactory.acceptSocket(serverSocket);
+                    serverSocketFactory.initSocket(socket);
+                    // Hand this socket off to an appropriate processor
+                    if (setSocketOptions(socket)) {
+                        workerThread.assign(socket);
+                    } else {
+                        // Close socket right away
+                        try {
+                            socket.close();
+                        } catch (IOException e) {
+                        }
+                    }
+                } catch (Throwable t) {
+                    log.error(sm.getString("endpoint.accept.fail"), t);
+                }
+
+                // The processor will recycle itself when it finishes
+
+            }
+
+        }
+
+    }
+
+
+    // ----------------------------------------------------- Worker Inner Class
+
+
+    protected class Worker implements Runnable {
+
+        protected Thread thread = null;
+        protected boolean available = false;
+        protected Socket socket = null;
+
+        
+        /**
+         * Process an incoming TCP/IP connection on the specified socket.  Any
+         * exception that occurs during processing must be logged and 
swallowed.
+         * <b>NOTE</b>:  This method is called from our Connector's thread.  We
+         * must assign it to our own thread so that multiple simultaneous
+         * requests can be handled.
+         *
+         * @param socket TCP socket to process
+         */
+        synchronized void assign(Socket socket) {
+
+            // Wait for the Processor to get the previous Socket
+            while (available) {
+                try {
+                    wait();
+                } catch (InterruptedException e) {
+                }
+            }
+
+            // Store the newly available Socket and notify our thread
+            this.socket = socket;
+            available = true;
+            notifyAll();
+
+        }
+
+        
+        /**
+         * Await a newly assigned Socket from our Connector, or 
<code>null</code>
+         * if we are supposed to shut down.
+         */
+        private synchronized Socket await() {
+
+            // Wait for the Connector to provide a new Socket
+            while (!available) {
+                try {
+                    wait();
+                } catch (InterruptedException e) {
+                }
+            }
+
+            // Notify the Connector that we have received this Socket
+            Socket socket = this.socket;
+            available = false;
+            notifyAll();
+
+            return (socket);
+
+        }
+
+
+
+        /**
+         * The background thread that listens for incoming TCP/IP connections 
and
+         * hands them off to an appropriate processor.
+         */
+        public void run() {
+
+            // Process requests until we receive a shutdown signal
+            while (running) {
+
+                // Wait for the next socket to be assigned
+                Socket socket = await();
+                if (socket == null)
+                    continue;
+
+                // Process the request from this socket
+                if (!handler.process(socket)) {
+                    // Close socket
+                    try {
+                        socket.close();
+                    } catch (IOException e) {
+                    }
+                }
+
+                // Finish up this request
+                socket = null;
+                recycleWorkerThread(this);
+
+            }
+
+        }
+
+
+        /**
+         * Start the background processing thread.
+         */
+        public void start() {
+            thread = new ThreadWithAttributes(JIoEndpoint.this, this);
+            thread.setName(getName() + "-" + (++curThreads));
+            thread.setDaemon(true);
+            thread.start();
+        }
+
+
+    }
+
+
+    // -------------------- Public methods --------------------
+
+    public void init() throws IOException, InstantiationException {
+        if (serverSocketFactory == null) {
+            serverSocketFactory = ServerSocketFactory.getDefault();
+        }
+        if (serverSocket == null) {
+            try {
+                if (address == null) {
+                    serverSocket = serverSocketFactory.createSocket(port, 
backlog);
+                } else {
+                    serverSocket = serverSocketFactory.createSocket(port, 
backlog, address);
+                }
+            } catch (BindException be) {
+                throw new BindException(be.getMessage() + ":" + port);
+            }
+        }
+        //if( serverTimeout >= 0 )
+        //    serverSocket.setSoTimeout( serverTimeout );
+        initialized = true;
+    }
+    
+    public void start()
+        throws Exception {
+        // Initialize socket if not done before
+        if (!initialized) {
+            init();
+        }
+        if (!running) {
+            running = true;
+            paused = false;
+
+            // Start acceptor thread
+            acceptorThread = new Thread(new Acceptor(), getName() + 
"-Acceptor");
+            acceptorThread.setPriority(threadPriority);
+            acceptorThread.setDaemon(daemon);
+            acceptorThread.start();
+        }
+    }
+
+    public void pause() {
+        if (running && !paused) {
+            paused = true;
+            unlockAccept();
+        }
+    }
+
+    public void resume() {
+        if (running) {
+            paused = false;
+        }
+    }
+
+    public void stop() {
+        if (running) {
+            running = false;
+            unlockAccept();
+            acceptorThread = null;
+        }
+    }
+
+    /**
+     * Deallocate APR memory pools, and close server socket.
+     */
+    public void destroy() throws Exception {
+        if (running) {
+            stop();
+        }
+        if (serverSocket != null) {
+            try {
+                if (serverSocket!=null)
+                    serverSocket.close();
+            } catch (Exception e) {
+                log.error(sm.getString("endpoint.err.close"), e);
+            }
+            serverSocket = null;
+        }
+        initialized = false ;
+    }
+
+    
+    /**
+     * Unlock the accept by using a local connection.
+     */
+    protected void unlockAccept() {
+        Socket s = null;
+        try {
+            // Need to create a connection to unlock the accept();
+            if (address == null) {
+                s = new Socket("127.0.0.1", port);
+            } else {
+                s = new Socket(address, port);
+                    // setting soLinger to a small value will help shutdown the
+                    // connection quicker
+                s.setSoLinger(true, 0);
+            }
+        } catch (Exception e) {
+            if (log.isDebugEnabled()) {
+                log.debug(sm.getString("endpoint.debug.unlock", "" + port), e);
+            }
+        } finally {
+            if (s != null) {
+                try {
+                    s.close();
+                } catch (Exception e) {
+                    // Ignore
+                }
+            }
+        }
+    }
+
+
+    /**
+     * Set the options for the current socket.
+     */
+    protected boolean setSocketOptions(Socket socket) {
+        // Process the connection
+        int step = 1;
+        try {
+
+            // 1: Set socket options: timeout, linger, etc
+            if (soLinger >= 0) { 
+                socket.setSoLinger(true, soLinger);
+            }
+            if (tcpNoDelay) {
+                socket.setTcpNoDelay(tcpNoDelay);
+            }
+            if (soTimeout > 0) {
+                socket.setSoTimeout(soTimeout);
+            }
+
+            // 2: SSL handshake
+            step = 2;
+            serverSocketFactory.handshake(socket);
+
+        } catch (Throwable t) {
+            if (log.isDebugEnabled()) {
+                if (step == 2) {
+                    log.debug(sm.getString("endpoint.err.handshake"), t);
+                } else {
+                    log.debug(sm.getString("endpoint.err.unexpected"), t);
+                }
+            }
+            // Tell to close the socket
+            return false;
+        }
+        return true;
+    }
+
+    
+    /**
+     * Create (or allocate) and return an available processor for use in
+     * processing a specific HTTP request, if possible.  If the maximum
+     * allowed processors have already been created and are in use, return
+     * <code>null</code> instead.
+     */
+    protected Worker createWorkerThread() {
+
+        synchronized (workers) {
+            if (workers.size() > 0) {
+                curThreadsBusy++;
+                return ((Worker) workers.pop());
+            }
+            if ((maxThreads > 0) && (curThreads < maxThreads)) {
+                curThreadsBusy++;
+                return (newWorkerThread());
+            } else {
+                if (maxThreads < 0) {
+                    curThreadsBusy++;
+                    return (newWorkerThread());
+                } else {
+                    return (null);
+                }
+            }
+        }
+
+    }
+
+
+    /**
+     * Create and return a new processor suitable for processing HTTP
+     * requests and returning the corresponding responses.
+     */
+    protected Worker newWorkerThread() {
+
+        Worker workerThread = new Worker();
+        workerThread.start();
+        return (workerThread);
+
+    }
+
+
+    /**
+     * Return a new worker thread, and block while to worker is available.
+     */
+    protected Worker getWorkerThread() {
+        // Allocate a new worker thread
+        Worker workerThread = createWorkerThread();
+        while (workerThread == null) {
+            try {
+                synchronized (workers) {
+                    workers.wait();
+                }
+            } catch (InterruptedException e) {
+                // Ignore
+            }
+            workerThread = createWorkerThread();
+        }
+        return workerThread;
+    }
+
+
+    /**
+     * Recycle the specified Processor so that it can be used again.
+     *
+     * @param workerThread The processor to be recycled
+     */
+    protected void recycleWorkerThread(Worker workerThread) {
+        synchronized (workers) {
+            workers.push(workerThread);
+            curThreadsBusy--;
+            workers.notify();
+        }
+    }
+
+
+    // ------------------------------------------------- WorkerStack Inner 
Class
+
+
+    public class WorkerStack {
+        
+        protected Worker[] workers = null;
+        protected int end = 0;
+        
+        public WorkerStack(int size) {
+            workers = new Worker[size];
+        }
+        
+        /** 
+         * Put the object into the queue.
+         * 
+         * @param   object      the object to be appended to the queue (first 
element). 
+         */
+        public void push(Worker worker) {
+            workers[end++] = worker;
+        }
+        
+        /**
+         * Get the first object out of the queue. Return null if the queue
+         * is empty. 
+         */
+        public Worker pop() {
+            if (end > 0) {
+                return workers[--end];
+            }
+            return null;
+        }
+        
+        /**
+         * Get the first object out of the queue, Return null if the queue
+         * is empty.
+         */
+        public Worker peek() {
+            return workers[end];
+        }
+        
+        /**
+         * Is the queue empty?
+         */
+        public boolean isEmpty() {
+            return (end == 0);
+        }
+        
+        /**
+         * How many elements are there in this queue?
+         */
+        public int size() {
+            return (end);
+        }
+    }
+
+}

Propchange: 
tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/JIoEndpoint.java
------------------------------------------------------------------------------
    svn:eol-style = native



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to