Author: markt
Date: Sat Nov 24 17:26:51 2012
New Revision: 1413209

URL: http://svn.apache.org/viewvc?rev=1413209&view=rev
Log:
First cut of HTTP Upgrade for BIO with blocking
- NIO, APR still broken
- Non-blocking IO not implemented for Upgrade
- WebSocket still broken

Removed:
    tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeInbound.java
    tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeOutbound.java
Modified:
    tomcat/trunk/java/org/apache/coyote/http11/upgrade/LocalStrings.properties
    tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeBioProcessor.java
    tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessor.java

Modified: tomcat/trunk/java/org/apache/coyote/http11/upgrade/LocalStrings.properties
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/LocalStrings.properties?rev=1413209&r1=1413208&r2=1413209&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/upgrade/LocalStrings.properties (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/LocalStrings.properties Sat Nov 24 17:26:51 2012
@@ -13,6 +13,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+upgrade.sis.isFinished.ise=It is illegal to call isFinished() when the ServletInputStream is not in non-blocking mode (i.e. setReadListener() must be called first)
+upgrade.sis.isReady.ise=It is illegal to call isReady() when the ServletInputStream is not in non-blocking mode (i.e. setReadListener() must be called first)
+upgrade.sis.readListener.null=It is illegal to pass null to setReadListener()
+upgrade.sos.canWrite.ise=It is illegal to call canWrite() when the ServletOutputStream is not in non-blocking mode (i.e. setWriteListener() must be called first)
+upgrade.sos.writeListener.null=It is illegal to pass null to setWriteListener()
+
 apr.error=Unexpected error [{0}] reading data from the APR/native socket.
 
 nio.eof.error=Unexpected EOF read on the socket

Modified: tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeBioProcessor.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeBioProcessor.java?rev=1413209&r1=1413208&r2=1413209&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeBioProcessor.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeBioProcessor.java Sat Nov 24 17:26:51 2012
@@ -27,59 +27,63 @@ import org.apache.tomcat.util.net.Socket
 
 public class UpgradeBioProcessor extends UpgradeProcessor<Socket> {
 
-    private final InputStream inputStream;
-    private final OutputStream outputStream;
+    private static final int INFINITE_TIMEOUT = 0;
 
     public UpgradeBioProcessor(SocketWrapper<Socket> wrapper,
             ProtocolHandler httpUpgradeProcessor) throws IOException {
-        super(upgradeInbound);
+        super(httpUpgradeProcessor, new BioUpgradeServletInputStream(wrapper),
+                new BioUpgradeServletOutputStream(wrapper));
 
-        int timeout = upgradeInbound.getReadTimeout();
-        if (timeout < 0) {
-            timeout = 0;
-        }
-        wrapper.getSocket().setSoTimeout(timeout);
-
-        this.inputStream = wrapper.getSocket().getInputStream();
-        this.outputStream = wrapper.getSocket().getOutputStream();
+        wrapper.getSocket().setSoTimeout(INFINITE_TIMEOUT);
     }
 
 
-    /*
-     * Output methods
-     */
-    @Override
-    public void flush() throws IOException {
-        outputStream.flush();
-    }
+    // ----------------------------------------------------------- Inner classes
 
+    private static class BioUpgradeServletInputStream
+            extends UpgradeServletInputStream {
 
-    @Override
-    public void write(int b) throws IOException {
-        outputStream.write(b);
-    }
+        private final InputStream is;
 
+        public BioUpgradeServletInputStream(SocketWrapper<Socket> wrapper)
+                throws IOException {
+            is = wrapper.getSocket().getInputStream();
+        }
+
+        @Override
+        protected int doRead() throws IOException {
+            return is.read();
+        }
 
-    @Override
-    public void write(byte[]b, int off, int len) throws IOException {
-        outputStream.write(b, off, len);
+        @Override
+        protected int doRead(byte[] b, int off, int len) throws IOException {
+            return is.read(b, off, len);
+        }
     }
 
+    private static class BioUpgradeServletOutputStream
+            extends UpgradeServletOutputStream {
 
-    /*
-     * Input methods
-     */
-    @Override
-    public int read() throws IOException {
-        return inputStream.read();
-    }
+        private final OutputStream os;
+
+        public BioUpgradeServletOutputStream(SocketWrapper<Socket> wrapper)
+                throws IOException {
+            os = wrapper.getSocket().getOutputStream();
+        }
 
+        @Override
+        protected void doWrite(int b) throws IOException {
+            os.write(b);
+        }
 
-    @Override
-    public int read(boolean block, byte[] bytes, int off, int len)
-            throws IOException {
-        // The BIO endpoint always uses blocking IO so the block parameter is
-        // ignored and a blocking read is performed.
-        return inputStream.read(bytes, off, len);
+        @Override
+        protected void doWrite(byte[] b, int off, int len) throws IOException {
+            os.write(b, off, len);
+        }
+
+        @Override
+        protected void doFlush() throws IOException {
+            os.flush();
+        }
     }
 }

Modified: tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessor.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessor.java?rev=1413209&r1=1413208&r2=1413209&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessor.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessor.java Sat Nov 24 17:26:51 2012
@@ -19,6 +19,13 @@ package org.apache.coyote.http11.upgrade
 import java.io.IOException;
 import java.util.concurrent.Executor;
 
+import javax.servlet.ReadListener;
+import javax.servlet.ServletInputStream;
+import javax.servlet.ServletOutputStream;
+import javax.servlet.WriteListener;
+import javax.servlet.http.ProtocolHandler;
+import javax.servlet.http.WebConnection;
+
 import org.apache.coyote.Processor;
 import org.apache.coyote.Request;
 import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState;
@@ -27,73 +34,66 @@ import org.apache.tomcat.util.net.Socket
 import org.apache.tomcat.util.net.SocketWrapper;
 import org.apache.tomcat.util.res.StringManager;
 
-public abstract class UpgradeProcessor<S> implements Processor<S> {
+public abstract class UpgradeProcessor<S>
+        implements Processor<S>, WebConnection {
 
     protected static final StringManager sm =
             StringManager.getManager(Constants.Package);
 
-    private final UpgradeInbound upgradeInbound;
+    private final ProtocolHandler httpUpgradeHandler;
+    private final ServletInputStream upgradeServletInputStream;
+    private final ServletOutputStream upgradeServletOutputStream;
+
+    protected UpgradeProcessor (ProtocolHandler httpUpgradeHandler,
+            UpgradeServletInputStream upgradeServletInputStream,
+            UpgradeServletOutputStream upgradeServletOutputStream) {
+        this.httpUpgradeHandler = httpUpgradeHandler;
+        this.upgradeServletInputStream = upgradeServletInputStream;
+        this.upgradeServletOutputStream = upgradeServletOutputStream;
+        this.httpUpgradeHandler.init(this);
+    }
+
 
-    protected UpgradeProcessor (UpgradeInbound upgradeInbound) {
-        this.upgradeInbound = upgradeInbound;
-        upgradeInbound.setUpgradeProcessor(this);
-        upgradeInbound.setUpgradeOutbound(new UpgradeOutbound(this));
-    }
-
-    // Output methods
-    public abstract void flush() throws IOException;
-    public abstract void write(int b) throws IOException;
-    public abstract void write(byte[] b, int off, int len) throws IOException;
-
-    // Input methods
-    /**
-     * This is always a blocking read of a single byte.
-     *
-     * @return  The next byte or -1 if the end of the input is reached.
-     *
-     * @throws IOException  If a problem occurs trying to read from the input
-     */
-    public abstract int read() throws IOException;
-
-    /**
-     * Read up to len bytes from the input in either blocking or non-blocking
-     * mode (where non-blocking is supported). If the input does not support
-     * non-blocking reads, a blcoking read will be performed.
-     *
-     * @param block
-     * @param bytes
-     * @param off
-     * @param len
-     * @return  The number of bytes read or -1 if the end of the input is
-     *          reached. Non-blocking reads may return zero if no data is
-     *          available. Blocking reads never return zero.
-     *
-     * @throws IOException  If a problem occurs trying to read from the input
-     */
-    public abstract int read(boolean block, byte[] bytes, int off, int len)
-            throws IOException;
+    // --------------------------------------------------- WebConnection methods
 
     @Override
-    public final UpgradeInbound getUpgradeInbound() {
-        return upgradeInbound;
+    public ServletInputStream getInputStream() throws IOException {
+        return upgradeServletInputStream;
     }
 
     @Override
-    public final SocketState upgradeDispatch() throws IOException {
-        return upgradeInbound.onData();
+    public ServletOutputStream getOutputStream() throws IOException {
+        return upgradeServletOutputStream;
     }
 
+
+    // ------------------------------------------- Implemented Processor methods
+
     @Override
     public final boolean isUpgrade() {
         return true;
     }
 
     @Override
+    public ProtocolHandler getHttpUpgradeHandler() {
+        return httpUpgradeHandler;
+    }
+
+    @Override
+    public final SocketState upgradeDispatch() throws IOException {
+
+        // TODO Handle read/write ready for non-blocking IO
+        return SocketState.UPGRADED;
+    }
+
+    @Override
     public final void recycle(boolean socketClosing) {
         // Currently a NO-OP as upgrade processors are not recycled.
     }
 
-    // NO-OP methods for upgrade
+
+    // ---------------------------- Processor methods that are NO-OP for upgrade
+
     @Override
     public final Executor getExecutor() {
         return null;
@@ -139,4 +139,104 @@ public abstract class UpgradeProcessor<S
     public final void setSslSupport(SSLSupport sslSupport) {
         // NOOP
     }
+
+
+    // ----------------------------------------------------------- Inner classes
+
+    protected abstract static class UpgradeServletInputStream extends
+            ServletInputStream {
+
+        private volatile ReadListener readListener = null;
+
+        @Override
+        public final boolean isFinished() {
+            if (readListener == null) {
+                throw new IllegalStateException(
+                        sm.getString("upgrade.sis.isFinished.ise"));
+            }
+
+            // TODO Support non-blocking IO
+            return false;
+        }
+
+        @Override
+        public final boolean isReady() {
+            if (readListener == null) {
+                throw new IllegalStateException(
+                        sm.getString("upgrade.sis.isReady.ise"));
+            }
+
+            // TODO Support non-blocking IO
+            return false;
+        }
+
+        @Override
+        public final void setReadListener(ReadListener listener) {
+            if (listener == null) {
+                throw new NullPointerException(
+                        sm.getString("upgrade.sis.readListener.null"));
+            }
+            this.readListener = listener;
+        }
+
+        @Override
+        public final int read() throws IOException {
+            return doRead();
+        }
+
+        @Override
+        public final int read(byte[] b, int off, int len) throws IOException {
+            return doRead(b, off, len);
+        }
+
+        protected abstract int doRead() throws IOException;
+        protected abstract int doRead(byte[] b, int off, int len)
+                throws IOException;
+    }
+
+    protected abstract static class UpgradeServletOutputStream extends
+            ServletOutputStream {
+
+        private volatile WriteListener writeListener = null;
+
+        @Override
+        public boolean canWrite() {
+            if (writeListener == null) {
+                throw new IllegalStateException(
+                        sm.getString("upgrade.sos.canWrite.ise"));
+            }
+
+            // TODO Support non-blocking IO
+            return false;
+        }
+
+        @Override
+        public void setWriteListener(WriteListener listener) {
+            if (listener == null) {
+                throw new NullPointerException(
+                        sm.getString("upgrade.sos.writeListener.null"));
+            }
+            this.writeListener = listener;
+        }
+
+        @Override
+        public void write(int b) throws IOException {
+            doWrite(b);
+        }
+
+        @Override
+        public void write(byte[] b, int off, int len) throws IOException {
+            doWrite(b, off, len);
+        }
+
+        @Override
+        public void flush() throws IOException {
+            doFlush();
+        }
+
+        protected abstract void doWrite(int b) throws IOException;
+        protected abstract void doWrite(byte[] b, int off, int len)
+                throws IOException;
+        protected abstract void doFlush() throws IOException;
+    }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org
For additional commands, e-mail: dev-h...@tomcat.apache.org

Reply via email to