Author: costin Date: Wed Nov 23 21:20:50 2005 New Revision: 348655 URL: http://svn.apache.org/viewcvs?rev=348655&view=rev Log: Refactoring - leave only the core methods, no ThreadPool or specific code.
Modified: tomcat/sandbox/java/org/apache/tomcat/util/net/PoolTcpEndpoint.java Modified: tomcat/sandbox/java/org/apache/tomcat/util/net/PoolTcpEndpoint.java URL: http://svn.apache.org/viewcvs/tomcat/sandbox/java/org/apache/tomcat/util/net/PoolTcpEndpoint.java?rev=348655&r1=348654&r2=348655&view=diff ============================================================================== --- tomcat/sandbox/java/org/apache/tomcat/util/net/PoolTcpEndpoint.java (original) +++ tomcat/sandbox/java/org/apache/tomcat/util/net/PoolTcpEndpoint.java Wed Nov 23 21:20:50 2005 @@ -17,21 +17,17 @@ package org.apache.tomcat.util.net; import java.io.IOException; -import java.io.InterruptedIOException; -import java.net.BindException; import java.net.InetAddress; import java.net.ServerSocket; import java.net.Socket; import java.net.SocketException; -import java.security.AccessControlException; -import java.util.Stack; -import java.util.Vector; +import java.util.ArrayList; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.tomcat.util.res.StringManager; import org.apache.tomcat.util.threads.ThreadPool; -import org.apache.tomcat.util.threads.ThreadPoolRunnable; +import org.apache.tomcat.util.threads.ThreadPool.ThreadPoolListener; /* Similar with MPM module in Apache2.0. Handles all the details related with "tcp server" functionality - thread management, accept policy, etc. @@ -60,98 +56,123 @@ static Log log=LogFactory.getLog(PoolTcpEndpoint.class ); - private StringManager sm = + protected StringManager sm = StringManager.getManager("org.apache.tomcat.util.net.res"); - private static final int BACKLOG = 100; - private static final int TIMEOUT = 1000; + protected static final int BACKLOG = 100; + protected static final int TIMEOUT = 1000; - private final Object threadSync = new Object(); + protected int backlog = BACKLOG; + protected int serverTimeout = TIMEOUT; - private int backlog = BACKLOG; - private int serverTimeout = TIMEOUT; + protected InetAddress inet; + protected int port; - private InetAddress inet; - private int port; + protected ServerSocket serverSocket; - private ServerSocketFactory factory; - private ServerSocket serverSocket; - - private volatile boolean running = false; - private volatile boolean paused = false; - private boolean initialized = false; - private boolean reinitializing = false; - static final int debug=0; + protected volatile boolean running = false; + protected volatile boolean paused = false; + protected boolean initialized = false; + protected boolean reinitializing = false; protected boolean tcpNoDelay=false; protected int linger=100; protected int socketTimeout=-1; - private boolean lf = true; - // ------ Leader follower fields TcpConnectionHandler handler; - ThreadPoolRunnable listener; - ThreadPool tp; + // ------ Master slave fields + protected int curThreads = 0; + protected int maxThreads = 20; + protected int maxSpareThreads = 20; + protected int minSpareThreads = 20; + protected String type; + + protected String name = "EP"; // base name for threads - // ------ Master slave fields + protected int threadPriority; - /* The background thread. */ - private Thread thread = null; - /* Available processors. */ - private Stack workerThreads = new Stack(); - private int curThreads = 0; - private int maxThreads = 20; - /* All processors which have been created. */ - private Vector created = new Vector(); + protected boolean daemon = true; + private ArrayList listeners = new ArrayList(); public PoolTcpEndpoint() { - tp = new ThreadPool(); } - public PoolTcpEndpoint( ThreadPool tp ) { - this.tp=tp; + public static PoolTcpEndpoint getEndpoint(String type) { + String cn = null; + if( "apr".equals( type )) { + cn = "org.apache.tomcat.util.net.AprEndpoint"; + } + if( "lf".equals( type )) { + cn = "org.apache.tomcat.util.net.LeaderFollowerEndpoint"; + } + if( "acc".equals( type )) { + cn = "org.apache.tomcat.util.net.AcceptorEndpoint"; + } + if( "ms".equals( type )) { + cn = "org.apache.tomcat.util.net.MasterSlaveEndpoint"; + } + PoolTcpEndpoint res = null; + if( cn != null ) { + try { + Class c = Class.forName( cn ); + res = (PoolTcpEndpoint)c.newInstance(); + } catch( Throwable t ) { + throw new RuntimeException("Can't create endpoint " + cn); + } + } + if( res == null ) { + res = new SimpleEndpoint(); + } + res.type = type; + return res; } // -------------------- Configuration -------------------- + + public String getName() { + return name; + } + public void setName(String name) { + this.name = name; + } + + public void setMaxThreads(int maxThreads) { - if( maxThreads > 0) - tp.setMaxThreads(maxThreads); + this.maxThreads = maxThreads; } public int getMaxThreads() { - return tp.getMaxThreads(); + return maxThreads; } public void setMaxSpareThreads(int maxThreads) { - if(maxThreads > 0) - tp.setMaxSpareThreads(maxThreads); + this.maxSpareThreads = maxThreads; } public int getMaxSpareThreads() { - return tp.getMaxSpareThreads(); + return maxSpareThreads; } public void setMinSpareThreads(int minThreads) { - if(minThreads > 0) - tp.setMinSpareThreads(minThreads); + this.minSpareThreads = minThreads; } public int getMinSpareThreads() { - return tp.getMinSpareThreads(); + return minSpareThreads; } public void setThreadPriority(int threadPriority) { - tp.setThreadPriority(threadPriority); + this.threadPriority = threadPriority; } public int getThreadPriority() { - return tp.getThreadPriority(); + return threadPriority; } public int getPort() { @@ -173,15 +194,11 @@ public void setServerSocket(ServerSocket ss) { serverSocket = ss; } - - public void setServerSocketFactory( ServerSocketFactory factory ) { - this.factory=factory; + + public ServerSocket getServerSocket() { + return serverSocket; } - ServerSocketFactory getServerSocketFactory() { - return factory; - } - public void setConnectionHandler( TcpConnectionHandler handler ) { this.handler=handler; } @@ -257,19 +274,11 @@ } public String getStrategy() { - if (lf) { - return "lf"; - } else { - return "ms"; - } + return type; } public void setStrategy(String strategy) { - if ("ms".equals(strategy)) { - lf = false; - } else { - lf = true; - } + // shouldn't be used. } public int getCurrentThreadCount() { @@ -277,83 +286,40 @@ } public int getCurrentThreadsBusy() { - return curThreads - workerThreads.size(); + return curThreads; } // -------------------- Public methods -------------------- public void initEndpoint() throws IOException, InstantiationException { - try { - if(factory==null) - factory=ServerSocketFactory.getDefault(); - if(serverSocket==null) { - try { - if (inet == null) { - serverSocket = factory.createSocket(port, backlog); - } else { - serverSocket = factory.createSocket(port, backlog, inet); - } - } catch ( BindException be ) { - throw new BindException(be.getMessage() + ":" + port); - } - } - if( serverTimeout >= 0 ) - serverSocket.setSoTimeout( serverTimeout ); - } catch( IOException ex ) { - throw ex; - } catch( InstantiationException ex1 ) { - throw ex1; - } - initialized = true; } public void startEndpoint() throws IOException, InstantiationException { - if (!initialized) { - initEndpoint(); - } - if (lf) { - tp.start(); - } - running = true; - paused = false; - if (lf) { - listener = new LeaderFollowerWorkerThread(this); - tp.runIt(listener); - } else { - maxThreads = getMaxThreads(); - threadStart(); - } } public void pauseEndpoint() { - if (running && !paused) { - paused = true; - unlockAccept(); - } } public void resumeEndpoint() { - if (running) { - paused = false; - } } public void stopEndpoint() { - if (running) { - if (lf) { - tp.shutdown(); - } - running = false; - if (serverSocket != null) { - closeServerSocket(); - } - if (!lf) { - threadStop(); - } - initialized=false ; - } } + protected void processSocket(Socket s, TcpConnection con, + Object[] threadData) { + } + + + /** To notify worker done, recycle + */ + public void workerDone(Runnable r) { + + } + + + // ---------------- Utils ---------------------- + protected void closeServerSocket() { if (!paused) unlockAccept(); @@ -393,109 +359,7 @@ } } - // -------------------- Private methods - - Socket acceptSocket() { - if( !running || serverSocket==null ) return null; - - Socket accepted = null; - - try { - if(factory==null) { - accepted = serverSocket.accept(); - } else { - accepted = factory.acceptSocket(serverSocket); - } - if (null == accepted) { - log.warn(sm.getString("endpoint.warn.nullSocket")); - } else { - if (!running) { - accepted.close(); // rude, but unlikely! - accepted = null; - } else if (factory != null) { - factory.initSocket( accepted ); - } - } - } - catch(InterruptedIOException iioe) { - // normal part -- should happen regularly so - // that the endpoint can release if the server - // is shutdown. - } - catch (AccessControlException ace) { - // When using the Java SecurityManager this exception - // can be thrown if you are restricting access to the - // socket with SocketPermission's. - // Log the unauthorized access and continue - String msg = sm.getString("endpoint.warn.security", - serverSocket, ace); - log.warn(msg); - } - catch (IOException e) { - - String msg = null; - - if (running) { - msg = sm.getString("endpoint.err.nonfatal", - serverSocket, e); - log.error(msg, e); - } - - if (accepted != null) { - try { - accepted.close(); - } catch(Throwable ex) { - msg = sm.getString("endpoint.err.nonfatal", - accepted, ex); - log.warn(msg, ex); - } - accepted = null; - } - - if( ! running ) return null; - reinitializing = true; - // Restart endpoint when getting an IOException during accept - synchronized (threadSync) { - if (reinitializing) { - reinitializing = false; - // 1) Attempt to close server socket - closeServerSocket(); - initialized = false; - // 2) Reinit endpoint (recreate server socket) - try { - msg = sm.getString("endpoint.warn.reinit"); - log.warn(msg); - initEndpoint(); - } catch (Throwable t) { - msg = sm.getString("endpoint.err.nonfatal", - serverSocket, t); - log.error(msg, t); - } - // 3) If failed, attempt to restart endpoint - if (!initialized) { - msg = sm.getString("endpoint.warn.restart"); - log.warn(msg); - try { - stopEndpoint(); - initEndpoint(); - startEndpoint(); - } catch (Throwable t) { - msg = sm.getString("endpoint.err.fatal", - serverSocket, t); - log.error(msg, t); - } - // Current thread is now invalid: kill it - throw new ThreadDeath(); - } - } - } - - } - - return accepted; - } - - void setSocketOptions(Socket socket) + protected void setSocketOptions(Socket socket) throws SocketException { if(linger >= 0 ) socket.setSoLinger( true, linger); @@ -505,179 +369,46 @@ socket.setSoTimeout( socketTimeout ); } - - void processSocket(Socket s, TcpConnection con, Object[] threadData) { - // Process the connection - int step = 1; - try { - - // 1: Set socket options: timeout, linger, etc - setSocketOptions(s); - - // 2: SSL handshake - step = 2; - if (getServerSocketFactory() != null) { - getServerSocketFactory().handshake(s); - } - - // 3: Process the connection - step = 3; - con.setEndpoint(this); - con.setSocket(s); - getConnectionHandler().processConnection(con, threadData); - - } catch (SocketException se) { - log.error(sm.getString("endpoint.err.socket", s.getInetAddress()), - se); - // Try to close the socket - try { - s.close(); - } catch (IOException e) { - } - } catch (Throwable t) { - if (step == 2) { - if (log.isDebugEnabled()) { - log.debug(sm.getString("endpoint.err.handshake"), t); - } - } else { - log.error(sm.getString("endpoint.err.unexpected"), t); - } - // Try to close the socket - try { - s.close(); - } catch (IOException e) { - } - } finally { - if (con != null) { - con.recycle(); - } - } - } - - - // -------------------------------------------------- Master Slave Methods - - /** - * Create (or allocate) and return an available processor for use in - * processing a specific HTTP request, if possible. If the maximum - * allowed processors have already been created and are in use, return - * <code>null</code> instead. + * The background thread that listens for incoming TCP/IP connections and + * hands them off to an appropriate processor. */ - private MasterSlaveWorkerThread createWorkerThread() { - - synchronized (workerThreads) { - if (workerThreads.size() > 0) { - return ((MasterSlaveWorkerThread) workerThreads.pop()); - } - if ((maxThreads > 0) && (curThreads < maxThreads)) { - return (newWorkerThread()); - } else { - if (maxThreads < 0) { - return (newWorkerThread()); - } else { - return (null); - } - } - } - + public void run() { } - - /** - * Create and return a new processor suitable for processing HTTP - * requests and returning the corresponding responses. - */ - private MasterSlaveWorkerThread newWorkerThread() { - - MasterSlaveWorkerThread workerThread = - new MasterSlaveWorkerThread(this, tp.getName() + "-" + (++curThreads)); - workerThread.start(); - created.addElement(workerThread); - return (workerThread); - + public void setSSLSupport(boolean secure, String socketFactoryName) throws Exception { } - - /** - * Recycle the specified Processor so that it can be used again. - * - * @param processor The processor to be recycled - */ - void recycleWorkerThread(MasterSlaveWorkerThread workerThread) { - workerThreads.push(workerThread); + public void setDaemon(boolean b) { + daemon=b; } - - /** - * The background thread that listens for incoming TCP/IP connections and - * hands them off to an appropriate processor. - */ - public void run() { - - // Loop until we receive a shutdown command - while (running) { - - // Loop if endpoint is paused - while (paused) { - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - // Ignore - } - } - - // Allocate a new worker thread - MasterSlaveWorkerThread workerThread = createWorkerThread(); - if (workerThread == null) { - try { - // Wait a little for load to go down: as a result, - // no accept will be made until the concurrency is - // lower than the specified maxThreads, and current - // connections will wait for a little bit instead of - // failing right away. - Thread.sleep(100); - } catch (InterruptedException e) { - // Ignore - } - continue; - } - - // Accept the next incoming connection from the server socket - Socket socket = acceptSocket(); - - // Hand this socket off to an appropriate processor - workerThread.assign(socket); - - // The processor will recycle itself when it finishes - - } - - // Notify the threadStop() method that we have shut ourselves down - synchronized (threadSync) { - threadSync.notifyAll(); - } - + public boolean getDaemon() { + return daemon; } - - /** - * Start the background processing thread. - */ - private void threadStart() { - thread = new Thread(this, tp.getName()); - thread.setPriority(getThreadPriority()); - thread.setDaemon(true); - thread.start(); + protected void threadStart(Thread t) { + for( int i=0; i<listeners.size(); i++ ) { + EndpointListener tpl=(EndpointListener)listeners.get(i); + tpl.threadStart(this, t); + } } - - /** - * Stop the background processing thread. - */ - private void threadStop() { - thread = null; + protected void threadEnd(Thread t) { + for( int i=0; i<listeners.size(); i++ ) { + EndpointListener tpl=(EndpointListener)listeners.get(i); + tpl.threadStart(this, t); + } + } + + public void addEndpointListener(EndpointListener listener) { + listeners.add(listener); } + + public static interface EndpointListener { + public void threadStart( PoolTcpEndpoint ep, Thread t); + public void threadEnd( PoolTcpEndpoint ep, Thread t); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]