Author: markt Date: Fri Jan 10 11:43:02 2014 New Revision: 1557092 URL: http://svn.apache.org/r1557092 Log: Fix https://issues.apache.org/bugzilla/show_bug.cgi?id=55978 Ensure that the container makes the first call to onWritePossible() when using non-blocking IO with an HTTP upgraded connection
Modified: tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java tomcat/trunk/java/org/apache/coyote/http11/upgrade/AbstractProcessor.java tomcat/trunk/java/org/apache/coyote/http11/upgrade/AbstractServletOutputStream.java tomcat/trunk/java/org/apache/coyote/http11/upgrade/AprServletOutputStream.java tomcat/trunk/java/org/apache/coyote/http11/upgrade/BioServletOutputStream.java tomcat/trunk/java/org/apache/coyote/http11/upgrade/NioServletOutputStream.java tomcat/trunk/test/org/apache/coyote/http11/upgrade/TestUpgrade.java Modified: tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java?rev=1557092&r1=1557091&r2=1557092&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java (original) +++ tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java Fri Jan 10 11:43:02 2014 @@ -628,8 +628,13 @@ public abstract class AbstractProtocol<S // these calls may result in a nested call to process() connections.put(socket, processor); DispatchType nextDispatch = dispatches.next(); - state = processor.asyncDispatch( - nextDispatch.getSocketStatus()); + if (processor.isUpgrade()) { + state = processor.upgradeDispatch( + nextDispatch.getSocketStatus()); + } else { + state = processor.asyncDispatch( + nextDispatch.getSocketStatus()); + } } else if (status == SocketStatus.DISCONNECT && !processor.isComet()) { // Do nothing here, just wait for it to get recycled Modified: tomcat/trunk/java/org/apache/coyote/http11/upgrade/AbstractProcessor.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/AbstractProcessor.java?rev=1557092&r1=1557091&r2=1557092&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http11/upgrade/AbstractProcessor.java (original) +++ 
tomcat/trunk/java/org/apache/coyote/http11/upgrade/AbstractProcessor.java Fri Jan 10 11:43:02 2014 @@ -42,11 +42,11 @@ public abstract class AbstractProcessor< private final HttpUpgradeHandler httpUpgradeHandler; private final AbstractServletInputStream upgradeServletInputStream; - private final AbstractServletOutputStream upgradeServletOutputStream; + private final AbstractServletOutputStream<S> upgradeServletOutputStream; protected AbstractProcessor (HttpUpgradeHandler httpUpgradeHandler, AbstractServletInputStream upgradeServletInputStream, - AbstractServletOutputStream upgradeServletOutputStream) { + AbstractServletOutputStream<S> upgradeServletOutputStream) { this.httpUpgradeHandler = httpUpgradeHandler; this.upgradeServletInputStream = upgradeServletInputStream; this.upgradeServletOutputStream = upgradeServletOutputStream; Modified: tomcat/trunk/java/org/apache/coyote/http11/upgrade/AbstractServletOutputStream.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/AbstractServletOutputStream.java?rev=1557092&r1=1557091&r2=1557092&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http11/upgrade/AbstractServletOutputStream.java (original) +++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/AbstractServletOutputStream.java Fri Jan 10 11:43:02 2014 @@ -22,13 +22,17 @@ import javax.servlet.ServletOutputStream import javax.servlet.WriteListener; import org.apache.tomcat.util.ExceptionUtils; +import org.apache.tomcat.util.net.DispatchType; +import org.apache.tomcat.util.net.SocketWrapper; import org.apache.tomcat.util.res.StringManager; -public abstract class AbstractServletOutputStream extends ServletOutputStream { +public abstract class AbstractServletOutputStream<S> extends ServletOutputStream { protected static final StringManager sm = StringManager.getManager(Constants.Package); + protected final SocketWrapper<S> socketWrapper; + private final 
Object fireListenerLock = new Object(); private final Object writeLock = new Object(); @@ -39,6 +43,12 @@ public abstract class AbstractServletOut private volatile ClassLoader applicationLoader = null; private volatile byte[] buffer; + + public AbstractServletOutputStream(SocketWrapper<S> socketWrapper) { + this.socketWrapper = socketWrapper; + } + + @Override public final boolean isReady() { if (listener == null) { @@ -55,6 +65,7 @@ public abstract class AbstractServletOut } } + @Override public final void setWriteListener(WriteListener listener) { if (listener == null) { @@ -65,14 +76,23 @@ public abstract class AbstractServletOut throw new IllegalArgumentException( sm.getString("upgrade.sos.writeListener.set")); } + // Container is responsible for first call to onWritePossible() but only + // need to do this if setting the listener for the first time rather + // than changing it. + synchronized (fireListenerLock) { + fireListener = true; + } + socketWrapper.addDispatch(DispatchType.NON_BLOCKING_WRITE); this.listener = listener; this.applicationLoader = Thread.currentThread().getContextClassLoader(); } + protected final boolean isCloseRequired() { return closeRequired; } + @Override public void write(int b) throws IOException { synchronized (writeLock) { @@ -97,6 +117,7 @@ public abstract class AbstractServletOut doClose(); } + private void preWriteChecks() { if (buffer != null) { throw new IllegalStateException( @@ -135,7 +156,9 @@ public abstract class AbstractServletOut protected final void onWritePossible() throws IOException { synchronized (writeLock) { try { - writeInternal(buffer, 0, buffer.length); + if (buffer != null) { + writeInternal(buffer, 0, buffer.length); + } } catch (Throwable t) { ExceptionUtils.handleThrowable(t); Thread thread = Thread.currentThread(); @@ -153,8 +176,9 @@ public abstract class AbstractServletOut } } - // Make sure isReady() and onWritePossible() have a consistent view of - // buffer and fireListener when determining if the 
listener should fire + // Make sure isReady() and onWritePossible() have a consistent view + // of buffer and fireListener when determining if the listener + // should fire boolean fire = false; synchronized (fireListenerLock) { @@ -176,6 +200,7 @@ public abstract class AbstractServletOut } } + /** * Abstract method to be overridden by concrete implementations. The base * class will ensure that there are no concurrent calls to this method for Modified: tomcat/trunk/java/org/apache/coyote/http11/upgrade/AprServletOutputStream.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/AprServletOutputStream.java?rev=1557092&r1=1557091&r2=1557092&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http11/upgrade/AprServletOutputStream.java (original) +++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/AprServletOutputStream.java Fri Jan 10 11:43:02 2014 @@ -28,21 +28,20 @@ import org.apache.tomcat.jni.Status; import org.apache.tomcat.util.net.AprEndpoint; import org.apache.tomcat.util.net.SocketWrapper; -public class AprServletOutputStream extends AbstractServletOutputStream { +public class AprServletOutputStream extends AbstractServletOutputStream<Long> { private static final int SSL_OUTPUT_BUFFER_SIZE = 8192; private final AprEndpoint endpoint; - private final SocketWrapper<Long> wrapper; private final long socket; private volatile boolean closed = false; private final ByteBuffer sslOutputBuffer; - public AprServletOutputStream(SocketWrapper<Long> wrapper, + public AprServletOutputStream(SocketWrapper<Long> socketWrapper, AprEndpoint endpoint) { + super(socketWrapper); this.endpoint = endpoint; - this.wrapper = wrapper; - this.socket = wrapper.getSocket().longValue(); + this.socket = socketWrapper.getSocket().longValue(); if (endpoint.isSSLEnabled()) { sslOutputBuffer = ByteBuffer.allocateDirect(SSL_OUTPUT_BUFFER_SIZE); 
sslOutputBuffer.position(SSL_OUTPUT_BUFFER_SIZE); @@ -60,12 +59,12 @@ public class AprServletOutputStream exte throw new IOException(sm.getString("apr.closed", Long.valueOf(socket))); } - Lock readLock = wrapper.getBlockingStatusReadLock(); - WriteLock writeLock = wrapper.getBlockingStatusWriteLock(); + Lock readLock = socketWrapper.getBlockingStatusReadLock(); + WriteLock writeLock = socketWrapper.getBlockingStatusWriteLock(); try { readLock.lock(); - if (wrapper.getBlockingStatus() == block) { + if (socketWrapper.getBlockingStatus() == block) { return doWriteInternal(b, off, len); } } finally { @@ -75,7 +74,7 @@ public class AprServletOutputStream exte try { writeLock.lock(); // Set the current settings for this socket - wrapper.setBlockingStatus(block); + socketWrapper.setBlockingStatus(block); if (block) { Socket.timeoutSet(socket, endpoint.getSoTimeout() * 1000); } else { @@ -141,7 +140,7 @@ public class AprServletOutputStream exte throw new EOFException(sm.getString("apr.clientAbort")); } else if (written < 0) { throw new IOException(sm.getString("apr.write.error", - Integer.valueOf(-written), Long.valueOf(socket), wrapper)); + Integer.valueOf(-written), Long.valueOf(socket), socketWrapper)); } start += written; left -= written; Modified: tomcat/trunk/java/org/apache/coyote/http11/upgrade/BioServletOutputStream.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/BioServletOutputStream.java?rev=1557092&r1=1557091&r2=1557092&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http11/upgrade/BioServletOutputStream.java (original) +++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/BioServletOutputStream.java Fri Jan 10 11:43:02 2014 @@ -22,13 +22,14 @@ import java.net.Socket; import org.apache.tomcat.util.net.SocketWrapper; -public class BioServletOutputStream extends AbstractServletOutputStream { +public class BioServletOutputStream extends 
AbstractServletOutputStream<Socket> { private final OutputStream os; - public BioServletOutputStream(SocketWrapper<Socket> wrapper) + public BioServletOutputStream(SocketWrapper<Socket> socketWrapper) throws IOException { - os = wrapper.getSocket().getOutputStream(); + super(socketWrapper); + os = socketWrapper.getSocket().getOutputStream(); } @Override Modified: tomcat/trunk/java/org/apache/coyote/http11/upgrade/NioServletOutputStream.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/NioServletOutputStream.java?rev=1557092&r1=1557091&r2=1557092&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http11/upgrade/NioServletOutputStream.java (original) +++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/NioServletOutputStream.java Fri Jan 10 11:43:02 2014 @@ -25,7 +25,7 @@ import org.apache.tomcat.util.net.NioEnd import org.apache.tomcat.util.net.NioSelectorPool; import org.apache.tomcat.util.net.SocketWrapper; -public class NioServletOutputStream extends AbstractServletOutputStream { +public class NioServletOutputStream extends AbstractServletOutputStream<NioChannel> { private final NioChannel channel; private final NioSelectorPool pool; @@ -33,8 +33,9 @@ public class NioServletOutputStream exte public NioServletOutputStream( - SocketWrapper<NioChannel> wrapper, NioSelectorPool pool) { - channel = wrapper.getSocket(); + SocketWrapper<NioChannel> socketWrapper, NioSelectorPool pool) { + super(socketWrapper); + channel = socketWrapper.getSocket(); this.pool = pool; maxWrite = channel.getBufHandler().getWriteBuffer().capacity(); } Modified: tomcat/trunk/test/org/apache/coyote/http11/upgrade/TestUpgrade.java URL: http://svn.apache.org/viewvc/tomcat/trunk/test/org/apache/coyote/http11/upgrade/TestUpgrade.java?rev=1557092&r1=1557091&r2=1557092&view=diff ============================================================================== --- 
tomcat/trunk/test/org/apache/coyote/http11/upgrade/TestUpgrade.java (original) +++ tomcat/trunk/test/org/apache/coyote/http11/upgrade/TestUpgrade.java Fri Jan 10 11:43:02 2014 @@ -93,6 +93,11 @@ public class TestUpgrade extends TomcatB doTestCheckClosed(SetWriteListenerTwice.class); } + @Test + public void testFirstCallToOnWritePossible() throws Exception { + doTestFixedResponse(FixedResponseNonBlocking.class); + } + private void doTestCheckClosed( Class<? extends HttpUpgradeHandler> upgradeHandlerClass) throws Exception { @@ -104,6 +109,17 @@ public class TestUpgrade extends TomcatB Assert.assertEquals(-1, c); } + private void doTestFixedResponse( + Class<? extends HttpUpgradeHandler> upgradeHandlerClass) + throws Exception { + UpgradeConnection conn = doUpgrade(upgradeHandlerClass); + + Reader r = conn.getReader(); + int c = r.read(); + + Assert.assertEquals(FixedResponseNonBlocking.FIXED_RESPONSE, c); + } + private void doTestMessages ( Class<? extends HttpUpgradeHandler> upgradeHandlerClass) throws Exception { @@ -374,6 +390,46 @@ public class TestUpgrade extends TomcatB } + public static class FixedResponseNonBlocking implements HttpUpgradeHandler { + + public static final char FIXED_RESPONSE = 'F'; + + private ServletInputStream sis; + private ServletOutputStream sos; + + @Override + public void init(WebConnection connection) { + + try { + sis = connection.getInputStream(); + sos = connection.getOutputStream(); + } catch (IOException ioe) { + throw new IllegalStateException(ioe); + } + + sis.setReadListener(new NoOpReadListener()); + sos.setWriteListener(new FixedResponseWriteListener()); + } + + @Override + public void destroy() { + // NO-OP + } + + private class FixedResponseWriteListener extends NoOpWriteListener { + @Override + public void onWritePossible() { + try { + sos.write(FIXED_RESPONSE); + sos.flush(); + } catch (IOException ioe) { + throw new IllegalStateException(ioe); + } + } + } + } + + private static class NoOpReadListener implements 
ReadListener { @Override @@ -392,6 +448,7 @@ public class TestUpgrade extends TomcatB } } + private static class NoOpWriteListener implements WriteListener { @Override --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org