Author: markt Date: Wed Dec 7 09:28:40 2016 New Revision: 1773036 URL: http://svn.apache.org/viewvc?rev=1773036&view=rev Log: Refactor the per Endpoint Acceptors into a single Acceptor class.
Added: tomcat/trunk/java/org/apache/tomcat/util/net/Acceptor.java (with props) Modified: tomcat/trunk/java/org/apache/coyote/AbstractProcessor.java tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProtocol.java tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11JsseProtocol.java tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Protocol.java tomcat/trunk/java/org/apache/coyote/http11/Http11Processor.java tomcat/trunk/java/org/apache/tomcat/util/net/AbstractEndpoint.java tomcat/trunk/java/org/apache/tomcat/util/net/AbstractJsseEndpoint.java tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java tomcat/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java tomcat/trunk/webapps/docs/changelog.xml Modified: tomcat/trunk/java/org/apache/coyote/AbstractProcessor.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/AbstractProcessor.java?rev=1773036&r1=1773035&r2=1773036&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/AbstractProcessor.java (original) +++ tomcat/trunk/java/org/apache/coyote/AbstractProcessor.java Wed Dec 7 09:28:40 2016 @@ -46,7 +46,7 @@ public abstract class AbstractProcessor protected Adapter adapter; protected final AsyncStateMachine asyncStateMachine; private volatile long asyncTimeout = -1; - protected final AbstractEndpoint<?> endpoint; + protected final AbstractEndpoint<?,?> endpoint; protected final Request request; protected final Response response; protected volatile SocketWrapperBase<?> socketWrapper = null; @@ -59,12 +59,12 @@ public abstract class AbstractProcessor private ErrorState errorState = ErrorState.NONE; - public AbstractProcessor(AbstractEndpoint<?> endpoint) { + public AbstractProcessor(AbstractEndpoint<?,?> endpoint) { this(endpoint, new Request(), new Response()); } - protected AbstractProcessor(AbstractEndpoint<?> endpoint, Request coyoteRequest, + protected AbstractProcessor(AbstractEndpoint<?,?> endpoint, Request coyoteRequest, Response coyoteResponse) { this.endpoint = endpoint; asyncStateMachine = new AsyncStateMachine(this); Modified: tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java?rev=1773036&r1=1773035&r2=1773036&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java (original) +++ tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java Wed Dec 7 09:28:40 2016 @@ -87,7 +87,7 @@ public abstract class AbstractProtocol<S * ProtocolHandler implementation (ProtocolHandler using NIO, requires NIO * Endpoint etc.). */ - private final AbstractEndpoint<S> endpoint; + private final AbstractEndpoint<S,?> endpoint; private Handler<S> handler; @@ -103,7 +103,7 @@ public abstract class AbstractProtocol<S private AsyncTimeout asyncTimeout = null; - public AbstractProtocol(AbstractEndpoint<S> endpoint) { + public AbstractProtocol(AbstractEndpoint<S,?> endpoint) { this.endpoint = endpoint; setConnectionLinger(Constants.DEFAULT_CONNECTION_LINGER); setTcpNoDelay(Constants.DEFAULT_TCP_NO_DELAY); @@ -364,7 +364,7 @@ public abstract class AbstractProtocol<S // ----------------------------------------------- Accessors for sub-classes - protected AbstractEndpoint<S> getEndpoint() { + protected AbstractEndpoint<S,?> getEndpoint() { return endpoint; } Modified: tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProtocol.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProtocol.java?rev=1773036&r1=1773035&r2=1773036&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProtocol.java (original) +++ tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProtocol.java Wed Dec 7 09:28:40 2016 @@ -41,7 +41,7 @@ public abstract class AbstractAjpProtoco protected static final StringManager sm = StringManager.getManager(AbstractAjpProtocol.class); - public AbstractAjpProtocol(AbstractEndpoint<S> endpoint) { + public AbstractAjpProtocol(AbstractEndpoint<S,?> endpoint) { super(endpoint); setConnectionTimeout(Constants.DEFAULT_CONNECTION_TIMEOUT); // AJP does not use Send File @@ -64,7 +64,7 @@ public abstract class AbstractAjpProtoco * Overridden to make getter accessible to other classes in this package. */ @Override - protected AbstractEndpoint<S> getEndpoint() { + protected AbstractEndpoint<S,?> getEndpoint() { return super.getEndpoint(); } Modified: tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java?rev=1773036&r1=1773035&r2=1773036&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java (original) +++ tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java Wed Dec 7 09:28:40 2016 @@ -243,7 +243,7 @@ public class AjpProcessor extends Abstra // ------------------------------------------------------------ Constructor - public AjpProcessor(int packetSize, AbstractEndpoint<?> endpoint) { + public AjpProcessor(int packetSize, AbstractEndpoint<?,?> endpoint) { super(endpoint); Modified: tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11JsseProtocol.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11JsseProtocol.java?rev=1773036&r1=1773035&r2=1773036&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11JsseProtocol.java (original) +++ tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11JsseProtocol.java Wed Dec 7 09:28:40 2016 @@ -22,15 +22,15 @@ import org.apache.tomcat.util.net.openss public abstract class AbstractHttp11JsseProtocol<S> extends AbstractHttp11Protocol<S> { - public AbstractHttp11JsseProtocol(AbstractJsseEndpoint<S> endpoint) { + public AbstractHttp11JsseProtocol(AbstractJsseEndpoint<S,?> endpoint) { super(endpoint); } @Override - protected AbstractJsseEndpoint<S> getEndpoint() { + protected AbstractJsseEndpoint<S,?> getEndpoint() { // Over-ridden to add cast - return (AbstractJsseEndpoint<S>) super.getEndpoint(); + return (AbstractJsseEndpoint<S,?>) super.getEndpoint(); } Modified: tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Protocol.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Protocol.java?rev=1773036&r1=1773035&r2=1773036&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Protocol.java (original) +++ tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Protocol.java Wed Dec 7 09:28:40 2016 @@ -47,7 +47,7 @@ public abstract class AbstractHttp11Prot StringManager.getManager(AbstractHttp11Protocol.class); - public AbstractHttp11Protocol(AbstractEndpoint<S> endpoint) { + public AbstractHttp11Protocol(AbstractEndpoint<S,?> endpoint) { super(endpoint); setConnectionTimeout(Constants.DEFAULT_CONNECTION_TIMEOUT); ConnectionHandler<S> cHandler = new ConnectionHandler<>(this); @@ -81,7 +81,7 @@ public abstract class AbstractHttp11Prot * Over-ridden here to make the method visible to nested classes. */ @Override - protected AbstractEndpoint<S> getEndpoint() { + protected AbstractEndpoint<S,?> getEndpoint() { return super.getEndpoint(); } Modified: tomcat/trunk/java/org/apache/coyote/http11/Http11Processor.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/Http11Processor.java?rev=1773036&r1=1773035&r2=1773036&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http11/Http11Processor.java (original) +++ tomcat/trunk/java/org/apache/coyote/http11/Http11Processor.java Wed Dec 7 09:28:40 2016 @@ -221,7 +221,7 @@ public class Http11Processor extends Abs private final Map<String,UpgradeProtocol> httpUpgradeProtocols; - public Http11Processor(int maxHttpHeaderSize, AbstractEndpoint<?> endpoint,int maxTrailerSize, + public Http11Processor(int maxHttpHeaderSize, AbstractEndpoint<?,?> endpoint,int maxTrailerSize, Set<String> allowedTrailerHeaders, int maxExtensionSize, int maxSwallowSize, Map<String,UpgradeProtocol> httpUpgradeProtocols) { Modified: tomcat/trunk/java/org/apache/tomcat/util/net/AbstractEndpoint.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/AbstractEndpoint.java?rev=1773036&r1=1773035&r2=1773036&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/util/net/AbstractEndpoint.java (original) +++ tomcat/trunk/java/org/apache/tomcat/util/net/AbstractEndpoint.java Wed Dec 7 09:28:40 2016 @@ -36,7 +36,7 @@ import org.apache.juli.logging.Log; import org.apache.tomcat.util.ExceptionUtils; import org.apache.tomcat.util.IntrospectionUtils; import org.apache.tomcat.util.collections.SynchronizedStack; -import org.apache.tomcat.util.net.AbstractEndpoint.Acceptor.AcceptorState; +import org.apache.tomcat.util.net.Acceptor.AcceptorState; import org.apache.tomcat.util.res.StringManager; import org.apache.tomcat.util.threads.LimitLatch; import org.apache.tomcat.util.threads.ResizableExecutor; @@ -45,12 +45,15 @@ import org.apache.tomcat.util.threads.Ta import org.apache.tomcat.util.threads.ThreadPoolExecutor; /** - * @param <S> The type for the sockets managed by this endpoint. + * @param <S> The type used by the socket wrapper associated with this endpoint. + * May be the same as <U>. + * @param <U> The type of the underlying socket used by this endpoint. May be + * the same as <S>. * * @author Mladen Turk * @author Remy Maucherat */ -public abstract class AbstractEndpoint<S> { +public abstract class AbstractEndpoint<S,U> { // -------------------------------------------------------------- Constants @@ -123,29 +126,6 @@ public abstract class AbstractEndpoint<S UNBOUND, BOUND_ON_INIT, BOUND_ON_START } - public abstract static class Acceptor implements Runnable { - public enum AcceptorState { - NEW, RUNNING, PAUSED, ENDED - } - - protected volatile AcceptorState state = AcceptorState.NEW; - public final AcceptorState getState() { - return state; - } - - private String threadName; - protected final void setThreadName(final String threadName) { - this.threadName = threadName; - } - protected final String getThreadName() { - return threadName; - } - } - - - private static final int INITIAL_ERROR_DELAY = 50; - private static final int MAX_ERROR_DELAY = 1600; - // ----------------------------------------------------------------- Fields @@ -182,7 +162,7 @@ public abstract class AbstractEndpoint<S /** * Threads used to accept new connections and pass them to worker threads. */ - protected Acceptor[] acceptors; + protected List<Acceptor<U>> acceptors; /** * Cache for SocketProcessor objects @@ -780,7 +760,7 @@ public abstract class AbstractEndpoint<S protected void unlockAccept() { // Only try to unlock the acceptor if it is necessary boolean unlockRequired = false; - for (Acceptor acceptor : acceptors) { + for (Acceptor<U> acceptor : acceptors) { if (acceptor.getState() == AcceptorState.RUNNING) { unlockRequired = true; break; @@ -856,7 +836,7 @@ public abstract class AbstractEndpoint<S // Wait for upto 1000ms acceptor threads to unlock long waitLeft = 1000; - for (Acceptor acceptor : acceptors) { + for (Acceptor<U> acceptor : acceptors) { while (waitLeft > 0 && acceptor.getState() == AcceptorState.RUNNING) { Thread.sleep(50); @@ -954,13 +934,14 @@ public abstract class AbstractEndpoint<S protected final void startAcceptorThreads() { int count = getAcceptorThreadCount(); - acceptors = new Acceptor[count]; + acceptors = new ArrayList<>(count); for (int i = 0; i < count; i++) { - acceptors[i] = createAcceptor(); + Acceptor<U> acceptor = createAcceptor(); String threadName = getName() + "-Acceptor-" + i; - acceptors[i].setThreadName(threadName); - Thread t = new Thread(acceptors[i], threadName); + acceptor.setThreadName(threadName); + acceptors.add(acceptor); + Thread t = new Thread(acceptor, threadName); t.setPriority(getAcceptorThreadPriority()); t.setDaemon(getDaemon()); t.start(); @@ -972,7 +953,7 @@ public abstract class AbstractEndpoint<S * Hook to allow Endpoints to provide a specific Acceptor implementation. * @return the acceptor */ - protected abstract Acceptor createAcceptor(); + protected abstract Acceptor<U> createAcceptor(); /** @@ -1045,35 +1026,14 @@ public abstract class AbstractEndpoint<S } else return -1; } - /** - * Provides a common approach for sub-classes to handle exceptions where a - * delay is required to prevent a Thread from entering a tight loop which - * will consume CPU and may also trigger large amounts of logging. For - * example, this can happen with the Acceptor thread if the ulimit for open - * files is reached. - * - * @param currentErrorDelay The current delay being applied on failure - * @return The delay to apply on the next failure - */ - protected int handleExceptionWithDelay(int currentErrorDelay) { - // Don't delay on first exception - if (currentErrorDelay > 0) { - try { - Thread.sleep(currentErrorDelay); - } catch (InterruptedException e) { - // Ignore - } - } + protected abstract U serverSocketAccept() throws Exception; - // On subsequent exceptions, start the delay at 50ms, doubling the delay - // on every subsequent exception until the delay reaches 1.6 seconds. - if (currentErrorDelay == 0) { - return INITIAL_ERROR_DELAY; - } else if (currentErrorDelay < MAX_ERROR_DELAY) { - return currentErrorDelay * 2; - } else { - return MAX_ERROR_DELAY; - } + protected abstract boolean setSocketOptions(U socket); + + protected abstract void closeSocket(U socket); + + protected void destroySocket(U socket) { + closeSocket(socket); } } Modified: tomcat/trunk/java/org/apache/tomcat/util/net/AbstractJsseEndpoint.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/AbstractJsseEndpoint.java?rev=1773036&r1=1773035&r2=1773036&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/util/net/AbstractJsseEndpoint.java (original) +++ tomcat/trunk/java/org/apache/tomcat/util/net/AbstractJsseEndpoint.java Wed Dec 7 09:28:40 2016 @@ -34,7 +34,7 @@ import org.apache.tomcat.util.net.SSLHos import org.apache.tomcat.util.net.openssl.OpenSSLImplementation; import org.apache.tomcat.util.net.openssl.ciphers.Cipher; -public abstract class AbstractJsseEndpoint<S> extends AbstractEndpoint<S> { +public abstract class AbstractJsseEndpoint<S,U> extends AbstractEndpoint<S,U> { private String sslImplementationName = null; private int sniParseLimit = 64 * 1024; Added: tomcat/trunk/java/org/apache/tomcat/util/net/Acceptor.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/Acceptor.java?rev=1773036&view=auto ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/util/net/Acceptor.java (added) +++ tomcat/trunk/java/org/apache/tomcat/util/net/Acceptor.java Wed Dec 7 09:28:40 2016 @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tomcat.util.net; + +import org.apache.juli.logging.Log; +import org.apache.juli.logging.LogFactory; +import org.apache.tomcat.jni.Error; +import org.apache.tomcat.util.ExceptionUtils; +import org.apache.tomcat.util.res.StringManager; + +public class Acceptor<U> implements Runnable { + + private static final Log log = LogFactory.getLog(Acceptor.class); + private static final StringManager sm = StringManager.getManager(Acceptor.class); + + private static final int INITIAL_ERROR_DELAY = 50; + private static final int MAX_ERROR_DELAY = 1600; + + private final AbstractEndpoint<?,U> endpoint; + private String threadName; + protected volatile AcceptorState state = AcceptorState.NEW; + + + public Acceptor(AbstractEndpoint<?,U> endpoint) { + this.endpoint = endpoint; + } + + + public final AcceptorState getState() { + return state; + } + + + final void setThreadName(final String threadName) { + this.threadName = threadName; + } + + + final String getThreadName() { + return threadName; + } + + + @Override + public void run() { + + int errorDelay = 0; + + // Loop until we receive a shutdown command + while (endpoint.isRunning()) { + + // Loop if endpoint is paused + while (endpoint.isPaused() && endpoint.isRunning()) { + state = AcceptorState.PAUSED; + try { + Thread.sleep(50); + } catch (InterruptedException e) { + // Ignore + } + } + + if (!endpoint.isRunning()) { + break; + } + state = AcceptorState.RUNNING; + + try { + //if we have reached max connections, wait + endpoint.countUpOrAwaitConnection(); + + U socket = null; + try { + // Accept the next incoming connection from the server + // socket + socket = endpoint.serverSocketAccept(); + } catch (Exception ioe) { + // We didn't get a socket + endpoint.countDownConnection(); + if (endpoint.isRunning()) { + // Introduce delay if necessary + errorDelay = handleExceptionWithDelay(errorDelay); + // re-throw + throw ioe; + } else { + break; + } + } + // Successful accept, reset the error delay + errorDelay = 0; + + // Configure the socket + if (endpoint.isRunning() && !endpoint.isPaused()) { + // setSocketOptions() will hand the socket off to + // an appropriate processor if successful + if (!endpoint.setSocketOptions(socket)) { + endpoint.closeSocket(socket); + } + } else { + endpoint.destroySocket(socket); + } + } catch (Throwable t) { + ExceptionUtils.handleThrowable(t); + String msg = sm.getString("endpoint.accept.fail"); + // APR specific. + // Could push this down but not sure it is worth the trouble. + if (t instanceof Error) { + Error e = (Error) t; + if (e.getError() == 233) { + // Not an error on HP-UX so log as a warning + // so it can be filtered out on that platform + // See bug 50273 + log.warn(msg, t); + } else { + log.error(msg, t); + } + } else { + log.error(msg, t); + } + } + } + state = AcceptorState.ENDED; + } + + + /** + * Handles exceptions where a delay is required to prevent a Thread from + * entering a tight loop which will consume CPU and may also trigger large + * amounts of logging. For example, this can happen if the ulimit for open + * files is reached. + * + * @param currentErrorDelay The current delay being applied on failure + * @return The delay to apply on the next failure + */ + private int handleExceptionWithDelay(int currentErrorDelay) { + // Don't delay on first exception + if (currentErrorDelay > 0) { + try { + Thread.sleep(currentErrorDelay); + } catch (InterruptedException e) { + // Ignore + } + } + + // On subsequent exceptions, start the delay at 50ms, doubling the delay + // on every subsequent exception until the delay reaches 1.6 seconds. + if (currentErrorDelay == 0) { + return INITIAL_ERROR_DELAY; + } else if (currentErrorDelay < MAX_ERROR_DELAY) { + return currentErrorDelay * 2; + } else { + return MAX_ERROR_DELAY; + } + } + + + public enum AcceptorState { + NEW, RUNNING, PAUSED, ENDED + } +} Propchange: tomcat/trunk/java/org/apache/tomcat/util/net/Acceptor.java ------------------------------------------------------------------------------ svn:eol-style = native Modified: tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java?rev=1773036&r1=1773035&r2=1773036&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java (original) +++ tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java Wed Dec 7 09:28:40 2016 @@ -52,8 +52,8 @@ import org.apache.tomcat.jni.Status; import org.apache.tomcat.util.ExceptionUtils; import org.apache.tomcat.util.buf.ByteBufferUtils; import org.apache.tomcat.util.collections.SynchronizedStack; -import org.apache.tomcat.util.net.AbstractEndpoint.Acceptor.AcceptorState; import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState; +import org.apache.tomcat.util.net.Acceptor.AcceptorState; import org.apache.tomcat.util.net.SSLHostConfig.Type; import org.apache.tomcat.util.net.openssl.OpenSSLEngine; @@ -73,7 +73,7 @@ import org.apache.tomcat.util.net.openss * @author Mladen Turk * @author Remy Maucherat */ -public class AprEndpoint extends AbstractEndpoint<Long> implements SNICallBack { +public class AprEndpoint extends AbstractEndpoint<Long,Long> implements SNICallBack { // -------------------------------------------------------------- Constants @@ -636,7 +636,7 @@ public class AprEndpoint extends Abstrac // Ignore } } - for (AbstractEndpoint.Acceptor acceptor : acceptors) { + for (Acceptor<Long> acceptor : acceptors) { long waitLeft = 10000; while (waitLeft > 0 && acceptor.getState() != AcceptorState.ENDED && @@ -723,8 +723,8 @@ public class AprEndpoint extends Abstrac // ------------------------------------------------------ Protected Methods @Override - protected AbstractEndpoint.Acceptor createAcceptor() { - return new Acceptor(); + protected Acceptor<Long> createAcceptor() { + return new Acceptor<>(this); } @@ -817,20 +817,20 @@ public class AprEndpoint extends Abstrac * and processing may continue, <code>false</code> if the socket needs to be * close immediately */ - protected boolean processSocketWithOptions(long socket) { + @Override + protected boolean setSocketOptions(Long socket) { try { // During shutdown, executor may be null - avoid NPE if (running) { if (log.isDebugEnabled()) { - log.debug(sm.getString("endpoint.debug.socket", - Long.valueOf(socket))); + log.debug(sm.getString("endpoint.debug.socket", socket)); } - AprSocketWrapper wrapper = new AprSocketWrapper(Long.valueOf(socket), this); + AprSocketWrapper wrapper = new AprSocketWrapper(socket, this); wrapper.setKeepAliveLeft(getMaxKeepAliveRequests()); wrapper.setSecure(isSSLEnabled()); wrapper.setReadTimeout(getConnectionTimeout()); wrapper.setWriteTimeout(getConnectionTimeout()); - connections.put(Long.valueOf(socket), wrapper); + connections.put(socket, wrapper); getExecutor().execute(new SocketWithOptionsProcessor(wrapper)); } } catch (RejectedExecutionException x) { @@ -847,6 +847,20 @@ public class AprEndpoint extends Abstrac } + @Override + protected Long serverSocketAccept() throws Exception { + long socket = Socket.accept(serverSock); + if (log.isDebugEnabled()) { + long sa = Address.get(Socket.APR_REMOTE, socket); + Sockaddr addr = Address.getInfo(sa); + log.debug(sm.getString("endpoint.apr.remoteport", + Long.valueOf(socket), + Long.valueOf(addr.port))); + } + return Long.valueOf(socket); + } + + /** * Process the given socket. Typically keep alive or upgraded protocol. * @@ -870,6 +884,12 @@ public class AprEndpoint extends Abstrac } + @Override + protected void closeSocket(Long socket) { + closeSocket(socket.longValue()); + } + + private void closeSocket(long socket) { // Once this is called, the mapping from socket to wrapper will no // longer be required. @@ -885,6 +905,17 @@ public class AprEndpoint extends Abstrac * is currently being used by the Poller. It is generally a bad idea to call * this directly from a known error condition. */ + @Override + protected void destroySocket(Long socket) { + destroySocket(socket.longValue()); + } + + + /* + * This method should only be called if there is no chance that the socket + * is currently being used by the Poller. It is generally a bad idea to call + * this directly from a known error condition. + */ private void destroySocket(long socket) { connections.remove(Long.valueOf(socket)); if (log.isDebugEnabled()) { @@ -911,105 +942,6 @@ public class AprEndpoint extends Abstrac return log; } - // --------------------------------------------------- Acceptor Inner Class - /** - * The background thread that listens for incoming TCP/IP connections and - * hands them off to an appropriate processor. - */ - protected class Acceptor extends AbstractEndpoint.Acceptor { - - private final Log log = LogFactory.getLog(AprEndpoint.Acceptor.class); - - @Override - public void run() { - - int errorDelay = 0; - - // Loop until we receive a shutdown command - while (running) { - - // Loop if endpoint is paused - while (paused && running) { - state = AcceptorState.PAUSED; - try { - Thread.sleep(50); - } catch (InterruptedException e) { - // Ignore - } - } - - if (!running) { - break; - } - state = AcceptorState.RUNNING; - - try { - //if we have reached max connections, wait - countUpOrAwaitConnection(); - - long socket = 0; - try { - // Accept the next incoming connection from the server - // socket - socket = Socket.accept(serverSock); - if (log.isDebugEnabled()) { - long sa = Address.get(Socket.APR_REMOTE, socket); - Sockaddr addr = Address.getInfo(sa); - log.debug(sm.getString("endpoint.apr.remoteport", - Long.valueOf(socket), - Long.valueOf(addr.port))); - } - } catch (Exception e) { - // We didn't get a socket - countDownConnection(); - if (running) { - // Introduce delay if necessary - errorDelay = handleExceptionWithDelay(errorDelay); - // re-throw - throw e; - } else { - break; - } - } - // Successful accept, reset the error delay - errorDelay = 0; - - if (running && !paused) { - // Hand this socket off to an appropriate processor - if (!processSocketWithOptions(socket)) { - // Close socket right away - closeSocket(socket); - } - } else { - // Close socket right away - // No code path could have added the socket to the - // Poller so use destroySocket() - destroySocket(socket); - } - } catch (Throwable t) { - ExceptionUtils.handleThrowable(t); - String msg = sm.getString("endpoint.accept.fail"); - if (t instanceof Error) { - Error e = (Error) t; - if (e.getError() == 233) { - // Not an error on HP-UX so log as a warning - // so it can be filtered out on that platform - // See bug 50273 - log.warn(msg, t); - } else { - log.error(msg, t); - } - } else { - log.error(msg, t); - } - } - // The processor will recycle itself when it finishes - } - state = AcceptorState.ENDED; - } - } - - // -------------------------------------------------- SocketInfo Inner Class public static class SocketInfo { Modified: tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java?rev=1773036&r1=1773035&r2=1773036&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java (original) +++ tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java Wed Dec 7 09:28:40 2016 @@ -57,7 +57,7 @@ import org.apache.tomcat.util.net.jsse.J /** * NIO2 endpoint. */ -public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel> { +public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel,AsynchronousSocketChannel> { // -------------------------------------------------------------- Constants @@ -287,8 +287,8 @@ public class Nio2Endpoint extends Abstra } @Override - protected AbstractEndpoint.Acceptor createAcceptor() { - return new Acceptor(); + protected Acceptor<AsynchronousSocketChannel> createAcceptor() { + return new Acceptor<>(this); } /** @@ -298,6 +298,7 @@ public class Nio2Endpoint extends Abstra * and processing may continue, <code>false</code> if the socket needs to be * close immediately */ + @Override protected boolean setSocketOptions(AsynchronousSocketChannel socket) { try { socketProperties.setProperties(socket); @@ -333,13 +334,44 @@ public class Nio2Endpoint extends Abstra @Override + protected void closeSocket(AsynchronousSocketChannel socket) { + countDownConnection(); + try { + socket.close(); + } catch (IOException ioe) { + if (log.isDebugEnabled()) { + log.debug(sm.getString("endpoint.err.close"), ioe); + } + } + } + + + @Override + protected NetworkChannel getServerSocket() { + return serverSock; + } + + + @Override + protected AsynchronousSocketChannel serverSocketAccept() throws Exception { + return serverSock.accept().get(); + } + + + @Override + protected Log getLog() { + return log; + } + + + @Override protected SocketProcessorBase<Nio2Channel> createSocketProcessor( SocketWrapperBase<Nio2Channel> socketWrapper, SocketEvent event) { return new SocketProcessor(socketWrapper, event); } - public void closeSocket(SocketWrapperBase<Nio2Channel> socket) { + private void closeSocket(SocketWrapperBase<Nio2Channel> socket) { if (log.isDebugEnabled()) { log.debug("Calling [" + this + "].closeSocket([" + socket + "],[" + socket.getSocket() + "])", new Exception()); @@ -377,105 +409,6 @@ public class Nio2Endpoint extends Abstra } } - @Override - protected Log getLog() { - return log; - } - - - @Override - protected NetworkChannel getServerSocket() { - return serverSock; - } - - - // --------------------------------------------------- Acceptor Inner Class - /** - * With NIO2, the main acceptor thread only initiates the initial accept - * but periodically checks that the connector is still accepting (if not - * it will attempt to start again). - */ - protected class Acceptor extends AbstractEndpoint.Acceptor { - - @Override - public void run() { - - int errorDelay = 0; - - // Loop until we receive a shutdown command - while (running) { - - // Loop if endpoint is paused - while (paused && running) { - state = AcceptorState.PAUSED; - try { - Thread.sleep(50); - } catch (InterruptedException e) { - // Ignore - } - } - - if (!running) { - break; - } - state = AcceptorState.RUNNING; - - try { - //if we have reached max connections, wait - countUpOrAwaitConnection(); - - AsynchronousSocketChannel socket = null; - try { - // Accept the next incoming connection from the server - // socket - socket = serverSock.accept().get(); - } catch (Exception e) { - // We didn't get a socket - countDownConnection(); - if (running) { - // Introduce delay if necessary - errorDelay = handleExceptionWithDelay(errorDelay); - // re-throw - throw e; - } else { - break; - } - } - // Successful accept, reset the error delay - errorDelay = 0; - - // Configure the socket - if (running && !paused) { - // setSocketOptions() will hand the socket off to - // an appropriate processor if successful - if (!setSocketOptions(socket)) { - closeSocket(socket); - } - } else { - closeSocket(socket); - } - } catch (Throwable t) { - ExceptionUtils.handleThrowable(t); - log.error(sm.getString("endpoint.accept.fail"), t); - } - } - state = AcceptorState.ENDED; - } - - - private void closeSocket(AsynchronousSocketChannel socket) { - countDownConnection(); - try { - socket.close(); - } catch (IOException ioe) { - if (log.isDebugEnabled()) { - log.debug(sm.getString("endpoint.err.close"), ioe); - } - } - } - } - - public static class Nio2SocketWrapper extends SocketWrapperBase<Nio2Channel> { private static final ThreadLocal<AtomicInteger> nestedWriteCompletionCount = Modified: tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java?rev=1773036&r1=1773035&r2=1773036&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java (original) +++ tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java Wed Dec 7 09:28:40 2016 @@ -66,7 +66,7 @@ import org.apache.tomcat.util.net.jsse.J * @author Mladen Turk * @author Remy Maucherat */ -public class NioEndpoint extends AbstractJsseEndpoint<NioChannel> { +public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel> { // -------------------------------------------------------------- Constants @@ -340,8 +340,8 @@ public class NioEndpoint extends Abstrac @Override - protected AbstractEndpoint.Acceptor createAcceptor() { - return new Acceptor(); + protected Acceptor<SocketChannel> createAcceptor() { + return new Acceptor<>(this); } @@ -352,6 +352,7 @@ public class NioEndpoint extends Abstrac * and processing may continue, <code>false</code> if the socket needs to be * close immediately */ + @Override protected boolean setSocketOptions(SocketChannel socket) { // Process the connection try { @@ -391,8 +392,22 @@ public class NioEndpoint extends Abstrac @Override - protected Log getLog() { - return log; + protected void closeSocket(SocketChannel socket) { + countDownConnection(); + try { + socket.socket().close(); + } catch (IOException ioe) { + if (log.isDebugEnabled()) { + log.debug(sm.getString("endpoint.err.close"), ioe); + } + } + try { + socket.close(); + } catch (IOException ioe) { + if (log.isDebugEnabled()) { + log.debug(sm.getString("endpoint.err.close"), ioe); + } + } } @@ -402,96 +417,15 @@ public class NioEndpoint extends Abstrac } - // --------------------------------------------------- Acceptor Inner Class - /** - * The background thread that listens for incoming TCP/IP connections and - * hands them off to an appropriate processor. - */ - protected class Acceptor extends AbstractEndpoint.Acceptor { - - @Override - public void run() { - - int errorDelay = 0; - - // Loop until we receive a shutdown command - while (running) { - - // Loop if endpoint is paused - while (paused && running) { - state = AcceptorState.PAUSED; - try { - Thread.sleep(50); - } catch (InterruptedException e) { - // Ignore - } - } - - if (!running) { - break; - } - state = AcceptorState.RUNNING; - - try { - //if we have reached max connections, wait - countUpOrAwaitConnection(); - - SocketChannel socket = null; - try { - // Accept the next incoming connection from the server - // socket - socket = serverSock.accept(); - } catch (IOException ioe) { - // We didn't get a socket - countDownConnection(); - if (running) { - // Introduce delay if necessary - errorDelay = handleExceptionWithDelay(errorDelay); - // re-throw - throw ioe; - } else { - break; - } - } - // Successful accept, reset the error delay - errorDelay = 0; - - // Configure the socket - if (running && !paused) { - // setSocketOptions() will hand the socket off to - // an appropriate processor if successful - if (!setSocketOptions(socket)) { - closeSocket(socket); - } - } else { - closeSocket(socket); - } - } catch (Throwable t) { - ExceptionUtils.handleThrowable(t); - log.error(sm.getString("endpoint.accept.fail"), t); - } - } - state = AcceptorState.ENDED; - } + @Override + protected SocketChannel serverSocketAccept() throws Exception { + return serverSock.accept(); + } - private void closeSocket(SocketChannel socket) { - countDownConnection(); - try { - socket.socket().close(); - } catch (IOException ioe) { - if (log.isDebugEnabled()) { - log.debug(sm.getString("endpoint.err.close"), ioe); - } - } - try { - socket.close(); - } catch (IOException ioe) { - if (log.isDebugEnabled()) { - log.debug(sm.getString("endpoint.err.close"), ioe); - } - } - } + @Override + protected Log getLog() { + return log; } Modified: tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java?rev=1773036&r1=1773035&r2=1773036&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java (original) +++ tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java Wed Dec 7 09:28:40 2016 @@ -38,7 +38,7 @@ public abstract class SocketWrapperBase< protected static final StringManager sm = StringManager.getManager(SocketWrapperBase.class); private final E socket; - private final AbstractEndpoint<E> endpoint; + private final AbstractEndpoint<E,?> endpoint; // Volatile because I/O and setting the timeout values occurs on a different // thread to the thread checking the timeout. @@ -90,7 +90,7 @@ public abstract class SocketWrapperBase< */ protected int bufferedWriteSize = 64 * 1024; // 64k default write buffer - public SocketWrapperBase(E socket, AbstractEndpoint<E> endpoint) { + public SocketWrapperBase(E socket, AbstractEndpoint<E,?> endpoint) { this.socket = socket; this.endpoint = endpoint; ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); @@ -102,7 +102,7 @@ public abstract class SocketWrapperBase< return socket; } - public AbstractEndpoint<E> getEndpoint() { + public AbstractEndpoint<E,?> getEndpoint() { return endpoint; } Modified: tomcat/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java?rev=1773036&r1=1773035&r2=1773036&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java (original) +++ tomcat/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java Wed Dec 7 09:28:40 2016 @@ -222,7 +222,7 @@ public class WsRemoteEndpointImplServer if (sh != null) { if (useDispatch) { OnResultRunnable r = new OnResultRunnable(sh, t); - AbstractEndpoint<?> endpoint = socketWrapper.getEndpoint(); + AbstractEndpoint<?,?> endpoint = socketWrapper.getEndpoint(); Executor containerExecutor = endpoint.getExecutor(); if (endpoint.isRunning() && containerExecutor != null) { containerExecutor.execute(r); Modified: tomcat/trunk/webapps/docs/changelog.xml URL: http://svn.apache.org/viewvc/tomcat/trunk/webapps/docs/changelog.xml?rev=1773036&r1=1773035&r2=1773036&view=diff ============================================================================== --- tomcat/trunk/webapps/docs/changelog.xml (original) +++ tomcat/trunk/webapps/docs/changelog.xml Wed Dec 7 09:28:40 2016 @@ -52,6 +52,10 @@ with a JSSE connector and an explicit alias has not been configured. (markt) </fix> + <scode> + Extract the common Acceptor code from each Endpoint into a new Acceptor + class that is used by all Endpoints. (markt) + </scode> </changelog> </subsection> </section> --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org