Author: fhanik Date: Mon Aug 24 18:06:06 2009 New Revision: 807324 URL: http://svn.apache.org/viewvc?rev=807324&view=rev Log: Abstract out most commonly used properties
Modified: tomcat/trunk/java/org/apache/coyote/http11/Http11NioProtocol.java tomcat/trunk/java/org/apache/tomcat/util/net/AbstractEndpoint.java tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java tomcat/trunk/java/org/apache/tomcat/util/net/JIoEndpoint.java tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java Modified: tomcat/trunk/java/org/apache/coyote/http11/Http11NioProtocol.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/Http11NioProtocol.java?rev=807324&r1=807323&r2=807324&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http11/Http11NioProtocol.java (original) +++ tomcat/trunk/java/org/apache/coyote/http11/Http11NioProtocol.java Mon Aug 24 18:06:06 2009 @@ -261,15 +261,6 @@ ep.setExecutor(executor); } - /** - * NOOP. - * @param useexec - Ignored - * @deprecated Executors are always used for NIO - */ - public void setUseExecutor(boolean useexec) { - ep.setUseExecutor(useexec); - } - public int getMaxThreads() { return ep.getMaxThreads(); } Modified: tomcat/trunk/java/org/apache/tomcat/util/net/AbstractEndpoint.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/AbstractEndpoint.java?rev=807324&r1=807323&r2=807324&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/util/net/AbstractEndpoint.java (original) +++ tomcat/trunk/java/org/apache/tomcat/util/net/AbstractEndpoint.java Mon Aug 24 18:06:06 2009 @@ -16,7 +16,19 @@ */ package org.apache.tomcat.util.net; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; + +import org.apache.juli.logging.Log; +import org.apache.juli.logging.LogFactory; +import org.apache.tomcat.util.IntrospectionUtils; import org.apache.tomcat.util.res.StringManager; +import 
org.apache.tomcat.util.threads.ResizableExecutor; +import org.apache.tomcat.util.threads.TaskQueue; +import org.apache.tomcat.util.threads.TaskThreadFactory; +import org.apache.tomcat.util.threads.ThreadPoolExecutor; /** * * @author fhanik @@ -24,6 +36,7 @@ * @author Remy Maucherat */ public abstract class AbstractEndpoint { + protected static Log log = LogFactory.getLog(AbstractEndpoint.class); // -------------------------------------------------------------- Constants protected StringManager sm = StringManager.getManager("org.apache.tomcat.util.net.res"); @@ -54,5 +67,310 @@ * This one is a Tomcat extension to the Servlet spec. */ public static final String SESSION_MGR = "javax.servlet.request.ssl_session_mgr"; + + // ----------------------------------------------------------------- Fields + + + /** + * Running state of the endpoint. + */ + protected volatile boolean running = false; + + + /** + * Will be set to true whenever the endpoint is paused. + */ + protected volatile boolean paused = false; + + /** + * Track the initialization state of the endpoint. + */ + protected boolean initialized = false; + + /** + * Are we using an internal executor + */ + protected volatile boolean internalExecutor = false; + + /** + * Socket properties + */ + protected SocketProperties socketProperties = new SocketProperties(); + public SocketProperties getSocketProperties() { + return socketProperties; + } + + + + + + // ----------------------------------------------------------------- Properties + + /** + * External Executor based thread pool. + */ + private Executor executor = null; + public void setExecutor(Executor executor) { + this.executor = executor; + this.internalExecutor = (executor==null); + } + public Executor getExecutor() { return executor; } + + + /** + * Server socket port. + */ + private int port; + public int getPort() { return port; } + public void setPort(int port ) { this.port=port; } + + + /** + * Address for the server socket. 
+ */ + private InetAddress address; + public InetAddress getAddress() { return address; } + public void setAddress(InetAddress address) { this.address = address; } + + /** + * Allows the server developer to specify the backlog that + * should be used for server sockets. By default, this value + * is 100. + */ + private int backlog = 100; + public void setBacklog(int backlog) { if (backlog > 0) this.backlog = backlog; } + public int getBacklog() { return backlog; } + + /** + * Keepalive timeout, if lesser or equal to 0 then soTimeout will be used. + */ + private int keepAliveTimeout = 0; + public void setKeepAliveTimeout(int keepAliveTimeout) { this.keepAliveTimeout = keepAliveTimeout; } + public int getKeepAliveTimeout() { return keepAliveTimeout;} + + + /** + * Socket TCP no delay. + */ + public boolean getTcpNoDelay() { return socketProperties.getTcpNoDelay();} + public void setTcpNoDelay(boolean tcpNoDelay) { socketProperties.setTcpNoDelay(tcpNoDelay); } + + + /** + * Socket linger. + */ + public int getSoLinger() { return socketProperties.getSoLingerTime(); } + public void setSoLinger(int soLinger) { + socketProperties.setSoLingerTime(soLinger); + socketProperties.setSoLingerOn(soLinger>=0); + } + + + /** + * Socket timeout. + */ + public int getSoTimeout() { return socketProperties.getSoTimeout(); } + public void setSoTimeout(int soTimeout) { socketProperties.setSoTimeout(soTimeout); } + + /** + * SSL engine. 
+ */ + private boolean SSLEnabled = false; + public boolean isSSLEnabled() { return SSLEnabled; } + public void setSSLEnabled(boolean SSLEnabled) { this.SSLEnabled = SSLEnabled; } + + /** + * Core size of the worker thread pool. The getter never reports more + * than maxThreads. When the endpoint is running, the setter also + * applies the new value to the live executor as its core pool size. + */ + private int minSpareThreads = 10; + public int getMinSpareThreads() { + return Math.min(minSpareThreads,getMaxThreads()); + } + public void setMinSpareThreads(int minSpareThreads) { + this.minSpareThreads = minSpareThreads; + if (running && executor!=null) { + if (executor instanceof java.util.concurrent.ThreadPoolExecutor) { + //resize the core pool to the new minimum, not to maxThreads + ((java.util.concurrent.ThreadPoolExecutor)executor).setCorePoolSize(minSpareThreads); + } else if (executor instanceof ResizableExecutor) { + ((ResizableExecutor)executor).resizePool(minSpareThreads, maxThreads); + } + } + } + + /** + * Maximum amount of worker threads. + */ + private int maxThreads = 200; + public void setMaxThreads(int maxThreads) { + this.maxThreads = maxThreads; + if (running && executor!=null) { + if (executor instanceof java.util.concurrent.ThreadPoolExecutor) { + ((java.util.concurrent.ThreadPoolExecutor)executor).setMaximumPoolSize(maxThreads); + } else if (executor instanceof ResizableExecutor) { + ((ResizableExecutor)executor).resizePool(minSpareThreads, maxThreads); + } + } + } + public int getMaxThreads() { return maxThreads; } + + /** + * Max keep alive requests + */ + private int maxKeepAliveRequests=100; // as in Apache HTTPD server + public int getMaxKeepAliveRequests() { + return maxKeepAliveRequests; + } + public void setMaxKeepAliveRequests(int maxKeepAliveRequests) { + this.maxKeepAliveRequests = maxKeepAliveRequests; + } + + /** + * Name of the thread pool, which will be used for naming child threads. + */ + private String name = "TP"; + public void setName(String name) { this.name = name; } + public String getName() { return name; } + + /** + * The default is true - the created threads will be + * in daemon mode. If set to false, the control thread + * will not be daemon - and will keep the process alive. 
+ */ + private boolean daemon = true; + public void setDaemon(boolean b) { daemon = b; } + public boolean getDaemon() { return daemon; } + + /** + * Priority of the worker threads. + */ + protected int threadPriority = Thread.NORM_PRIORITY; + public void setThreadPriority(int threadPriority) { this.threadPriority = threadPriority; } + public int getThreadPriority() { return threadPriority; } + + + + /** + * Generic properties, introspected + */ + public boolean setProperty(String name, String value) { + final String socketName = "socket."; + try { + if (name.startsWith(socketName)) { + return IntrospectionUtils.setProperty(socketProperties, name.substring(socketName.length()), value); + } else { + return IntrospectionUtils.setProperty(this,name,value); + } + }catch ( Exception x ) { + log.error("Unable to set attribute \""+name+"\" to \""+value+"\"",x); + return false; + } + } + + /** + * Return the amount of threads that are managed by the pool. + * + * @return the amount of threads that are managed by the pool + */ + public int getCurrentThreadCount() { + if (executor!=null) { + if (executor instanceof ThreadPoolExecutor) { + return ((ThreadPoolExecutor)executor).getPoolSize(); + } else if (executor instanceof ResizableExecutor) { + return ((ResizableExecutor)executor).getPoolSize(); + } else { + return -1; + } + } else { + return -2; + } + } + + /** + * Return the amount of threads that are in use + * + * @return the amount of threads that are in use + */ + public int getCurrentThreadsBusy() { + if (executor!=null) { + if (executor instanceof ThreadPoolExecutor) { + return ((ThreadPoolExecutor)executor).getActiveCount(); + } else if (executor instanceof ResizableExecutor) { + return ((ResizableExecutor)executor).getActiveCount(); + } else { + return -1; + } + } else { + return -2; + } + } + + public boolean isRunning() { + return running; + } + + public boolean isPaused() { + return paused; + } + + + public void createExecutor() { + internalExecutor = true; + 
TaskQueue taskqueue = new TaskQueue(); + TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority()); + executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf); + taskqueue.setParent( (ThreadPoolExecutor) executor); + } + + public void shutdownExecutor() { + if ( executor!=null && internalExecutor ) { + if ( executor instanceof ThreadPoolExecutor ) { + //this is our internal one, so we need to shut it down + ThreadPoolExecutor tpe = (ThreadPoolExecutor) executor; + tpe.shutdownNow(); + TaskQueue queue = (TaskQueue) tpe.getQueue(); + queue.setParent(null); + } + executor = null; + } + } + + /** + * Unlock the server socket accept using a bogus connection. + */ + protected void unlockAccept() { + java.net.Socket s = null; + InetSocketAddress saddr = null; + try { + // Need to create a connection to unlock the accept(); + if (address == null) { + //TODO fix IPv6 + saddr = new InetSocketAddress("127.0.0.1", getPort()); + } else { + saddr = new InetSocketAddress(address,getPort()); + } + s = new java.net.Socket(); + s.setSoTimeout(getSocketProperties().getSoTimeout()); + s.setSoLinger(getSocketProperties().getSoLingerOn(),getSocketProperties().getSoLingerTime()); + if (log.isDebugEnabled()) { + log.debug("About to unlock socket for:"+saddr); + } + s.connect(saddr,getSocketProperties().getUnlockTimeout()); + if (log.isDebugEnabled()) { + log.debug("Socket unlock completed for:"+saddr); + } + } catch(Exception e) { + if (log.isDebugEnabled()) { + log.debug(sm.getString("endpoint.debug.unlock", "" + getPort()), e); + } + } finally { + if (s != null) { + try { + s.close(); + } catch (Exception e) { + // Ignore + } + } + } + } + } + Modified: tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java?rev=807324&r1=807323&r2=807324&view=diff 
============================================================================== --- tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java (original) +++ tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java Mon Aug 24 18:06:06 2009 @@ -68,68 +68,7 @@ protected static Log log = LogFactory.getLog(AprEndpoint.class); - - /** - * The Request attribute key for the cipher suite. - */ - public static final String CIPHER_SUITE_KEY = "javax.servlet.request.cipher_suite"; - - /** - * The Request attribute key for the key size. - */ - public static final String KEY_SIZE_KEY = "javax.servlet.request.key_size"; - - /** - * The Request attribute key for the client certificate chain. - */ - public static final String CERTIFICATE_KEY = "javax.servlet.request.X509Certificate"; - - /** - * The Request attribute key for the session id. - * This one is a Tomcat extension to the Servlet spec. - */ - public static final String SESSION_ID_KEY = "javax.servlet.request.ssl_session"; - - // ----------------------------------------------------------------- Fields - - - /** - * Running state of the endpoint. - */ - protected volatile boolean running = false; - - - /** - * Will be set to true whenever the endpoint is paused. - */ - protected volatile boolean paused = false; - - - /** - * Track the initialization state of the endpoint. - */ - protected boolean initialized = false; - - - /** - * Current worker threads busy count. - */ - protected int curThreadsBusy = 0; - - - /** - * Current worker threads count. - */ - protected int curThreads = 0; - - - /** - * Sequence number used to generate thread names. - */ - protected int sequence = 0; - - /** * Root APR memory pool. */ @@ -154,10 +93,6 @@ protected long sslContext = 0; - /** - * Are we using an internal executor - */ - protected volatile boolean internalExecutor = false; // ------------------------------------------------------------- Properties @@ -170,27 +105,6 @@ /** - * External Executor based thread pool. 
- */ - protected Executor executor = null; - public void setExecutor(Executor executor) { this.executor = executor; } - public Executor getExecutor() { return executor; } - - - /** - * Maximum amount of worker threads. - */ - protected int maxThreads = 200; - public void setMaxThreads(int maxThreads) { - this.maxThreads = maxThreads; - if (running && executor instanceof ResizableExecutor) { - ((ResizableExecutor)executor).resizePool(getMinSpareThreads(), getMaxThreads()); - } - } - public int getMaxThreads() { return maxThreads; } - - - /** * Priority of the acceptor and poller threads. */ protected int threadPriority = Thread.NORM_PRIORITY; @@ -215,22 +129,6 @@ /** - * Server socket port. - */ - protected int port; - public int getPort() { return port; } - public void setPort(int port ) { this.port=port; } - - - /** - * Address for the server socket. - */ - protected InetAddress address; - public InetAddress getAddress() { return address; } - public void setAddress(InetAddress address) { this.address = address; } - - - /** * Handling of accepted sockets. */ protected Handler handler = null; @@ -239,48 +137,6 @@ /** - * Allows the server developer to specify the backlog that - * should be used for server sockets. By default, this value - * is 100. - */ - protected int backlog = 100; - public void setBacklog(int backlog) { if (backlog > 0) this.backlog = backlog; } - public int getBacklog() { return backlog; } - - - /** - * Socket TCP no delay. - */ - protected boolean tcpNoDelay = false; - public boolean getTcpNoDelay() { return tcpNoDelay; } - public void setTcpNoDelay(boolean tcpNoDelay) { this.tcpNoDelay = tcpNoDelay; } - - - /** - * Socket linger. - */ - protected int soLinger = 100; - public int getSoLinger() { return soLinger; } - public void setSoLinger(int soLinger) { this.soLinger = soLinger; } - - - /** - * Socket timeout. 
- */ - protected int soTimeout = -1; - public int getSoTimeout() { return soTimeout; } - public void setSoTimeout(int soTimeout) { this.soTimeout = soTimeout; } - - - /** - * Keep-Alive timeout. - */ - protected int keepAliveTimeout = -1; - public int getKeepAliveTimeout() { return keepAliveTimeout; } - public void setKeepAliveTimeout(int keepAliveTimeout) { this.keepAliveTimeout = keepAliveTimeout; } - - - /** * Poll interval, in microseconds. The smaller the value, the more CPU the poller * will use, but the more responsive to activity it will be. */ @@ -290,24 +146,6 @@ /** - * The default is true - the created threads will be - * in daemon mode. If set to false, the control thread - * will not be daemon - and will keep the process alive. - */ - protected boolean daemon = true; - public void setDaemon(boolean b) { daemon = b; } - public boolean getDaemon() { return daemon; } - - - /** - * Name of the thread pool, which will be used for naming child threads. - */ - protected String name = "TP"; - public void setName(String name) { this.name = name; } - public String getName() { return name; } - - - /** * Use endfile for sending static files. */ protected boolean useSendfile = Library.APR_HAS_SENDFILE; @@ -381,26 +219,6 @@ /** - * Dummy maxSpareThreads property. - */ - public int getMaxSpareThreads() { return 0; } - - - /** - * Dummy minSpareThreads property. - */ - public int getMinSpareThreads() { return 0; } - - - /** - * SSL engine. - */ - protected boolean SSLEnabled = false; - public boolean isSSLEnabled() { return SSLEnabled; } - public void setSSLEnabled(boolean SSLEnabled) { this.SSLEnabled = SSLEnabled; } - - - /** * SSL protocols. */ protected String SSLProtocol = "all"; @@ -531,63 +349,6 @@ } } - /** - * Return the amount of threads that are managed by the pool. 
- * - * @return the amount of threads that are managed by the pool - */ - public int getCurrentThreadCount() { - if (executor!=null) { - if (executor instanceof ThreadPoolExecutor) { - return ((ThreadPoolExecutor)executor).getPoolSize(); - } else if (executor instanceof ResizableExecutor) { - return ((ResizableExecutor)executor).getPoolSize(); - } else { - return -1; - } - } else { - return -2; - } - } - - /** - * Return the amount of threads that are in use - * - * @return the amount of threads that are in use - */ - public int getCurrentThreadsBusy() { - if (executor!=null) { - if (executor instanceof ThreadPoolExecutor) { - return ((ThreadPoolExecutor)executor).getActiveCount(); - } else if (executor instanceof ResizableExecutor) { - return ((ResizableExecutor)executor).getActiveCount(); - } else { - return -1; - } - } else { - return -2; - } - } - - /** - * Return the state of the endpoint. - * - * @return true if the endpoint is running, false otherwise - */ - public boolean isRunning() { - return running; - } - - - /** - * Return the state of the endpoint. 
- * - * @return true if the endpoint is paused, false otherwise - */ - public boolean isPaused() { - return paused; - } - // ----------------------------------------------- Public Lifecycle Methods @@ -607,8 +368,8 @@ serverSockPool = Pool.create(rootPool); // Create the APR address that will be bound String addressStr = null; - if (address != null) { - addressStr = address.getHostAddress(); + if (getAddress() != null) { + addressStr = getAddress().getHostAddress(); } int family = Socket.APR_INET; if (Library.APR_HAVE_IPV6) { @@ -621,7 +382,7 @@ } long inetAddress = Address.info(addressStr, family, - port, 0, rootPool); + getPort(), 0, rootPool); // Create the APR server socket serverSock = Socket.create(Address.getInfo(inetAddress).family, Socket.SOCK_STREAM, @@ -637,7 +398,7 @@ throw new Exception(sm.getString("endpoint.init.bind", "" + ret, Error.strerror(ret))); } // Start listening on the server socket - ret = Socket.listen(serverSock, backlog); + ret = Socket.listen(serverSock, getBacklog()); if (ret != 0) { throw new Exception(sm.getString("endpoint.init.listen", "" + ret, Error.strerror(ret))); } @@ -690,7 +451,7 @@ } // Initialize SSL if needed - if (SSLEnabled) { + if (isSSLEnabled()) { // SSL protocol int value = SSL.SSL_PROTOCOL_ALL; @@ -748,12 +509,8 @@ paused = false; // Create worker collection - if (executor == null) { - internalExecutor = true; - TaskQueue taskqueue = new TaskQueue(); - TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority()); - executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf); - taskqueue.setParent( (ThreadPoolExecutor) executor); + if (getExecutor() == null) { + createExecutor(); } // Start poller threads @@ -795,7 +552,7 @@ for (int i = 0; i < acceptorThreadCount; i++) { Thread acceptorThread = new Thread(new Acceptor(), getName() + "-Acceptor-" + i); acceptorThread.setPriority(threadPriority); - acceptorThread.setDaemon(daemon); + 
acceptorThread.setDaemon(getDaemon()); acceptorThread.start(); } @@ -847,16 +604,7 @@ sendfiles = null; } } - if ( executor!=null && internalExecutor ) { - if ( executor instanceof ThreadPoolExecutor ) { - //this is our internal one, so we need to shut it down - ThreadPoolExecutor tpe = (ThreadPoolExecutor) executor; - tpe.shutdownNow(); - TaskQueue queue = (TaskQueue) tpe.getQueue(); - queue.setParent(null); - } - executor = null; - } + shutdownExecutor(); } @@ -883,44 +631,6 @@ // ------------------------------------------------------ Protected Methods - /** - * Get a sequence number used for thread naming. - */ - protected int getSequence() { - return sequence++; - } - - - /** - * Unlock the server socket accept using a bugus connection. - */ - protected void unlockAccept() { - java.net.Socket s = null; - try { - // Need to create a connection to unlock the accept(); - if (address == null) { - s = new java.net.Socket("127.0.0.1", port); - } else { - s = new java.net.Socket(address, port); - // setting soLinger to a small value will help shutdown the - // connection quicker - s.setSoLinger(true, 0); - } - } catch(Exception e) { - if (log.isDebugEnabled()) { - log.debug(sm.getString("endpoint.debug.unlock", "" + port), e); - } - } finally { - if (s != null) { - try { - s.close(); - } catch (Exception e) { - // Ignore - } - } - } - } - /** * Process the specified connection. @@ -931,12 +641,12 @@ try { // 1: Set socket options: timeout, linger, etc - if (soLinger >= 0) - Socket.optSet(socket, Socket.APR_SO_LINGER, soLinger); - if (tcpNoDelay) - Socket.optSet(socket, Socket.APR_TCP_NODELAY, (tcpNoDelay ? 1 : 0)); - if (soTimeout > 0) - Socket.timeoutSet(socket, soTimeout * 1000); + if (socketProperties.getSoLingerOn() && socketProperties.getSoLingerTime() >= 0) + Socket.optSet(socket, Socket.APR_SO_LINGER, socketProperties.getSoLingerTime()); + if (socketProperties.getTcpNoDelay()) + Socket.optSet(socket, Socket.APR_TCP_NODELAY, (socketProperties.getTcpNoDelay() ? 
1 : 0)); + if (socketProperties.getSoTimeout() > 0) + Socket.timeoutSet(socket, socketProperties.getSoTimeout() * 1000); // 2: SSL handshake step = 2; @@ -989,7 +699,7 @@ */ protected boolean processSocketWithOptions(long socket) { try { - executor.execute(new SocketWithOptionsProcessor(socket)); + getExecutor().execute(new SocketWithOptionsProcessor(socket)); } catch (RejectedExecutionException x) { log.warn("Socket processing request was rejected for:"+socket,x); return false; @@ -1008,7 +718,7 @@ */ protected boolean processSocket(long socket) { try { - executor.execute(new SocketProcessor(socket)); + getExecutor().execute(new SocketProcessor(socket)); } catch (RejectedExecutionException x) { log.warn("Socket processing request was rejected for:"+socket,x); return false; @@ -1027,7 +737,7 @@ */ protected boolean processSocket(long socket, SocketStatus status) { try { - executor.execute(new SocketEventProcessor(socket, status)); + getExecutor().execute(new SocketEventProcessor(socket, status)); } catch (RejectedExecutionException x) { log.warn("Socket processing request was rejected for:"+socket,x); return false; @@ -1120,9 +830,9 @@ protected void init() { pool = Pool.create(serverSockPool); int size = pollerSize / pollerThreadCount; - int timeout = keepAliveTimeout; + int timeout = getKeepAliveTimeout(); if (timeout < 0) { - timeout = soTimeout; + timeout = socketProperties.getSoTimeout(); } serverPollset = allocatePoller(size, pool, timeout); if (serverPollset == 0 && size > 1024) { @@ -1292,7 +1002,7 @@ continue; } } - if (soTimeout > 0 && maintainTime > 1000000L && running) { + if (socketProperties.getSoTimeout() > 0 && maintainTime > 1000000L && running) { rv = Poll.maintain(serverPollset, desc, true); maintainTime = 0; if (rv > 0) { @@ -1375,14 +1085,14 @@ protected void init() { pool = Pool.create(serverSockPool); int size = sendfileSize / sendfileThreadCount; - sendfilePollset = allocatePoller(size, pool, soTimeout); + sendfilePollset = 
allocatePoller(size, pool, socketProperties.getSoTimeout()); if (sendfilePollset == 0 && size > 1024) { size = 1024; - sendfilePollset = allocatePoller(size, pool, soTimeout); + sendfilePollset = allocatePoller(size, pool, socketProperties.getSoTimeout()); } if (sendfilePollset == 0) { size = 62; - sendfilePollset = allocatePoller(size, pool, soTimeout); + sendfilePollset = allocatePoller(size, pool, socketProperties.getSoTimeout()); } desc = new long[size * 2]; sendfileData = new HashMap<Long, SendfileData>(size); @@ -1458,7 +1168,7 @@ // Entire file has been sent Pool.destroy(data.fdpool); // Set back socket to blocking mode - Socket.timeoutSet(data.socket, soTimeout * 1000); + Socket.timeoutSet(data.socket, socketProperties.getSoTimeout() * 1000); return true; } } @@ -1577,7 +1287,7 @@ if (state.keepAlive) { // Destroy file descriptor pool, which should close the file Pool.destroy(state.fdpool); - Socket.timeoutSet(state.socket, soTimeout * 1000); + Socket.timeoutSet(state.socket, socketProperties.getSoTimeout() * 1000); // If all done put the socket back in the poller for // processing of further requests getPoller().add(state.socket); @@ -1605,7 +1315,7 @@ } } // Call maintain for the sendfile poller - if (soTimeout > 0 && maintainTime > 1000000L && running) { + if (socketProperties.getSoTimeout() > 0 && maintainTime > 1000000L && running) { rv = Poll.maintain(sendfilePollset, desc, true); maintainTime = 0; if (rv > 0) { Modified: tomcat/trunk/java/org/apache/tomcat/util/net/JIoEndpoint.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/JIoEndpoint.java?rev=807324&r1=807323&r2=807324&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/util/net/JIoEndpoint.java (original) +++ tomcat/trunk/java/org/apache/tomcat/util/net/JIoEndpoint.java Mon Aug 24 18:06:06 2009 @@ -60,57 +60,11 @@ // ----------------------------------------------------------------- 
Fields - - /** - * Running state of the endpoint. - */ - protected volatile boolean running = false; - - - /** - * Will be set to true whenever the endpoint is paused. - */ - protected volatile boolean paused = false; - - - /** - * Track the initialization state of the endpoint. - */ - protected boolean initialized = false; - - - /** - * Current worker threads busy count. - */ - protected int curThreadsBusy = 0; - - - /** - * Current worker threads count. - */ - protected int curThreads = 0; - - - /** - * Sequence number used to generate thread names. - */ - protected int sequence = 0; - - /** * Associated server socket. */ protected ServerSocket serverSocket = null; - /** - * Holds all the socket properties - */ - protected SocketProperties socketProperties = new SocketProperties(); - - /** - * Are we using an internal executor - */ - protected volatile boolean internalExecutor = false; // ------------------------------------------------------------- Properties @@ -138,36 +92,6 @@ public void setAcceptorThreadCount(int acceptorThreadCount) { this.acceptorThreadCount = acceptorThreadCount; } public int getAcceptorThreadCount() { return acceptorThreadCount; } - - /** - * External Executor based thread pool. - */ - protected Executor executor = null; - public void setExecutor(Executor executor) { this.executor = executor; } - public Executor getExecutor() { return executor; } - - - /** - * Maximum amount of worker threads. 
- */ - protected int maxThreads = 200; - public void setMaxThreads(int maxThreads) { - this.maxThreads = maxThreads; - if (running) { - //TODO Dynamic resize - log.error("Resizing executor dynamically is not possible at this time."); - } - } - public int getMaxThreads() { return maxThreads; } - - public int minSpareThreads = 10; - public int getMinSpareThreads() { - return Math.min(minSpareThreads,getMaxThreads()); - } - public void setMinSpareThreads(int minSpareThreads) { - this.minSpareThreads = minSpareThreads; - } - /** * Priority of the acceptor and poller threads. */ @@ -177,93 +101,12 @@ /** - * Server socket port. - */ - protected int port; - public int getPort() { return port; } - public void setPort(int port ) { this.port=port; } - - - /** - * Address for the server socket. - */ - protected InetAddress address; - public InetAddress getAddress() { return address; } - public void setAddress(InetAddress address) { this.address = address; } - - - /** * Handling of accepted sockets. */ protected Handler handler = null; public void setHandler(Handler handler ) { this.handler = handler; } public Handler getHandler() { return handler; } - - /** - * Allows the server developer to specify the backlog that - * should be used for server sockets. By default, this value - * is 100. - */ - protected int backlog = 100; - public void setBacklog(int backlog) { if (backlog > 0) this.backlog = backlog; } - public int getBacklog() { return backlog; } - - - /** - * Socket TCP no delay. - */ - public boolean getTcpNoDelay() { return socketProperties.getTcpNoDelay(); } - public void setTcpNoDelay(boolean tcpNoDelay) { socketProperties.setTcpNoDelay(tcpNoDelay); } - - - /** - * Socket linger. 
- */ - public int getSoLinger() {return socketProperties.getSoLingerTime();} - public void setSoLinger(int soLinger) { - if (soLinger>=0) { - socketProperties.setSoLingerOn(true); - socketProperties.setSoLingerTime(soLinger); - } else { - socketProperties.setSoLingerOn(false); - socketProperties.setSoLingerTime(-1); - } - } - - - /** - * Socket timeout. - */ - public int getSoTimeout() { return socketProperties.getSoTimeout(); } - public void setSoTimeout(int soTimeout) { - // APR/native uses -1 for infinite - Java uses 0 - if (soTimeout == -1) { - socketProperties.setSoTimeout(0); - } else { - socketProperties.setSoTimeout(soTimeout); - } - } - - - /** - * The default is true - the created threads will be - * in daemon mode. If set to false, the control thread - * will not be daemon - and will keep the process alive. - */ - protected boolean daemon = true; - public void setDaemon(boolean b) { daemon = b; } - public boolean getDaemon() { return daemon; } - - - /** - * Name of the thread pool, which will be used for naming child threads. - */ - protected String name = "TP"; - public void setName(String name) { this.name = name; } - public String getName() { return name; } - - /** * Server socket factory. */ @@ -272,53 +115,7 @@ public ServerSocketFactory getServerSocketFactory() { return serverSocketFactory; } - public boolean isRunning() { - return running; - } - - public boolean isPaused() { - return paused; - } - - /** - * Return the amount of threads that are managed by the pool. 
- * - * @return the amount of threads that are managed by the pool - */ - public int getCurrentThreadCount() { - if (executor!=null) { - if (executor instanceof ThreadPoolExecutor) { - return ((ThreadPoolExecutor)executor).getPoolSize(); - } else if (executor instanceof ResizableExecutor) { - return ((ResizableExecutor)executor).getPoolSize(); - } else { - return -1; - } - } else { - return -2; - } - } - - /** - * Return the amount of threads that are in use - * - * @return the amount of threads that are in use - */ - public int getCurrentThreadsBusy() { - if (executor!=null) { - if (executor instanceof ThreadPoolExecutor) { - return ((ThreadPoolExecutor)executor).getActiveCount(); - } else if (executor instanceof ResizableExecutor) { - return ((ResizableExecutor)executor).getActiveCount(); - } else { - return -1; - } - } else { - return -2; - } - } - // ------------------------------------------------ Handler Inner Interface @@ -439,17 +236,17 @@ } if (serverSocket == null) { try { - if (address == null) { - serverSocket = serverSocketFactory.createSocket(port, backlog); + if (getAddress() == null) { + serverSocket = serverSocketFactory.createSocket(getPort(), getBacklog()); } else { - serverSocket = serverSocketFactory.createSocket(port, backlog, address); + serverSocket = serverSocketFactory.createSocket(getPort(), getBacklog(), getAddress()); } } catch (BindException be) { - if (address == null) - throw new BindException(be.getMessage() + "<null>:" + port); + if (getAddress() == null) + throw new BindException(be.getMessage() + "<null>:" + getPort()); else throw new BindException(be.getMessage() + " " + - address.toString() + ":" + port); + getAddress().toString() + ":" + getPort()); } } //if( serverTimeout >= 0 ) @@ -470,19 +267,15 @@ paused = false; // Create worker collection - if (executor == null) { - internalExecutor = true; - TaskQueue taskqueue = new TaskQueue(); - TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, 
getThreadPriority()); - executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf); - taskqueue.setParent( (ThreadPoolExecutor) executor); + if (getExecutor() == null) { + createExecutor(); } // Start acceptor threads for (int i = 0; i < acceptorThreadCount; i++) { Thread acceptorThread = new Thread(new Acceptor(), getName() + "-Acceptor-" + i); acceptorThread.setPriority(threadPriority); - acceptorThread.setDaemon(daemon); + acceptorThread.setDaemon(getDaemon()); acceptorThread.start(); } } @@ -506,16 +299,7 @@ running = false; unlockAccept(); } - if ( executor!=null && internalExecutor ) { - if ( executor instanceof ThreadPoolExecutor ) { - //this is our internal one, so we need to shut it down - ThreadPoolExecutor tpe = (ThreadPoolExecutor) executor; - tpe.shutdownNow(); - TaskQueue queue = (TaskQueue) tpe.getQueue(); - queue.setParent(null); - } - executor = null; - } + shutdownExecutor(); } /** @@ -537,36 +321,7 @@ initialized = false ; } - - /** - * Unlock the accept by using a local connection. 
- */ - protected void unlockAccept() { - Socket s = null; - try { - // Need to create a connection to unlock the accept(); - if (address == null) { - s = new Socket("127.0.0.1", port); - } else { - s = new Socket(address, port); - // setting soLinger to a small value will help shutdown the - // connection quicker - s.setSoLinger(true, 0); - } - } catch (Exception e) { - if (log.isDebugEnabled()) { - log.debug(sm.getString("endpoint.debug.unlock", "" + port), e); - } - } finally { - if (s != null) { - try { - s.close(); - } catch (Exception e) { - // Ignore - } - } - } - } + /** @@ -602,7 +357,7 @@ */ protected boolean processSocket(Socket socket) { try { - executor.execute(new SocketProcessor(socket)); + getExecutor().execute(new SocketProcessor(socket)); } catch (RejectedExecutionException x) { log.warn("Socket processing request was rejected for:"+socket,x); return false; Modified: tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java?rev=807324&r1=807323&r2=807324&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java (original) +++ tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java Mon Aug 24 18:06:06 2009 @@ -92,24 +92,6 @@ // ----------------------------------------------------------------- Fields - - /** - * Running state of the endpoint. - */ - protected volatile boolean running = false; - - - /** - * Will be set to true whenever the endpoint is paused. - */ - protected volatile boolean paused = false; - - - /** - * Track the initialization state of the endpoint. - */ - protected boolean initialized = false; - protected NioSelectorPool selectorPool = new NioSelectorPool(); /** @@ -299,60 +281,27 @@ /** - * External Executor based thread pool. 
- */ - protected Executor executor = null; - public void setExecutor(Executor executor) { - this.executor = executor; - this.internalExecutor = (executor==null); - } - public Executor getExecutor() { return executor; } - /** - * Are we using an internal executor - */ - protected volatile boolean internalExecutor = false; - - protected boolean useExecutor = true; - /** - * @deprecated Executor is always used - * @param useexec - */ - public void setUseExecutor(boolean useexec) { log.info("Setting useExecutor is deprecated. Executors are always used.");} - public boolean getUseExecutor() { return useExecutor || (executor!=null);} - - /** - * Maximum amount of worker threads. + * Generic properties, introspected */ - protected int maxThreads = 200; - public void setMaxThreads(int maxThreads) { - this.maxThreads = maxThreads; - if (running && executor!=null && executor instanceof java.util.concurrent.ThreadPoolExecutor) { - ((java.util.concurrent.ThreadPoolExecutor)executor).setMaximumPoolSize(maxThreads); + @Override + public boolean setProperty(String name, String value) { + final String selectorPoolName = "selectorPool."; + final String socketName = "socket."; + try { + if (name.startsWith(selectorPoolName)) { + return IntrospectionUtils.setProperty(selectorPool, name.substring(selectorPoolName.length()), value); + } else { + return super.setProperty(name, value); + } + }catch ( Exception x ) { + log.error("Unable to set attribute \""+name+"\" to \""+value+"\"",x); + return false; } } - public int getMaxThreads() { return maxThreads; } - - /** - * Max keep alive requests - */ - protected int maxKeepAliveRequests=100; // as in Apache HTTPD server - public int getMaxKeepAliveRequests() { - return maxKeepAliveRequests; - } - public void setMaxKeepAliveRequests(int maxKeepAliveRequests) { - this.maxKeepAliveRequests = maxKeepAliveRequests; - } /** - * Priority of the worker threads. 
- */ - protected int threadPriority = Thread.NORM_PRIORITY; - public void setThreadPriority(int threadPriority) { this.threadPriority = threadPriority; } - public int getThreadPriority() { return threadPriority; } - - /** * Priority of the acceptor threads. */ protected int acceptorThreadPriority = Thread.NORM_PRIORITY; @@ -366,20 +315,6 @@ public void setPollerThreadPriority(int pollerThreadPriority) { this.pollerThreadPriority = pollerThreadPriority; } public int getPollerThreadPriority() { return pollerThreadPriority; } - /** - * Server socket port. - */ - protected int port; - public int getPort() { return port; } - public void setPort(int port ) { this.port=port; } - - - /** - * Address for the server socket. - */ - protected InetAddress address; - public InetAddress getAddress() { return address; } - public void setAddress(InetAddress address) { this.address = address; } /** @@ -390,65 +325,8 @@ public Handler getHandler() { return handler; } - /** - * Allows the server developer to specify the backlog that - * should be used for server sockets. By default, this value - * is 100. - */ - protected int backlog = 100; - public void setBacklog(int backlog) { if (backlog > 0) this.backlog = backlog; } - public int getBacklog() { return backlog; } - - /** - * Keepalive timeout, if lesser or equal to 0 then soTimeout will be used. - */ - protected int keepAliveTimeout = 0; - public void setKeepAliveTimeout(int keepAliveTimeout) { this.keepAliveTimeout = keepAliveTimeout; } - public int getKeepAliveTimeout() { return keepAliveTimeout;} - protected SocketProperties socketProperties = new SocketProperties(); - - /** - * Socket TCP no delay. - */ - public boolean getTcpNoDelay() { return socketProperties.getTcpNoDelay();} - public void setTcpNoDelay(boolean tcpNoDelay) { socketProperties.setTcpNoDelay(tcpNoDelay); } - - - /** - * Socket linger. 
- */ - public int getSoLinger() { return socketProperties.getSoLingerTime(); } - public void setSoLinger(int soLinger) { - socketProperties.setSoLingerTime(soLinger); - socketProperties.setSoLingerOn(soLinger>=0); - } - - - /** - * Socket timeout. - */ - public int getSoTimeout() { return socketProperties.getSoTimeout(); } - public void setSoTimeout(int soTimeout) { socketProperties.setSoTimeout(soTimeout); } - - /** - * The default is true - the created threads will be - * in daemon mode. If set to false, the control thread - * will not be daemon - and will keep the process alive. - */ - protected boolean daemon = true; - public void setDaemon(boolean b) { daemon = b; } - public boolean getDaemon() { return daemon; } - - - /** - * Name of the thread pool, which will be used for naming child threads. - */ - protected String name = "TP"; - public void setName(String name) { this.name = name; } - public String getName() { return name; } - /** @@ -492,39 +370,6 @@ return pollers[idx]; } - /** - * Dummy maxSpareThreads property. 
- */ - public int getMaxSpareThreads() { return Math.min(getMaxThreads(),getMinSpareThreads()); } - - - public int minSpareThreads = 10; - public int getMinSpareThreads() { - return Math.min(minSpareThreads,getMaxThreads()); - } - public void setMinSpareThreads(int minSpareThreads) { - this.minSpareThreads = minSpareThreads; - } - - /** - * Generic properties, introspected - */ - public boolean setProperty(String name, String value) { - final String selectorPoolName = "selectorPool."; - final String socketName = "socket."; - try { - if (name.startsWith(selectorPoolName)) { - return IntrospectionUtils.setProperty(selectorPool, name.substring(selectorPoolName.length()), value); - } else if (name.startsWith(socketName)) { - return IntrospectionUtils.setProperty(socketProperties, name.substring(socketName.length()), value); - } else { - return IntrospectionUtils.setProperty(this,name,value); - } - }catch ( Exception x ) { - log.error("Unable to set attribute \""+name+"\" to \""+value+"\"",x); - return false; - } - } public String adjustRelativePath(String path, String relativeTo) { @@ -704,43 +549,6 @@ - /** - * Return the amount of threads that are managed by the pool. 
- * - * @return the amount of threads that are managed by the pool - */ - public int getCurrentThreadCount() { - if (executor!=null) { - if (executor instanceof ThreadPoolExecutor) { - return ((ThreadPoolExecutor)executor).getPoolSize(); - } else if (executor instanceof ResizableExecutor) { - return ((ResizableExecutor)executor).getPoolSize(); - } else { - return -1; - } - } else { - return -2; - } - } - - /** - * Return the amount of threads that are in use - * - * @return the amount of threads that are in use - */ - public int getCurrentThreadsBusy() { - if (executor!=null) { - if (executor instanceof ThreadPoolExecutor) { - return ((ThreadPoolExecutor)executor).getActiveCount(); - } else if (executor instanceof ResizableExecutor) { - return ((ResizableExecutor)executor).getActiveCount(); - } else { - return -1; - } - } else { - return -2; - } - } /** * Return the state of the endpoint. @@ -776,8 +584,8 @@ serverSock = ServerSocketChannel.open(); socketProperties.setProperties(serverSock.socket()); - InetSocketAddress addr = (address!=null?new InetSocketAddress(address,port):new InetSocketAddress(port)); - serverSock.socket().bind(addr,backlog); + InetSocketAddress addr = (getAddress()!=null?new InetSocketAddress(getAddress(),getPort()):new InetSocketAddress(getPort())); + serverSock.socket().bind(addr,getBacklog()); serverSock.configureBlocking(true); //mimic APR behavior serverSock.socket().setSoTimeout(getSocketProperties().getSoTimeout()); @@ -860,12 +668,8 @@ paused = false; // Create worker collection - if ( executor == null ) { - internalExecutor = true; - TaskQueue taskqueue = new TaskQueue(); - TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority()); - executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf); - taskqueue.setParent( (ThreadPoolExecutor) executor); + if ( getExecutor() == null ) { + createExecutor(); } // Start poller threads @@ -882,7 +686,7 @@ for 
(int i = 0; i < acceptorThreadCount; i++) { Thread acceptorThread = new Thread(new Acceptor(), getName() + "-Acceptor-" + i); acceptorThread.setPriority(threadPriority); - acceptorThread.setDaemon(daemon); + acceptorThread.setDaemon(getDaemon()); acceptorThread.start(); } } @@ -929,16 +733,7 @@ keyCache.clear(); nioChannels.clear(); processorCache.clear(); - if ( executor!=null && internalExecutor ) { - if ( executor instanceof ThreadPoolExecutor ) { - //this is our internal one, so we need to shut it down - ThreadPoolExecutor tpe = (ThreadPoolExecutor) executor; - tpe.shutdownNow(); - TaskQueue queue = (TaskQueue) tpe.getQueue(); - queue.setParent(null); - } - executor = null; - } + shutdownExecutor(); } @@ -948,7 +743,7 @@ */ public void destroy() throws Exception { if (log.isDebugEnabled()) { - log.debug("Destroy initiated for "+new InetSocketAddress(address,port)); + log.debug("Destroy initiated for "+new InetSocketAddress(getAddress(),getPort())); } if (running) { stop(); @@ -962,7 +757,7 @@ releaseCaches(); selectorPool.close(); if (log.isDebugEnabled()) { - log.debug("Destroy completed for "+new InetSocketAddress(address,port)); + log.debug("Destroy completed for "+new InetSocketAddress(getAddress(),getPort())); } } @@ -982,10 +777,6 @@ return selectorPool; } - public SocketProperties getSocketProperties() { - return socketProperties; - } - public boolean getUseSendfile() { return useSendfile; } @@ -999,43 +790,7 @@ } - /** - * Unlock the server socket accept using a bogus connection. 
- */ - protected void unlockAccept() { - java.net.Socket s = null; - InetSocketAddress saddr = null; - try { - // Need to create a connection to unlock the accept(); - if (address == null) { - saddr = new InetSocketAddress("127.0.0.1", port); - } else { - saddr = new InetSocketAddress(address,port); - } - s = new java.net.Socket(); - s.setSoTimeout(getSocketProperties().getSoTimeout()); - s.setSoLinger(getSocketProperties().getSoLingerOn(),getSocketProperties().getSoLingerTime()); - if (log.isDebugEnabled()) { - log.debug("About to unlock socket for:"+saddr); - } - s.connect(saddr,getSocketProperties().getUnlockTimeout()); - if (log.isDebugEnabled()) { - log.debug("Socket unlock completed for:"+saddr); - } - } catch(Exception e) { - if (log.isDebugEnabled()) { - log.debug(sm.getString("endpoint.debug.unlock", "" + port), e); - } - } finally { - if (s != null) { - try { - s.close(); - } catch (Exception e) { - // Ignore - } - } - } - } + /** @@ -1110,10 +865,7 @@ * @return boolean */ protected boolean isWorkerAvailable() { - if ( executor != null ) { - return true; - } - return false; + return true; } public boolean processSocket(NioChannel socket, SocketStatus status, boolean dispatch) { @@ -1123,7 +875,7 @@ SocketProcessor sc = processorCache.poll(); if ( sc == null ) sc = new SocketProcessor(socket,status); else sc.reset(socket,status); - if ( dispatch && executor!=null ) executor.execute(sc); + if ( dispatch && getExecutor()!=null ) getExecutor().execute(sc); else sc.run(); } catch (RejectedExecutionException rx) { log.warn("Socket processing request was rejected for:"+socket,rx); --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org