Author: markt
Date: Sat Nov 24 17:51:48 2012
New Revision: 1413221

URL: http://svn.apache.org/viewvc?rev=1413221&view=rev
Log:
Re-write reading from upgraded connection to use non-blocking IO.
NIO tested for basic operations
BIO untested
APR not written

Added:
    tomcat/trunk/java/org/apache/coyote/http11/upgrade/AprUpgradeServletInputStream.java
    tomcat/trunk/java/org/apache/coyote/http11/upgrade/BioUpgradeServletInputStream.java
    tomcat/trunk/java/org/apache/coyote/http11/upgrade/NioUpgradeServletInputStream.java
    tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeServletInputStream.java
Modified:
    tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeAprProcessor.java
    tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeBioProcessor.java
    tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeNioProcessor.java
    tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessor.java
    tomcat/trunk/test/org/apache/coyote/http11/upgrade/TestUpgradeServletInputStream.java

Added: tomcat/trunk/java/org/apache/coyote/http11/upgrade/AprUpgradeServletInputStream.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/AprUpgradeServletInputStream.java?rev=1413221&view=auto
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/upgrade/AprUpgradeServletInputStream.java (added)
+++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/AprUpgradeServletInputStream.java Sat Nov 24 17:51:48 2012
@@ -0,0 +1,79 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.coyote.http11.upgrade;
+
+import java.io.IOException;
+
+import org.apache.tomcat.util.net.SocketWrapper;
+
+public class AprUpgradeServletInputStream extends UpgradeServletInputStream {
+
+    private final long socket;
+
+    public AprUpgradeServletInputStream(SocketWrapper<Long> wrapper) {
+        this.socket = wrapper.getSocket().longValue();
+    }
+/*
+    @Override
+    protected int doRead() throws IOException {
+        byte[] bytes = new byte[1];
+        int result = Socket.recv(socket, bytes, 0, 1);
+        if (result == -1) {
+            return -1;
+        } else {
+            return bytes[0] & 0xFF;
+        }
+    }
+
+    @Override
+    protected int doRead(byte[] b, int off, int len) throws IOException {
+        boolean block = true;
+        if (!block) {
+            Socket.optSet(socket, Socket.APR_SO_NONBLOCK, -1);
+        }
+        try {
+            int result = Socket.recv(socket, b, off, len);
+            if (result > 0) {
+                return result;
+            } else if (-result == Status.EAGAIN) {
+                return 0;
+            } else {
+                throw new IOException(sm.getString("apr.error",
+                        Integer.valueOf(-result)));
+            }
+        } finally {
+            if (!block) {
+                Socket.optSet(socket, Socket.APR_SO_NONBLOCK, 0);
+            }
+        }
+    }
+}
+*/
+
+    @Override
+    protected int doRead(boolean block) throws IOException {
+        // TODO Auto-generated method stub
+        return 0;
+    }
+
+    @Override
+    protected boolean doIsReady() {
+        // TODO Auto-generated method stub
+        return false;
+    }
+
+}

Added: tomcat/trunk/java/org/apache/coyote/http11/upgrade/BioUpgradeServletInputStream.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/BioUpgradeServletInputStream.java?rev=1413221&view=auto
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/upgrade/BioUpgradeServletInputStream.java (added)
+++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/BioUpgradeServletInputStream.java Sat Nov 24 17:51:48 2012
@@ -0,0 +1,44 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.coyote.http11.upgrade;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.Socket;
+
+import org.apache.tomcat.util.net.SocketWrapper;
+
+public class BioUpgradeServletInputStream extends UpgradeServletInputStream {
+
+    private final InputStream inputStream;
+
+    public BioUpgradeServletInputStream(SocketWrapper<Socket> wrapper)
+            throws IOException {
+        inputStream = wrapper.getSocket().getInputStream();
+    }
+
+    @Override
+    protected int doRead(boolean block) throws IOException {
+        return inputStream.read();
+    }
+
+    @Override
+    protected boolean doIsReady() {
+        // Always returns true for BIO
+        return true;
+    }
+}

Added: tomcat/trunk/java/org/apache/coyote/http11/upgrade/NioUpgradeServletInputStream.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/NioUpgradeServletInputStream.java?rev=1413221&view=auto
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/upgrade/NioUpgradeServletInputStream.java (added)
+++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/NioUpgradeServletInputStream.java Sat Nov 24 17:51:48 2012
@@ -0,0 +1,145 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.coyote.http11.upgrade;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Selector;
+
+import org.apache.tomcat.util.net.NioChannel;
+import org.apache.tomcat.util.net.NioEndpoint;
+import org.apache.tomcat.util.net.NioSelectorPool;
+import org.apache.tomcat.util.net.SocketWrapper;
+
+public class NioUpgradeServletInputStream extends UpgradeServletInputStream {
+
+    private final NioChannel channel;
+    private final NioSelectorPool pool;
+
+    public NioUpgradeServletInputStream(SocketWrapper<NioChannel> wrapper,
+            NioSelectorPool pool) {
+        this.channel = wrapper.getSocket();
+        this.pool = pool;
+    }
+
+    @Override
+    protected int doRead(boolean block) throws IOException {
+        byte[] bytes = new byte[1];
+        int result = readSocket(block, bytes, 0, 1);
+        if (result == 0) {
+            return NO_DATA;
+        } else if (result == -1) {
+            return EOF;
+        } else {
+            return bytes[0] & 0xFF;
+        }
+    }
+
+    @Override
+    protected boolean doIsReady() throws IOException {
+        ByteBuffer readBuffer = channel.getBufHandler().getReadBuffer();
+
+        if (readBuffer.remaining() > 0) {
+            return true;
+        }
+
+        readBuffer.clear();
+        fillReadBuffer(false);
+
+        boolean isReady = readBuffer.position() > 0;
+        readBuffer.flip();
+        return isReady;
+    }
+
+    private int readSocket(boolean block, byte[] b, int off, int len)
+            throws IOException {
+
+        ByteBuffer readBuffer = channel.getBufHandler().getReadBuffer();
+        int remaining = readBuffer.remaining();
+
+        // Is there enough data in the read buffer to satisfy this request?
+        if (remaining >= len) {
+            readBuffer.get(b, off, len);
+            return len;
+        }
+
+        // Copy what data there is in the read buffer to the byte array
+        int leftToWrite = len;
+        int newOffset = off;
+        if (remaining > 0) {
+            readBuffer.get(b, off, remaining);
+            leftToWrite -= remaining;
+            newOffset += remaining;
+        }
+
+        // Fill the read buffer as best we can
+        readBuffer.clear();
+        int nRead = fillReadBuffer(block);
+
+        // Full as much of the remaining byte array as possible with the data
+        // that was just read
+        if (nRead > 0) {
+            readBuffer.flip();
+            readBuffer.limit(nRead);
+            if (nRead > leftToWrite) {
+                readBuffer.get(b, newOffset, leftToWrite);
+                leftToWrite = 0;
+            } else {
+                readBuffer.get(b, newOffset, nRead);
+                leftToWrite -= nRead;
+            }
+        } else if (nRead == 0) {
+            readBuffer.flip();
+        } else if (nRead == -1) {
+            // TODO i18n
+            throw new EOFException();
+        }
+
+        return len - leftToWrite;
+    }
+
+    private int fillReadBuffer(boolean block) throws IOException {
+        int nRead;
+        if (block) {
+            Selector selector = null;
+            try {
+                selector = pool.get();
+            } catch ( IOException x ) {
+                // Ignore
+            }
+            try {
+                NioEndpoint.KeyAttachment att =
+                        (NioEndpoint.KeyAttachment) channel.getAttachment(false);
+                if (att == null) {
+                    throw new IOException("Key must be cancelled.");
+                }
+                nRead = pool.read(channel.getBufHandler().getReadBuffer(),
+                        channel, selector, att.getTimeout());
+            } catch (EOFException eof) {
+                nRead = -1;
+            } finally {
+                if (selector != null) {
+                    pool.put(selector);
+                }
+            }
+        } else {
+            nRead = channel.read(channel.getBufHandler().getReadBuffer());
+        }
+        return nRead;
+    }
+}

Modified: tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeAprProcessor.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeAprProcessor.java?rev=1413221&r1=1413220&r2=1413221&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeAprProcessor.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeAprProcessor.java Sat Nov 24 17:51:48 2012
@@ -21,7 +21,6 @@ import java.io.IOException;
 import javax.servlet.http.ProtocolHandler;
 
 import org.apache.tomcat.jni.Socket;
-import org.apache.tomcat.jni.Status;
 import org.apache.tomcat.util.net.SocketWrapper;
 
 public class UpgradeAprProcessor extends UpgradeProcessor<Long> {
@@ -31,7 +30,7 @@ public class UpgradeAprProcessor extends
     public UpgradeAprProcessor(SocketWrapper<Long> wrapper,
             ProtocolHandler httpUpgradeProcessor) {
         super(httpUpgradeProcessor,
-                new AprUpgradeServletInputStream(wrapper.getSocket().longValue()),
+                new AprUpgradeServletInputStream(wrapper),
                 new AprUpgradeServletOutputStream(wrapper.getSocket().longValue()));
 
         Socket.timeoutSet(wrapper.getSocket().longValue(), INFINITE_TIMEOUT);
@@ -40,50 +39,6 @@ public class UpgradeAprProcessor extends
 
     // ----------------------------------------------------------- Inner classes
 
-    private static class AprUpgradeServletInputStream
-            extends UpgradeServletInputStream {
-
-        private final long socket;
-
-        public AprUpgradeServletInputStream(long socket) {
-            this.socket = socket;
-        }
-
-        @Override
-        protected int doRead() throws IOException {
-            byte[] bytes = new byte[1];
-            int result = Socket.recv(socket, bytes, 0, 1);
-            if (result == -1) {
-                return -1;
-            } else {
-                return bytes[0] & 0xFF;
-            }
-        }
-
-        @Override
-        protected int doRead(byte[] b, int off, int len) throws IOException {
-            boolean block = true;
-            if (!block) {
-                Socket.optSet(socket, Socket.APR_SO_NONBLOCK, -1);
-            }
-            try {
-                int result = Socket.recv(socket, b, off, len);
-                if (result > 0) {
-                    return result;
-                } else if (-result == Status.EAGAIN) {
-                    return 0;
-                } else {
-                    throw new IOException(sm.getString("apr.error",
-                            Integer.valueOf(-result)));
-                }
-            } finally {
-                if (!block) {
-                    Socket.optSet(socket, Socket.APR_SO_NONBLOCK, 0);
-                }
-            }
-        }
-    }
-
     private static class AprUpgradeServletOutputStream
             extends UpgradeServletOutputStream {
 

Modified: tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeBioProcessor.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeBioProcessor.java?rev=1413221&r1=1413220&r2=1413221&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeBioProcessor.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeBioProcessor.java Sat Nov 24 17:51:48 2012
@@ -17,7 +17,6 @@
 package org.apache.coyote.http11.upgrade;
 
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.Socket;
 
@@ -40,27 +39,6 @@ public class UpgradeBioProcessor extends
 
     // ----------------------------------------------------------- Inner classes
 
-    private static class BioUpgradeServletInputStream
-            extends UpgradeServletInputStream {
-
-        private final InputStream is;
-
-        public BioUpgradeServletInputStream(SocketWrapper<Socket> wrapper)
-                throws IOException {
-            is = wrapper.getSocket().getInputStream();
-        }
-
-        @Override
-        protected int doRead() throws IOException {
-            return is.read();
-        }
-
-        @Override
-        protected int doRead(byte[] b, int off, int len) throws IOException {
-            return is.read(b, off, len);
-        }
-    }
-
     private static class BioUpgradeServletOutputStream
             extends UpgradeServletOutputStream {
 

Modified: tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeNioProcessor.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeNioProcessor.java?rev=1413221&r1=1413220&r2=1413221&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeNioProcessor.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeNioProcessor.java Sat Nov 24 17:51:48 2012
@@ -16,9 +16,7 @@
  */
 package org.apache.coyote.http11.upgrade;
 
-import java.io.EOFException;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.nio.channels.Selector;
 
 import javax.servlet.http.ProtocolHandler;
@@ -44,118 +42,6 @@ public class UpgradeNioProcessor extends
 
     // ----------------------------------------------------------- Inner classes
 
-    private static class NioUpgradeServletInputStream
-            extends UpgradeServletInputStream {
-
-        private final NioChannel nioChannel;
-        private final NioSelectorPool pool;
-        private final int maxRead;
-
-        public NioUpgradeServletInputStream(SocketWrapper<NioChannel> wrapper,
-                NioSelectorPool pool) {
-            nioChannel = wrapper.getSocket();
-            this.pool = pool;
-            maxRead = nioChannel.getBufHandler().getReadBuffer().capacity();
-        }
-
-        @Override
-        protected int doRead() throws IOException {
-            byte[] bytes = new byte[1];
-            int result = readSocket(true, bytes, 0, 1);
-            if (result == -1) {
-                return -1;
-            } else {
-                return bytes[0] & 0xFF;
-            }
-        }
-
-        @Override
-        protected int doRead(byte[] b, int off, int len) throws IOException {
-            if (len > maxRead) {
-                return readSocket(true, b, off, maxRead);
-            } else {
-                return readSocket(true, b, off, len);
-            }
-        }
-
-        private int readSocket(boolean block, byte[] b, int off, int len)
-                throws IOException {
-
-            ByteBuffer readBuffer = nioChannel.getBufHandler().getReadBuffer();
-            int remaining = readBuffer.remaining();
-
-            // Is there enough data in the read buffer to satisfy this request?
-            if (remaining >= len) {
-                readBuffer.get(b, off, len);
-                return len;
-            }
-
-            // Copy what data there is in the read buffer to the byte array
-            int leftToWrite = len;
-            int newOffset = off;
-            if (remaining > 0) {
-                readBuffer.get(b, off, remaining);
-                leftToWrite -= remaining;
-                newOffset += remaining;
-            }
-
-            // Fill the read buffer as best we can
-            readBuffer.clear();
-            int nRead = fillReadBuffer(block);
-
-            // Full as much of the remaining byte array as possible with the data
-            // that was just read
-            if (nRead > 0) {
-                readBuffer.flip();
-                readBuffer.limit(nRead);
-                if (nRead > leftToWrite) {
-                    readBuffer.get(b, newOffset, leftToWrite);
-                    leftToWrite = 0;
-                } else {
-                    readBuffer.get(b, newOffset, nRead);
-                    leftToWrite -= nRead;
-                }
-            } else if (nRead == 0) {
-                readBuffer.flip();
-                readBuffer.limit(nRead);
-            } else if (nRead == -1) {
-                throw new EOFException(sm.getString("nio.eof.error"));
-            }
-
-            return len - leftToWrite;
-        }
-
-        private int fillReadBuffer(boolean block) throws IOException {
-            int nRead;
-            if (block) {
-                Selector selector = null;
-                try {
-                    selector = pool.get();
-                } catch ( IOException x ) {
-                    // Ignore
-                }
-                try {
-                    NioEndpoint.KeyAttachment att =
-                            (NioEndpoint.KeyAttachment) nioChannel.getAttachment(false);
-                    if (att == null) {
-                        throw new IOException("Key must be cancelled.");
-                    }
-                    nRead = pool.read(nioChannel.getBufHandler().getReadBuffer(),
-                            nioChannel, selector, att.getTimeout());
-                } catch (EOFException eof) {
-                    nRead = -1;
-                } finally {
-                    if (selector != null) {
-                        pool.put(selector);
-                    }
-                }
-            } else {
-                nRead = nioChannel.read(nioChannel.getBufHandler().getReadBuffer());
-            }
-            return nRead;
-        }
-    }
-
     private static class NioUpgradeServletOutputStream
             extends UpgradeServletOutputStream {
 

Modified: tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessor.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessor.java?rev=1413221&r1=1413220&r2=1413221&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessor.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessor.java Sat Nov 24 17:51:48 2012
@@ -19,7 +19,6 @@ package org.apache.coyote.http11.upgrade
 import java.io.IOException;
 import java.util.concurrent.Executor;
 
-import javax.servlet.ReadListener;
 import javax.servlet.ServletInputStream;
 import javax.servlet.ServletOutputStream;
 import javax.servlet.WriteListener;
@@ -41,8 +40,8 @@ public abstract class UpgradeProcessor<S
             StringManager.getManager(Constants.Package);
 
     private final ProtocolHandler httpUpgradeHandler;
-    private final ServletInputStream upgradeServletInputStream;
-    private final ServletOutputStream upgradeServletOutputStream;
+    private final UpgradeServletInputStream upgradeServletInputStream;
+    private final UpgradeServletOutputStream upgradeServletOutputStream;
 
     protected UpgradeProcessor (ProtocolHandler httpUpgradeHandler,
             UpgradeServletInputStream upgradeServletInputStream,
@@ -83,7 +82,14 @@ public abstract class UpgradeProcessor<S
     public final SocketState upgradeDispatch(SocketStatus status)
             throws IOException {
 
-        // TODO Handle read/write ready for non-blocking IO
+        if (status == SocketStatus.OPEN_READ) {
+            upgradeServletInputStream.onDataAvailable();
+        } else if (status == SocketStatus.OPEN_WRITE) {
+            upgradeServletOutputStream.writeListener.onWritePossible();
+        } else {
+            // Unexpected state
+            return SocketState.CLOSED;
+        }
         return SocketState.UPGRADED;
     }
 
@@ -144,57 +150,6 @@ public abstract class UpgradeProcessor<S
 
     // ----------------------------------------------------------- Inner classes
 
-    protected abstract static class UpgradeServletInputStream extends
-            ServletInputStream {
-
-        private volatile ReadListener readListener = null;
-
-        @Override
-        public final boolean isFinished() {
-            if (readListener == null) {
-                throw new IllegalStateException(
-                        sm.getString("upgrade.sis.isFinished.ise"));
-            }
-
-            // TODO Support non-blocking IO
-            return false;
-        }
-
-        @Override
-        public final boolean isReady() {
-            if (readListener == null) {
-                throw new IllegalStateException(
-                        sm.getString("upgrade.sis.isReady.ise"));
-            }
-
-            // TODO Support non-blocking IO
-            return false;
-        }
-
-        @Override
-        public final void setReadListener(ReadListener listener) {
-            if (listener == null) {
-                throw new NullPointerException(
-                        sm.getString("upgrade.sis.readListener.null"));
-            }
-            this.readListener = listener;
-        }
-
-        @Override
-        public final int read() throws IOException {
-            return doRead();
-        }
-
-        @Override
-        public final int read(byte[] b, int off, int len) throws IOException {
-            return doRead(b, off, len);
-        }
-
-        protected abstract int doRead() throws IOException;
-        protected abstract int doRead(byte[] b, int off, int len)
-                throws IOException;
-    }
-
     protected abstract static class UpgradeServletOutputStream extends
             ServletOutputStream {
 

Added: tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeServletInputStream.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeServletInputStream.java?rev=1413221&view=auto
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeServletInputStream.java (added)
+++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeServletInputStream.java Sat Nov 24 17:51:48 2012
@@ -0,0 +1,87 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.coyote.http11.upgrade;
+
+import java.io.IOException;
+
+import javax.servlet.ReadListener;
+import javax.servlet.ServletInputStream;
+
+public abstract class UpgradeServletInputStream extends ServletInputStream {
+
+    protected static final int EOF = -1;
+    protected static final int NO_DATA = -2;
+
+    private volatile boolean finished = false;
+    private volatile boolean ready = true;
+    private volatile ReadListener listener = null;
+
+    @Override
+    public final boolean isFinished() {
+        return finished;
+    }
+
+    @Override
+    public boolean isReady() {
+        try {
+            ready = doIsReady();
+        } catch (IOException e) {
+            listener.onError(e);
+        }
+        return ready;
+    }
+
+    @Override
+    public void setReadListener(ReadListener listener) {
+        if (listener == null) {
+            // TODO i18n
+            throw new IllegalArgumentException();
+        }
+        this.listener = listener;
+
+        isReady();
+    }
+
+    @Override
+    public final int read() throws IOException {
+        if (!ready) {
+            // TODO i18n
+            throw new IllegalStateException();
+        }
+        ReadListener readListener = this.listener;
+        int result = doRead(readListener == null);
+        if (result == EOF) {
+            finished = true;
+            if (readListener != null) {
+                readListener.onAllDataRead();
+            }
+            return EOF;
+        } else if (result == NO_DATA) {
+            return EOF;
+        }
+        return result;
+    }
+
+    protected void onDataAvailable() {
+        ready = true;
+        listener.onDataAvailable();
+    }
+
+    protected abstract int doRead(boolean block) throws IOException;
+
+    protected abstract boolean doIsReady() throws IOException;
+}

Modified: tomcat/trunk/test/org/apache/coyote/http11/upgrade/TestUpgradeServletInputStream.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/test/org/apache/coyote/http11/upgrade/TestUpgradeServletInputStream.java?rev=1413221&r1=1413220&r2=1413221&view=diff
==============================================================================
--- tomcat/trunk/test/org/apache/coyote/http11/upgrade/TestUpgradeServletInputStream.java (original)
+++ tomcat/trunk/test/org/apache/coyote/http11/upgrade/TestUpgradeServletInputStream.java Sat Nov 24 17:51:48 2012
@@ -26,7 +26,11 @@ import java.io.Writer;
 import java.net.Socket;
 
 import javax.net.SocketFactory;
+import javax.servlet.ReadListener;
 import javax.servlet.ServletException;
+import javax.servlet.ServletInputStream;
+import javax.servlet.ServletOutputStream;
+import javax.servlet.WriteListener;
 import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
@@ -45,8 +49,34 @@ import org.apache.catalina.startup.Tomca
 
 public class TestUpgradeServletInputStream extends TomcatBaseTest {
 
+    private static final String MESSAGE = "This is a test.\n";
+
     @Test
     public void testSimpleUpgrade() throws Exception {
+        doUpgrade();
+    }
+
+    @Test
+    public void testSingleMessage() throws Exception {
+        UpgradeConnection conn = doUpgrade();
+        Writer writer = conn.getWriter();
+        BufferedReader reader = conn.getReader();
+
+        writer.write(MESSAGE);
+        writer.flush();
+
+        Thread.sleep(500);
+
+        writer.write(MESSAGE);
+        writer.flush();
+
+        String response = reader.readLine();
+
+        Assert.assertEquals(MESSAGE, response);
+    }
+
+
+    private UpgradeConnection doUpgrade() throws Exception {
         // Setup Tomcat instance
         Tomcat tomcat = getTomcatInstance();
 
@@ -65,6 +95,8 @@ public class TestUpgradeServletInputStre
         Socket socket =
                 SocketFactory.getDefault().createSocket("localhost", getPort());
 
+        socket.setSoTimeout(300000);
+
         InputStream is = socket.getInputStream();
         OutputStream os = socket.getOutputStream();
 
@@ -80,6 +112,13 @@ public class TestUpgradeServletInputStre
 
         Assert.assertEquals("HTTP/1.1 101 Switching Protocols",
                 status.substring(0, 32));
+
+        // Skip the remaining response headers
+        while (reader.readLine().length() > 0) {
+            // Skip
+        }
+
+        return new UpgradeConnection(writer, reader);
     }
 
     private static class UpgradeServlet extends HttpServlet {
@@ -94,11 +133,94 @@ public class TestUpgradeServletInputStre
         }
     }
 
+    private static class UpgradeConnection {
+        private final Writer writer;
+        private final BufferedReader reader;
+
+        public UpgradeConnection(Writer writer, BufferedReader reader) {
+            this.writer = writer;
+            this.reader = reader;
+        }
+
+        public Writer getWriter() {
+            return writer;
+        }
+
+        public BufferedReader getReader() {
+            return reader;
+        }
+    }
+
     private static class Echo implements ProtocolHandler {
 
+        private ServletInputStream sis;
+        private ServletOutputStream sos;
+
         @Override
         public void init(WebConnection connection) {
-            // TODO
+
+            try {
+                sis = connection.getInputStream();
+                sos = connection.getOutputStream();
+            } catch (IOException ioe) {
+                throw new IllegalStateException(ioe);
+            }
+
+            sis.setReadListener(new EchoReadListener());
+            sos.setWriteListener(new EchoWriteListener());
+        }
+
+        private class EchoReadListener implements ReadListener {
+
+            private byte[] buffer = new byte[8096];
+
+            @Override
+            public void onDataAvailable() {
+                try {
+                    while (sis.isReady()) {
+                        int read = sis.read(buffer);
+                        if (read > 0) {
+                            System.out.print(new String(buffer, 0, read));
+                            /*
+                            if (sos.canWrite()) {
+                                sos.write(buffer, 0, read);
+                            } else {
+                                throw new IOException("Unable to echo data. " +
+                                        "canWrite() returned false");
+                            }
+                            */
+                        }
+                    }
+                } catch (IOException ioe) {
+                    throw new RuntimeException(ioe);
+                }
+            }
+
+            @Override
+            public void onAllDataRead() {
+                System.out.println("All data read");
+            }
+
+            @Override
+            public void onError(Throwable throwable) {
+                // TODO Auto-generated method stub
+
+            }
+        }
+
+        private class EchoWriteListener implements WriteListener {
+
+            @Override
+            public void onWritePossible() {
+                // TODO Auto-generated method stub
+
+            }
+
+            @Override
+            public void onError(Throwable throwable) {
+                // TODO Auto-generated method stub
+
+            }
         }
     }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org
For additional commands, e-mail: dev-h...@tomcat.apache.org

Reply via email to