Author: markt Date: Fri Feb 27 15:00:45 2015 New Revision: 1662699 URL: http://svn.apache.org/r1662699 Log: Remove support for concurrent read/write. This breaks blocking text messages.
Modified: tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Protocol.java tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessorExternal.java tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessorInternal.java tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java Modified: tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Protocol.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Protocol.java?rev=1662699&r1=1662698&r2=1662699&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Protocol.java (original) +++ tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Protocol.java Fri Feb 27 15:00:45 2015 @@ -289,7 +289,7 @@ public abstract class AbstractHttp11Prot return new UpgradeProcessorInternal(socket, leftoverInput, (InternalHttpUpgradeHandler) httpUpgradeHandler); } else { - return new UpgradeProcessorExternal(socket, leftoverInput, httpUpgradeHandler); + return new UpgradeProcessorExternal(socket, leftoverInput); } } } Modified: tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessorExternal.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessorExternal.java?rev=1662699&r1=1662698&r2=1662699&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessorExternal.java (original) +++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessorExternal.java Fri Feb 27 15:00:45 2015 @@ -21,7 +21,6 @@ import java.nio.ByteBuffer; import javax.servlet.ServletInputStream; import javax.servlet.ServletOutputStream; -import 
javax.servlet.http.HttpUpgradeHandler; import org.apache.juli.logging.Log; import org.apache.juli.logging.LogFactory; @@ -39,8 +38,7 @@ public class UpgradeProcessorExternal ex private final UpgradeServletOutputStream upgradeServletOutputStream; - public UpgradeProcessorExternal(SocketWrapperBase<?> wrapper, ByteBuffer leftOverInput, - HttpUpgradeHandler httpUpgradeHandler) { + public UpgradeProcessorExternal(SocketWrapperBase<?> wrapper, ByteBuffer leftOverInput) { super(wrapper, leftOverInput); this.upgradeServletInputStream = new UpgradeServletInputStream(wrapper); this.upgradeServletOutputStream = new UpgradeServletOutputStream(wrapper); @@ -51,10 +49,6 @@ public class UpgradeProcessorExternal ex */ wrapper.setReadTimeout(INFINITE_TIMEOUT); wrapper.setWriteTimeout(INFINITE_TIMEOUT); - - if (httpUpgradeHandler instanceof InternalHttpUpgradeHandler) { - wrapper.setInternalUpgrade(true); - } } Modified: tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessorInternal.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessorInternal.java?rev=1662699&r1=1662698&r2=1662699&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessorInternal.java (original) +++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessorInternal.java Fri Feb 27 15:00:45 2015 @@ -41,7 +41,6 @@ public class UpgradeProcessorInternal ex wrapper.setWriteTimeout(INFINITE_TIMEOUT); internalHttpUpgradeHandler.setSocketWrapper(wrapper); - wrapper.setInternalUpgrade(true); } Modified: tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java?rev=1662699&r1=1662698&r2=1662699&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java 
(original) +++ tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java Fri Feb 27 15:00:45 2015 @@ -2326,40 +2326,28 @@ public class AprEndpoint extends Abstrac @Override public void run() { - // Upgraded connections using an internal upgrade handler are - // allowed concurrent read/writes - if (socket.isInternalUpgrade() && SocketStatus.OPEN_WRITE == status) { - synchronized (socket.getWriteThreadLock()) { - doRun(); + synchronized (socket) { + // Process the request from this socket + if (socket.getSocket() == null || !connections.containsKey(socket)) { + // Closed in another thread + return; } - } else { - synchronized (socket) { - doRun(); + SocketState state = handler.process(socket, status); + if (state == Handler.SocketState.CLOSED) { + // Close socket and pool + closeSocket(socket.getSocket().longValue()); + socket.reset(null, 1); + } else if (state == Handler.SocketState.LONG) { + if (socket.isAsync()) { + waitingRequests.add(socket); + } + } else if (state == Handler.SocketState.ASYNC_END) { + SocketProcessor proc = new SocketProcessor(socket, + SocketStatus.OPEN_READ); + getExecutor().execute(proc); } } } - - private void doRun() { - // Process the request from this socket - if (socket.getSocket() == null || !connections.containsKey(socket)) { - // Closed in another thread - return; - } - SocketState state = handler.process(socket, status); - if (state == Handler.SocketState.CLOSED) { - // Close socket and pool - closeSocket(socket.getSocket().longValue()); - socket.reset(null, 1); - } else if (state == Handler.SocketState.LONG) { - if (socket.isAsync()) { - waitingRequests.add(socket); - } - } else if (state == Handler.SocketState.ASYNC_END) { - SocketProcessor proc = new SocketProcessor(socket, - SocketStatus.OPEN_READ); - getExecutor().execute(proc); - } - } } Modified: tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java?rev=1662699&r1=1662698&r2=1662699&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java (original) +++ tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java Fri Feb 27 15:00:45 2015 @@ -1640,112 +1640,100 @@ public class Nio2Endpoint extends Abstra @Override public void run() { - // Upgraded connections using an internal upgrade handler are - // allowed concurrent read/writes - if (socket.isInternalUpgrade() && SocketStatus.OPEN_WRITE == status) { - synchronized (socket.getWriteThreadLock()) { - doRun(); - } - } else { - synchronized (socket) { - doRun(); - } - } - } - - private void doRun() { - boolean launch = false; - try { - int handshake = -1; - + synchronized (socket) { + boolean launch = false; try { - if (socket.getSocket() != null) { - // For STOP there is no point trying to handshake as the - // Poller has been stopped. - if (socket.getSocket().isHandshakeComplete() || - status == SocketStatus.STOP) { - handshake = 0; - } else { - handshake = socket.getSocket().handshake(); - // The handshake process reads/writes from/to the - // socket. status may therefore be OPEN_WRITE once - // the handshake completes. However, the handshake - // happens when the socket is opened so the status - // must always be OPEN_READ after it completes. It - // is OK to always set this as it is only used if - // the handshake completes. - status = SocketStatus.OPEN_READ; + int handshake = -1; + + try { + if (socket.getSocket() != null) { + // For STOP there is no point trying to handshake as the + // Poller has been stopped. + if (socket.getSocket().isHandshakeComplete() || + status == SocketStatus.STOP) { + handshake = 0; + } else { + handshake = socket.getSocket().handshake(); + // The handshake process reads/writes from/to the + // socket. 
status may therefore be OPEN_WRITE once + // the handshake completes. However, the handshake + // happens when the socket is opened so the status + // must always be OPEN_READ after it completes. It + // is OK to always set this as it is only used if + // the handshake completes. + status = SocketStatus.OPEN_READ; + } + } + } catch (IOException x) { + handshake = -1; + if (log.isDebugEnabled()) { + log.debug(sm.getString("endpoint.err.handshake"), x); } } - } catch (IOException x) { - handshake = -1; - if (log.isDebugEnabled()) { - log.debug(sm.getString("endpoint.err.handshake"), x); - } - } - if (handshake == 0) { - SocketState state = SocketState.OPEN; - // Process the request from this socket - if (status == null) { - state = handler.process(socket, SocketStatus.OPEN_READ); - } else { - state = handler.process(socket, status); - } - if (state == SocketState.CLOSED) { - // Close socket and pool + if (handshake == 0) { + SocketState state = SocketState.OPEN; + // Process the request from this socket + if (status == null) { + state = handler.process(socket, SocketStatus.OPEN_READ); + } else { + state = handler.process(socket, status); + } + if (state == SocketState.CLOSED) { + // Close socket and pool + closeSocket(socket); + if (useCaches && running && !paused) { + nioChannels.push(socket.getSocket()); + socketWrapperCache.push((Nio2SocketWrapper) socket); + } + } else if (state == SocketState.UPGRADING) { + socket.setKeptAlive(true); + launch = true; + } + } else if (handshake == -1 ) { closeSocket(socket); if (useCaches && running && !paused) { nioChannels.push(socket.getSocket()); - socketWrapperCache.push((Nio2SocketWrapper) socket); + socketWrapperCache.push(((Nio2SocketWrapper) socket)); } - } else if (state == SocketState.UPGRADING) { - socket.setKeptAlive(true); - launch = true; - } - } else if (handshake == -1 ) { - closeSocket(socket); - if (useCaches && running && !paused) { - nioChannels.push(socket.getSocket()); - 
socketWrapperCache.push(((Nio2SocketWrapper) socket)); } - } - } catch (OutOfMemoryError oom) { - try { - oomParachuteData = null; - log.error("", oom); - closeSocket(socket); - releaseCaches(); - } catch (Throwable oomt) { + } catch (OutOfMemoryError oom) { try { - System.err.println(oomParachuteMsg); - oomt.printStackTrace(); - } catch (Throwable letsHopeWeDontGetHere){ - ExceptionUtils.handleThrowable(letsHopeWeDontGetHere); + oomParachuteData = null; + log.error("", oom); + closeSocket(socket); + releaseCaches(); + } catch (Throwable oomt) { + try { + System.err.println(oomParachuteMsg); + oomt.printStackTrace(); + } catch (Throwable letsHopeWeDontGetHere){ + ExceptionUtils.handleThrowable(letsHopeWeDontGetHere); + } } - } - } catch (VirtualMachineError vme) { - ExceptionUtils.handleThrowable(vme); - } catch (Throwable t) { - log.error(sm.getString("endpoint.processing.fail"), t); - if (socket != null) { - closeSocket(socket); - } - } finally { - if (launch) { - try { - getExecutor().execute(new SocketProcessor(socket, SocketStatus.OPEN_READ)); - } catch (NullPointerException npe) { - if (running) { - log.error(sm.getString("endpoint.launch.fail"), - npe); + } catch (VirtualMachineError vme) { + ExceptionUtils.handleThrowable(vme); + } catch (Throwable t) { + log.error(sm.getString("endpoint.processing.fail"), t); + if (socket != null) { + closeSocket(socket); + } + } finally { + if (launch) { + try { + getExecutor().execute(new SocketProcessor(socket, SocketStatus.OPEN_READ)); + } catch (NullPointerException npe) { + if (running) { + log.error(sm.getString("endpoint.launch.fail"), + npe); + } } } - } - socket = null; - status = null; - //return to cache - if (useCaches && running && !paused) { - processorCache.push(this); + socket = null; + status = null; + //return to cache + if (useCaches && running && !paused) { + processorCache.push(this); + } } } } Modified: tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java?rev=1662699&r1=1662698&r2=1662699&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java (original) +++ tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java Fri Feb 27 15:00:45 2015 @@ -1689,84 +1689,68 @@ public class NioEndpoint extends Abstrac SelectionKey key = socket.getIOChannel().keyFor( socket.getPoller().getSelector()); - // Upgraded connections using an internal upgrade handler are - // allowed concurrent read/writes - if (ka.isInternalUpgrade() && SocketStatus.OPEN_WRITE == status) { - synchronized (ka.getWriteThreadLock()) { - doRun(key, ka); - } - } else { - synchronized (socket) { - doRun(key, ka); - } - } - } - - private void doRun(SelectionKey key, NioSocketWrapper ka) { - NioChannel socket = ka.getSocket(); - - try { - int handshake = -1; - + synchronized (socket) { try { - if (key != null && socket != null) { - // For STOP there is no point trying to handshake as the - // Poller has been stopped. - if (socket.isHandshakeComplete() || - status == SocketStatus.STOP) { - handshake = 0; + int handshake = -1; + + try { + if (key != null) { + // For STOP there is no point trying to handshake as the + // Poller has been stopped. + if (socket.isHandshakeComplete() || + status == SocketStatus.STOP) { + handshake = 0; + } else { + handshake = socket.handshake( + key.isReadable(), key.isWritable()); + // The handshake process reads/writes from/to the + // socket. status may therefore be OPEN_WRITE once + // the handshake completes. However, the handshake + // happens when the socket is opened so the status + // must always be OPEN_READ after it completes. It + // is OK to always set this as it is only used if + // the handshake completes. 
+ status = SocketStatus.OPEN_READ; + } + } + } catch (IOException x) { + handshake = -1; + if (log.isDebugEnabled()) log.debug("Error during SSL handshake",x); + } catch (CancelledKeyException ckx) { + handshake = -1; + } + if (handshake == 0) { + SocketState state = SocketState.OPEN; + // Process the request from this socket + if (status == null) { + state = handler.process(ka, SocketStatus.OPEN_READ); } else { - handshake = socket.handshake( - key.isReadable(), key.isWritable()); - // The handshake process reads/writes from/to the - // socket. status may therefore be OPEN_WRITE once - // the handshake completes. However, the handshake - // happens when the socket is opened so the status - // must always be OPEN_READ after it completes. It - // is OK to always set this as it is only used if - // the handshake completes. - status = SocketStatus.OPEN_READ; - } - } - } catch (IOException x) { - handshake = -1; - if (log.isDebugEnabled()) log.debug("Error during SSL handshake",x); - } catch (CancelledKeyException ckx) { - handshake = -1; - } - if (handshake == 0) { - SocketState state = SocketState.OPEN; - // Process the request from this socket - if (status == null) { - state = handler.process(ka, SocketStatus.OPEN_READ); - } else { - state = handler.process(ka, status); - } - if (state == SocketState.CLOSED) { - // Close socket and pool - try { - if (socket.getPoller().cancelledKey(key) != null) { - // SocketWrapper (attachment) was removed from the - // key - recycle both. This can only happen once - // per attempted closure so it is used to determine - // whether or not to return socket and ka to - // their respective caches. We do NOT want to do - // this more than once - see BZ 57340. 
- if (running && !paused) { - nioChannels.push(socket); - } - socket = null; - if (running && !paused) { - keyCache.push(ka); + state = handler.process(ka, status); + } + if (state == SocketState.CLOSED) { + // Close socket and pool + try { + if (socket.getPoller().cancelledKey(key) != null) { + // SocketWrapper (attachment) was removed from the + // key - recycle both. This can only happen once + // per attempted closure so it is used to determine + // whether or not to return socket and ka to + // their respective caches. We do NOT want to do + // this more than once - see BZ 57340. + if (running && !paused) { + nioChannels.push(socket); + } + socket = null; + if (running && !paused) { + keyCache.push(ka); + } } + ka = null; + } catch (Exception x) { + log.error("",x); } - ka = null; - } catch (Exception x) { - log.error("",x); } - } - } else if (handshake == -1 ) { - if (socket != null) { + } else if (handshake == -1 ) { if (key != null) { socket.getPoller().cancelledKey(key); } @@ -1774,47 +1758,47 @@ public class NioEndpoint extends Abstrac nioChannels.push(socket); } socket = null; + if (running && !paused) { + keyCache.push(ka); + } + ka = null; + } else { + ka.getPoller().add(socket,handshake); } - if (running && !paused) { - keyCache.push(ka); - } - ka = null; - } else { - ka.getPoller().add(socket,handshake); - } - } catch (CancelledKeyException cx) { - if (socket != null) { - socket.getPoller().cancelledKey(key); - } - } catch (OutOfMemoryError oom) { - try { - oomParachuteData = null; - log.error("", oom); + } catch (CancelledKeyException cx) { if (socket != null) { socket.getPoller().cancelledKey(key); } - releaseCaches(); - } catch (Throwable oomt) { + } catch (OutOfMemoryError oom) { try { - System.err.println(oomParachuteMsg); - oomt.printStackTrace(); - } catch (Throwable letsHopeWeDontGetHere){ - ExceptionUtils.handleThrowable(letsHopeWeDontGetHere); + oomParachuteData = null; + log.error("", oom); + if (socket != null) { + 
socket.getPoller().cancelledKey(key); + } + releaseCaches(); + } catch (Throwable oomt) { + try { + System.err.println(oomParachuteMsg); + oomt.printStackTrace(); + } catch (Throwable letsHopeWeDontGetHere){ + ExceptionUtils.handleThrowable(letsHopeWeDontGetHere); + } + } + } catch (VirtualMachineError vme) { + ExceptionUtils.handleThrowable(vme); + } catch (Throwable t) { + log.error("", t); + if (socket != null) { + socket.getPoller().cancelledKey(key); + } + } finally { + socket = null; + status = null; + //return to cache + if (running && !paused) { + processorCache.push(this); } - } - } catch (VirtualMachineError vme) { - ExceptionUtils.handleThrowable(vme); - } catch (Throwable t) { - log.error("", t); - if (socket != null) { - socket.getPoller().cancelledKey(key); - } - } finally { - socket = null; - status = null; - //return to cache - if (running && !paused) { - processorCache.push(this); } } } Modified: tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java?rev=1662699&r1=1662698&r2=1662699&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java (original) +++ tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java Fri Feb 27 15:00:45 2015 @@ -51,7 +51,6 @@ public abstract class SocketWrapperBase< private volatile boolean async = false; private boolean keptAlive = false; private volatile boolean upgraded = false; - private volatile boolean internalUpgrade = false; private boolean secure = false; /* * Following cached for speed / reduced GC @@ -153,10 +152,6 @@ public abstract class SocketWrapperBase< } public boolean isUpgraded() { return upgraded; } public void setUpgraded(boolean upgraded) { this.upgraded = upgraded; } - public boolean isInternalUpgrade() {return internalUpgrade; } - public void setInternalUpgrade(boolean 
internalUpgrade) { - this.internalUpgrade = internalUpgrade; - } public boolean isSecure() { return secure; } public void setSecure(boolean secure) { this.secure = secure; } @@ -351,7 +346,6 @@ public abstract class SocketWrapperBase< this.readTimeout = soTimeout; this.writeTimeout = soTimeout; upgraded = false; - internalUpgrade = false; resetSocketBufferHandler(socket); } --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org