Author: markt
Date: Fri Jan 10 11:43:02 2014
New Revision: 1557092

URL: http://svn.apache.org/r1557092
Log:
Fix https://issues.apache.org/bugzilla/show_bug.cgi?id=55978
Ensure that container makes first call on onWritePossible when using 
non-blocking IO with an HTTP upgraded connection

Modified:
    tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java
    tomcat/trunk/java/org/apache/coyote/http11/upgrade/AbstractProcessor.java
    
tomcat/trunk/java/org/apache/coyote/http11/upgrade/AbstractServletOutputStream.java
    
tomcat/trunk/java/org/apache/coyote/http11/upgrade/AprServletOutputStream.java
    
tomcat/trunk/java/org/apache/coyote/http11/upgrade/BioServletOutputStream.java
    
tomcat/trunk/java/org/apache/coyote/http11/upgrade/NioServletOutputStream.java
    tomcat/trunk/test/org/apache/coyote/http11/upgrade/TestUpgrade.java

Modified: tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java?rev=1557092&r1=1557091&r2=1557092&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java (original)
+++ tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java Fri Jan 10 
11:43:02 2014
@@ -628,8 +628,13 @@ public abstract class AbstractProtocol<S
                         // these calls may result in a nested call to process()
                         connections.put(socket, processor);
                         DispatchType nextDispatch = dispatches.next();
-                        state = processor.asyncDispatch(
-                                nextDispatch.getSocketStatus());
+                        if (processor.isUpgrade()) {
+                            state = processor.upgradeDispatch(
+                                    nextDispatch.getSocketStatus());
+                        } else {
+                            state = processor.asyncDispatch(
+                                    nextDispatch.getSocketStatus());
+                        }
                     } else if (status == SocketStatus.DISCONNECT &&
                             !processor.isComet()) {
                         // Do nothing here, just wait for it to get recycled

Modified: 
tomcat/trunk/java/org/apache/coyote/http11/upgrade/AbstractProcessor.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/AbstractProcessor.java?rev=1557092&r1=1557091&r2=1557092&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/upgrade/AbstractProcessor.java 
(original)
+++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/AbstractProcessor.java 
Fri Jan 10 11:43:02 2014
@@ -42,11 +42,11 @@ public abstract class AbstractProcessor<
 
     private final HttpUpgradeHandler httpUpgradeHandler;
     private final AbstractServletInputStream upgradeServletInputStream;
-    private final AbstractServletOutputStream upgradeServletOutputStream;
+    private final AbstractServletOutputStream<S> upgradeServletOutputStream;
 
     protected AbstractProcessor (HttpUpgradeHandler httpUpgradeHandler,
             AbstractServletInputStream upgradeServletInputStream,
-            AbstractServletOutputStream upgradeServletOutputStream) {
+            AbstractServletOutputStream<S> upgradeServletOutputStream) {
         this.httpUpgradeHandler = httpUpgradeHandler;
         this.upgradeServletInputStream = upgradeServletInputStream;
         this.upgradeServletOutputStream = upgradeServletOutputStream;

Modified: 
tomcat/trunk/java/org/apache/coyote/http11/upgrade/AbstractServletOutputStream.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/AbstractServletOutputStream.java?rev=1557092&r1=1557091&r2=1557092&view=diff
==============================================================================
--- 
tomcat/trunk/java/org/apache/coyote/http11/upgrade/AbstractServletOutputStream.java
 (original)
+++ 
tomcat/trunk/java/org/apache/coyote/http11/upgrade/AbstractServletOutputStream.java
 Fri Jan 10 11:43:02 2014
@@ -22,13 +22,17 @@ import javax.servlet.ServletOutputStream
 import javax.servlet.WriteListener;
 
 import org.apache.tomcat.util.ExceptionUtils;
+import org.apache.tomcat.util.net.DispatchType;
+import org.apache.tomcat.util.net.SocketWrapper;
 import org.apache.tomcat.util.res.StringManager;
 
-public abstract class AbstractServletOutputStream extends ServletOutputStream {
+public abstract class AbstractServletOutputStream<S> extends 
ServletOutputStream {
 
     protected static final StringManager sm =
             StringManager.getManager(Constants.Package);
 
+    protected final SocketWrapper<S> socketWrapper;
+
     private final Object fireListenerLock = new Object();
     private final Object writeLock = new Object();
 
@@ -39,6 +43,12 @@ public abstract class AbstractServletOut
     private volatile ClassLoader applicationLoader = null;
     private volatile byte[] buffer;
 
+
+    public AbstractServletOutputStream(SocketWrapper<S> socketWrapper) {
+        this.socketWrapper = socketWrapper;
+    }
+
+
     @Override
     public final boolean isReady() {
         if (listener == null) {
@@ -55,6 +65,7 @@ public abstract class AbstractServletOut
         }
     }
 
+
     @Override
     public final void setWriteListener(WriteListener listener) {
         if (listener == null) {
@@ -65,14 +76,23 @@ public abstract class AbstractServletOut
             throw new IllegalArgumentException(
                     sm.getString("upgrade.sos.writeListener.set"));
         }
+        // Container is responsible for first call to onWritePossible() but 
only
+        // need to do this if setting the listener for the first time rather
+        // than changing it.
+        synchronized (fireListenerLock) {
+            fireListener = true;
+        }
+        socketWrapper.addDispatch(DispatchType.NON_BLOCKING_WRITE);
         this.listener = listener;
         this.applicationLoader = 
Thread.currentThread().getContextClassLoader();
     }
 
+
     protected final boolean isCloseRequired() {
         return closeRequired;
     }
 
+
     @Override
     public void write(int b) throws IOException {
         synchronized (writeLock) {
@@ -97,6 +117,7 @@ public abstract class AbstractServletOut
         doClose();
     }
 
+
     private void preWriteChecks() {
         if (buffer != null) {
             throw new IllegalStateException(
@@ -135,7 +156,9 @@ public abstract class AbstractServletOut
     protected final void onWritePossible() throws IOException {
         synchronized (writeLock) {
             try {
-                writeInternal(buffer, 0, buffer.length);
+                if (buffer != null) {
+                    writeInternal(buffer, 0, buffer.length);
+                }
             } catch (Throwable t) {
                 ExceptionUtils.handleThrowable(t);
                 Thread thread = Thread.currentThread();
@@ -153,8 +176,9 @@ public abstract class AbstractServletOut
                 }
             }
 
-           // Make sure isReady() and onWritePossible() have a consistent view 
of
-            // buffer and fireListener when determining if the listener should 
fire
+            // Make sure isReady() and onWritePossible() have a consistent view
+            // of buffer and fireListener when determining if the listener
+            // should fire
             boolean fire = false;
 
             synchronized (fireListenerLock) {
@@ -176,6 +200,7 @@ public abstract class AbstractServletOut
         }
     }
 
+
     /**
      * Abstract method to be overridden by concrete implementations. The base
      * class will ensure that there are no concurrent calls to this method for

Modified: 
tomcat/trunk/java/org/apache/coyote/http11/upgrade/AprServletOutputStream.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/AprServletOutputStream.java?rev=1557092&r1=1557091&r2=1557092&view=diff
==============================================================================
--- 
tomcat/trunk/java/org/apache/coyote/http11/upgrade/AprServletOutputStream.java 
(original)
+++ 
tomcat/trunk/java/org/apache/coyote/http11/upgrade/AprServletOutputStream.java 
Fri Jan 10 11:43:02 2014
@@ -28,21 +28,20 @@ import org.apache.tomcat.jni.Status;
 import org.apache.tomcat.util.net.AprEndpoint;
 import org.apache.tomcat.util.net.SocketWrapper;
 
-public class AprServletOutputStream extends AbstractServletOutputStream {
+public class AprServletOutputStream extends AbstractServletOutputStream<Long> {
 
     private static final int SSL_OUTPUT_BUFFER_SIZE = 8192;
 
     private final AprEndpoint endpoint;
-    private final SocketWrapper<Long> wrapper;
     private final long socket;
     private volatile boolean closed = false;
     private final ByteBuffer sslOutputBuffer;
 
-    public AprServletOutputStream(SocketWrapper<Long> wrapper,
+    public AprServletOutputStream(SocketWrapper<Long> socketWrapper,
             AprEndpoint endpoint) {
+        super(socketWrapper);
         this.endpoint = endpoint;
-        this.wrapper = wrapper;
-        this.socket = wrapper.getSocket().longValue();
+        this.socket = socketWrapper.getSocket().longValue();
         if (endpoint.isSSLEnabled()) {
             sslOutputBuffer = 
ByteBuffer.allocateDirect(SSL_OUTPUT_BUFFER_SIZE);
             sslOutputBuffer.position(SSL_OUTPUT_BUFFER_SIZE);
@@ -60,12 +59,12 @@ public class AprServletOutputStream exte
             throw new IOException(sm.getString("apr.closed", 
Long.valueOf(socket)));
         }
 
-        Lock readLock = wrapper.getBlockingStatusReadLock();
-        WriteLock writeLock = wrapper.getBlockingStatusWriteLock();
+        Lock readLock = socketWrapper.getBlockingStatusReadLock();
+        WriteLock writeLock = socketWrapper.getBlockingStatusWriteLock();
 
         try {
             readLock.lock();
-            if (wrapper.getBlockingStatus() == block) {
+            if (socketWrapper.getBlockingStatus() == block) {
                 return doWriteInternal(b, off, len);
             }
         } finally {
@@ -75,7 +74,7 @@ public class AprServletOutputStream exte
         try {
             writeLock.lock();
             // Set the current settings for this socket
-            wrapper.setBlockingStatus(block);
+            socketWrapper.setBlockingStatus(block);
             if (block) {
                 Socket.timeoutSet(socket, endpoint.getSoTimeout() * 1000);
             } else {
@@ -141,7 +140,7 @@ public class AprServletOutputStream exte
                 throw new EOFException(sm.getString("apr.clientAbort"));
             } else if (written < 0) {
                 throw new IOException(sm.getString("apr.write.error",
-                        Integer.valueOf(-written), Long.valueOf(socket), 
wrapper));
+                        Integer.valueOf(-written), Long.valueOf(socket), 
socketWrapper));
             }
             start += written;
             left -= written;

Modified: 
tomcat/trunk/java/org/apache/coyote/http11/upgrade/BioServletOutputStream.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/BioServletOutputStream.java?rev=1557092&r1=1557091&r2=1557092&view=diff
==============================================================================
--- 
tomcat/trunk/java/org/apache/coyote/http11/upgrade/BioServletOutputStream.java 
(original)
+++ 
tomcat/trunk/java/org/apache/coyote/http11/upgrade/BioServletOutputStream.java 
Fri Jan 10 11:43:02 2014
@@ -22,13 +22,14 @@ import java.net.Socket;
 
 import org.apache.tomcat.util.net.SocketWrapper;
 
-public class BioServletOutputStream extends AbstractServletOutputStream {
+public class BioServletOutputStream extends 
AbstractServletOutputStream<Socket> {
 
     private final OutputStream os;
 
-    public BioServletOutputStream(SocketWrapper<Socket> wrapper)
+    public BioServletOutputStream(SocketWrapper<Socket> socketWrapper)
             throws IOException {
-        os = wrapper.getSocket().getOutputStream();
+        super(socketWrapper);
+        os = socketWrapper.getSocket().getOutputStream();
     }
 
     @Override

Modified: 
tomcat/trunk/java/org/apache/coyote/http11/upgrade/NioServletOutputStream.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/NioServletOutputStream.java?rev=1557092&r1=1557091&r2=1557092&view=diff
==============================================================================
--- 
tomcat/trunk/java/org/apache/coyote/http11/upgrade/NioServletOutputStream.java 
(original)
+++ 
tomcat/trunk/java/org/apache/coyote/http11/upgrade/NioServletOutputStream.java 
Fri Jan 10 11:43:02 2014
@@ -25,7 +25,7 @@ import org.apache.tomcat.util.net.NioEnd
 import org.apache.tomcat.util.net.NioSelectorPool;
 import org.apache.tomcat.util.net.SocketWrapper;
 
-public class NioServletOutputStream extends AbstractServletOutputStream {
+public class NioServletOutputStream extends 
AbstractServletOutputStream<NioChannel> {
 
     private final NioChannel channel;
     private final NioSelectorPool pool;
@@ -33,8 +33,9 @@ public class NioServletOutputStream exte
 
 
     public NioServletOutputStream(
-            SocketWrapper<NioChannel> wrapper, NioSelectorPool pool) {
-        channel = wrapper.getSocket();
+            SocketWrapper<NioChannel> socketWrapper, NioSelectorPool pool) {
+        super(socketWrapper);
+        channel = socketWrapper.getSocket();
         this.pool = pool;
         maxWrite = channel.getBufHandler().getWriteBuffer().capacity();
     }

Modified: tomcat/trunk/test/org/apache/coyote/http11/upgrade/TestUpgrade.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/test/org/apache/coyote/http11/upgrade/TestUpgrade.java?rev=1557092&r1=1557091&r2=1557092&view=diff
==============================================================================
--- tomcat/trunk/test/org/apache/coyote/http11/upgrade/TestUpgrade.java 
(original)
+++ tomcat/trunk/test/org/apache/coyote/http11/upgrade/TestUpgrade.java Fri Jan 
10 11:43:02 2014
@@ -93,6 +93,11 @@ public class TestUpgrade extends TomcatB
         doTestCheckClosed(SetWriteListenerTwice.class);
     }
 
+    @Test
+    public void testFirstCallToOnWritePossible() throws Exception {
+        doTestFixedResponse(FixedResponseNonBlocking.class);
+    }
+
     private void doTestCheckClosed(
             Class<? extends HttpUpgradeHandler> upgradeHandlerClass)
                     throws Exception {
@@ -104,6 +109,17 @@ public class TestUpgrade extends TomcatB
         Assert.assertEquals(-1, c);
     }
 
+    private void doTestFixedResponse(
+            Class<? extends HttpUpgradeHandler> upgradeHandlerClass)
+                    throws Exception {
+        UpgradeConnection conn = doUpgrade(upgradeHandlerClass);
+
+        Reader r = conn.getReader();
+        int c = r.read();
+
+        Assert.assertEquals(FixedResponseNonBlocking.FIXED_RESPONSE, c);
+    }
+
     private void doTestMessages (
             Class<? extends HttpUpgradeHandler> upgradeHandlerClass)
             throws Exception {
@@ -374,6 +390,46 @@ public class TestUpgrade extends TomcatB
     }
 
 
+    public static class FixedResponseNonBlocking implements HttpUpgradeHandler 
{
+
+        public static final char FIXED_RESPONSE = 'F';
+
+        private ServletInputStream sis;
+        private ServletOutputStream sos;
+
+        @Override
+        public void init(WebConnection connection) {
+
+            try {
+                sis = connection.getInputStream();
+                sos = connection.getOutputStream();
+            } catch (IOException ioe) {
+                throw new IllegalStateException(ioe);
+            }
+
+            sis.setReadListener(new NoOpReadListener());
+            sos.setWriteListener(new FixedResponseWriteListener());
+        }
+
+        @Override
+        public void destroy() {
+            // NO-OP
+        }
+
+        private class FixedResponseWriteListener extends NoOpWriteListener {
+            @Override
+            public void onWritePossible() {
+                try {
+                    sos.write(FIXED_RESPONSE);
+                    sos.flush();
+                } catch (IOException ioe) {
+                    throw new IllegalStateException(ioe);
+                }
+            }
+        }
+    }
+
+
     private static class NoOpReadListener implements ReadListener {
 
         @Override
@@ -392,6 +448,7 @@ public class TestUpgrade extends TomcatB
         }
     }
 
+
     private static class NoOpWriteListener implements WriteListener {
 
         @Override



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org
For additional commands, e-mail: dev-h...@tomcat.apache.org

Reply via email to