Author: markt Date: Sat Nov 24 17:51:48 2012 New Revision: 1413221 URL: http://svn.apache.org/viewvc?rev=1413221&view=rev Log: Re-write reading from upgraded connection to use non-blocking IO. NIO tested for basic operations; BIO untested; APR not written.
Added: tomcat/trunk/java/org/apache/coyote/http11/upgrade/AprUpgradeServletInputStream.java tomcat/trunk/java/org/apache/coyote/http11/upgrade/BioUpgradeServletInputStream.java tomcat/trunk/java/org/apache/coyote/http11/upgrade/NioUpgradeServletInputStream.java tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeServletInputStream.java Modified: tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeAprProcessor.java tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeBioProcessor.java tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeNioProcessor.java tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessor.java tomcat/trunk/test/org/apache/coyote/http11/upgrade/TestUpgradeServletInputStream.java Added: tomcat/trunk/java/org/apache/coyote/http11/upgrade/AprUpgradeServletInputStream.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/AprUpgradeServletInputStream.java?rev=1413221&view=auto ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http11/upgrade/AprUpgradeServletInputStream.java (added) +++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/AprUpgradeServletInputStream.java Sat Nov 24 17:51:48 2012 @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.coyote.http11.upgrade; + +import java.io.IOException; + +import org.apache.tomcat.util.net.SocketWrapper; + +public class AprUpgradeServletInputStream extends UpgradeServletInputStream { + + private final long socket; + + public AprUpgradeServletInputStream(SocketWrapper<Long> wrapper) { + this.socket = wrapper.getSocket().longValue(); + } +/* + @Override + protected int doRead() throws IOException { + byte[] bytes = new byte[1]; + int result = Socket.recv(socket, bytes, 0, 1); + if (result == -1) { + return -1; + } else { + return bytes[0] & 0xFF; + } + } + + @Override + protected int doRead(byte[] b, int off, int len) throws IOException { + boolean block = true; + if (!block) { + Socket.optSet(socket, Socket.APR_SO_NONBLOCK, -1); + } + try { + int result = Socket.recv(socket, b, off, len); + if (result > 0) { + return result; + } else if (-result == Status.EAGAIN) { + return 0; + } else { + throw new IOException(sm.getString("apr.error", + Integer.valueOf(-result))); + } + } finally { + if (!block) { + Socket.optSet(socket, Socket.APR_SO_NONBLOCK, 0); + } + } + } +} +*/ + + @Override + protected int doRead(boolean block) throws IOException { + // TODO Auto-generated method stub + return 0; + } + + @Override + protected boolean doIsReady() { + // TODO Auto-generated method stub + return false; + } + +} Added: tomcat/trunk/java/org/apache/coyote/http11/upgrade/BioUpgradeServletInputStream.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/BioUpgradeServletInputStream.java?rev=1413221&view=auto ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http11/upgrade/BioUpgradeServletInputStream.java (added) +++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/BioUpgradeServletInputStream.java Sat Nov 24 17:51:48 2012 @@ -0,0 +1,44 @@ 
+/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.coyote.http11.upgrade; + +import java.io.IOException; +import java.io.InputStream; +import java.net.Socket; + +import org.apache.tomcat.util.net.SocketWrapper; + +public class BioUpgradeServletInputStream extends UpgradeServletInputStream { + + private final InputStream inputStream; + + public BioUpgradeServletInputStream(SocketWrapper<Socket> wrapper) + throws IOException { + inputStream = wrapper.getSocket().getInputStream(); + } + + @Override + protected int doRead(boolean block) throws IOException { + return inputStream.read(); + } + + @Override + protected boolean doIsReady() { + // Always returns true for BIO + return true; + } +} Added: tomcat/trunk/java/org/apache/coyote/http11/upgrade/NioUpgradeServletInputStream.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/NioUpgradeServletInputStream.java?rev=1413221&view=auto ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http11/upgrade/NioUpgradeServletInputStream.java (added) +++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/NioUpgradeServletInputStream.java Sat Nov 24 17:51:48 2012 
@@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.coyote.http11.upgrade; + +import java.io.EOFException; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.Selector; + +import org.apache.tomcat.util.net.NioChannel; +import org.apache.tomcat.util.net.NioEndpoint; +import org.apache.tomcat.util.net.NioSelectorPool; +import org.apache.tomcat.util.net.SocketWrapper; + +public class NioUpgradeServletInputStream extends UpgradeServletInputStream { + + private final NioChannel channel; + private final NioSelectorPool pool; + + public NioUpgradeServletInputStream(SocketWrapper<NioChannel> wrapper, + NioSelectorPool pool) { + this.channel = wrapper.getSocket(); + this.pool = pool; + } + + @Override + protected int doRead(boolean block) throws IOException { + byte[] bytes = new byte[1]; + int result = readSocket(block, bytes, 0, 1); + if (result == 0) { + return NO_DATA; + } else if (result == -1) { + return EOF; + } else { + return bytes[0] & 0xFF; + } + } + + @Override + protected boolean doIsReady() throws IOException { + ByteBuffer readBuffer = channel.getBufHandler().getReadBuffer(); + + if (readBuffer.remaining() > 0) { + return true; + } + + 
readBuffer.clear(); + fillReadBuffer(false); + + boolean isReady = readBuffer.position() > 0; + readBuffer.flip(); + return isReady; + } + + private int readSocket(boolean block, byte[] b, int off, int len) + throws IOException { + + ByteBuffer readBuffer = channel.getBufHandler().getReadBuffer(); + int remaining = readBuffer.remaining(); + + // Is there enough data in the read buffer to satisfy this request? + if (remaining >= len) { + readBuffer.get(b, off, len); + return len; + } + + // Copy what data there is in the read buffer to the byte array + int leftToWrite = len; + int newOffset = off; + if (remaining > 0) { + readBuffer.get(b, off, remaining); + leftToWrite -= remaining; + newOffset += remaining; + } + + // Fill the read buffer as best we can + readBuffer.clear(); + int nRead = fillReadBuffer(block); + + // Full as much of the remaining byte array as possible with the data + // that was just read + if (nRead > 0) { + readBuffer.flip(); + readBuffer.limit(nRead); + if (nRead > leftToWrite) { + readBuffer.get(b, newOffset, leftToWrite); + leftToWrite = 0; + } else { + readBuffer.get(b, newOffset, nRead); + leftToWrite -= nRead; + } + } else if (nRead == 0) { + readBuffer.flip(); + } else if (nRead == -1) { + // TODO i18n + throw new EOFException(); + } + + return len - leftToWrite; + } + + private int fillReadBuffer(boolean block) throws IOException { + int nRead; + if (block) { + Selector selector = null; + try { + selector = pool.get(); + } catch ( IOException x ) { + // Ignore + } + try { + NioEndpoint.KeyAttachment att = + (NioEndpoint.KeyAttachment) channel.getAttachment(false); + if (att == null) { + throw new IOException("Key must be cancelled."); + } + nRead = pool.read(channel.getBufHandler().getReadBuffer(), + channel, selector, att.getTimeout()); + } catch (EOFException eof) { + nRead = -1; + } finally { + if (selector != null) { + pool.put(selector); + } + } + } else { + nRead = channel.read(channel.getBufHandler().getReadBuffer()); + } + 
return nRead; + } +} Modified: tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeAprProcessor.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeAprProcessor.java?rev=1413221&r1=1413220&r2=1413221&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeAprProcessor.java (original) +++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeAprProcessor.java Sat Nov 24 17:51:48 2012 @@ -21,7 +21,6 @@ import java.io.IOException; import javax.servlet.http.ProtocolHandler; import org.apache.tomcat.jni.Socket; -import org.apache.tomcat.jni.Status; import org.apache.tomcat.util.net.SocketWrapper; public class UpgradeAprProcessor extends UpgradeProcessor<Long> { @@ -31,7 +30,7 @@ public class UpgradeAprProcessor extends public UpgradeAprProcessor(SocketWrapper<Long> wrapper, ProtocolHandler httpUpgradeProcessor) { super(httpUpgradeProcessor, - new AprUpgradeServletInputStream(wrapper.getSocket().longValue()), + new AprUpgradeServletInputStream(wrapper), new AprUpgradeServletOutputStream(wrapper.getSocket().longValue())); Socket.timeoutSet(wrapper.getSocket().longValue(), INFINITE_TIMEOUT); @@ -40,50 +39,6 @@ public class UpgradeAprProcessor extends // ----------------------------------------------------------- Inner classes - private static class AprUpgradeServletInputStream - extends UpgradeServletInputStream { - - private final long socket; - - public AprUpgradeServletInputStream(long socket) { - this.socket = socket; - } - - @Override - protected int doRead() throws IOException { - byte[] bytes = new byte[1]; - int result = Socket.recv(socket, bytes, 0, 1); - if (result == -1) { - return -1; - } else { - return bytes[0] & 0xFF; - } - } - - @Override - protected int doRead(byte[] b, int off, int len) throws IOException { - boolean block = true; - if (!block) { - Socket.optSet(socket, Socket.APR_SO_NONBLOCK, -1); - } - try 
{ - int result = Socket.recv(socket, b, off, len); - if (result > 0) { - return result; - } else if (-result == Status.EAGAIN) { - return 0; - } else { - throw new IOException(sm.getString("apr.error", - Integer.valueOf(-result))); - } - } finally { - if (!block) { - Socket.optSet(socket, Socket.APR_SO_NONBLOCK, 0); - } - } - } - } - private static class AprUpgradeServletOutputStream extends UpgradeServletOutputStream { Modified: tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeBioProcessor.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeBioProcessor.java?rev=1413221&r1=1413220&r2=1413221&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeBioProcessor.java (original) +++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeBioProcessor.java Sat Nov 24 17:51:48 2012 @@ -17,7 +17,6 @@ package org.apache.coyote.http11.upgrade; import java.io.IOException; -import java.io.InputStream; import java.io.OutputStream; import java.net.Socket; @@ -40,27 +39,6 @@ public class UpgradeBioProcessor extends // ----------------------------------------------------------- Inner classes - private static class BioUpgradeServletInputStream - extends UpgradeServletInputStream { - - private final InputStream is; - - public BioUpgradeServletInputStream(SocketWrapper<Socket> wrapper) - throws IOException { - is = wrapper.getSocket().getInputStream(); - } - - @Override - protected int doRead() throws IOException { - return is.read(); - } - - @Override - protected int doRead(byte[] b, int off, int len) throws IOException { - return is.read(b, off, len); - } - } - private static class BioUpgradeServletOutputStream extends UpgradeServletOutputStream { Modified: tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeNioProcessor.java URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeNioProcessor.java?rev=1413221&r1=1413220&r2=1413221&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeNioProcessor.java (original) +++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeNioProcessor.java Sat Nov 24 17:51:48 2012 @@ -16,9 +16,7 @@ */ package org.apache.coyote.http11.upgrade; -import java.io.EOFException; import java.io.IOException; -import java.nio.ByteBuffer; import java.nio.channels.Selector; import javax.servlet.http.ProtocolHandler; @@ -44,118 +42,6 @@ public class UpgradeNioProcessor extends // ----------------------------------------------------------- Inner classes - private static class NioUpgradeServletInputStream - extends UpgradeServletInputStream { - - private final NioChannel nioChannel; - private final NioSelectorPool pool; - private final int maxRead; - - public NioUpgradeServletInputStream(SocketWrapper<NioChannel> wrapper, - NioSelectorPool pool) { - nioChannel = wrapper.getSocket(); - this.pool = pool; - maxRead = nioChannel.getBufHandler().getReadBuffer().capacity(); - } - - @Override - protected int doRead() throws IOException { - byte[] bytes = new byte[1]; - int result = readSocket(true, bytes, 0, 1); - if (result == -1) { - return -1; - } else { - return bytes[0] & 0xFF; - } - } - - @Override - protected int doRead(byte[] b, int off, int len) throws IOException { - if (len > maxRead) { - return readSocket(true, b, off, maxRead); - } else { - return readSocket(true, b, off, len); - } - } - - private int readSocket(boolean block, byte[] b, int off, int len) - throws IOException { - - ByteBuffer readBuffer = nioChannel.getBufHandler().getReadBuffer(); - int remaining = readBuffer.remaining(); - - // Is there enough data in the read buffer to satisfy this request? 
- if (remaining >= len) { - readBuffer.get(b, off, len); - return len; - } - - // Copy what data there is in the read buffer to the byte array - int leftToWrite = len; - int newOffset = off; - if (remaining > 0) { - readBuffer.get(b, off, remaining); - leftToWrite -= remaining; - newOffset += remaining; - } - - // Fill the read buffer as best we can - readBuffer.clear(); - int nRead = fillReadBuffer(block); - - // Full as much of the remaining byte array as possible with the data - // that was just read - if (nRead > 0) { - readBuffer.flip(); - readBuffer.limit(nRead); - if (nRead > leftToWrite) { - readBuffer.get(b, newOffset, leftToWrite); - leftToWrite = 0; - } else { - readBuffer.get(b, newOffset, nRead); - leftToWrite -= nRead; - } - } else if (nRead == 0) { - readBuffer.flip(); - readBuffer.limit(nRead); - } else if (nRead == -1) { - throw new EOFException(sm.getString("nio.eof.error")); - } - - return len - leftToWrite; - } - - private int fillReadBuffer(boolean block) throws IOException { - int nRead; - if (block) { - Selector selector = null; - try { - selector = pool.get(); - } catch ( IOException x ) { - // Ignore - } - try { - NioEndpoint.KeyAttachment att = - (NioEndpoint.KeyAttachment) nioChannel.getAttachment(false); - if (att == null) { - throw new IOException("Key must be cancelled."); - } - nRead = pool.read(nioChannel.getBufHandler().getReadBuffer(), - nioChannel, selector, att.getTimeout()); - } catch (EOFException eof) { - nRead = -1; - } finally { - if (selector != null) { - pool.put(selector); - } - } - } else { - nRead = nioChannel.read(nioChannel.getBufHandler().getReadBuffer()); - } - return nRead; - } - } - private static class NioUpgradeServletOutputStream extends UpgradeServletOutputStream { Modified: tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessor.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessor.java?rev=1413221&r1=1413220&r2=1413221&view=diff 
============================================================================== --- tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessor.java (original) +++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessor.java Sat Nov 24 17:51:48 2012 @@ -19,7 +19,6 @@ package org.apache.coyote.http11.upgrade import java.io.IOException; import java.util.concurrent.Executor; -import javax.servlet.ReadListener; import javax.servlet.ServletInputStream; import javax.servlet.ServletOutputStream; import javax.servlet.WriteListener; @@ -41,8 +40,8 @@ public abstract class UpgradeProcessor<S StringManager.getManager(Constants.Package); private final ProtocolHandler httpUpgradeHandler; - private final ServletInputStream upgradeServletInputStream; - private final ServletOutputStream upgradeServletOutputStream; + private final UpgradeServletInputStream upgradeServletInputStream; + private final UpgradeServletOutputStream upgradeServletOutputStream; protected UpgradeProcessor (ProtocolHandler httpUpgradeHandler, UpgradeServletInputStream upgradeServletInputStream, @@ -83,7 +82,14 @@ public abstract class UpgradeProcessor<S public final SocketState upgradeDispatch(SocketStatus status) throws IOException { - // TODO Handle read/write ready for non-blocking IO + if (status == SocketStatus.OPEN_READ) { + upgradeServletInputStream.onDataAvailable(); + } else if (status == SocketStatus.OPEN_WRITE) { + upgradeServletOutputStream.writeListener.onWritePossible(); + } else { + // Unexpected state + return SocketState.CLOSED; + } return SocketState.UPGRADED; } @@ -144,57 +150,6 @@ public abstract class UpgradeProcessor<S // ----------------------------------------------------------- Inner classes - protected abstract static class UpgradeServletInputStream extends - ServletInputStream { - - private volatile ReadListener readListener = null; - - @Override - public final boolean isFinished() { - if (readListener == null) { - throw new IllegalStateException( - 
sm.getString("upgrade.sis.isFinished.ise")); - } - - // TODO Support non-blocking IO - return false; - } - - @Override - public final boolean isReady() { - if (readListener == null) { - throw new IllegalStateException( - sm.getString("upgrade.sis.isReady.ise")); - } - - // TODO Support non-blocking IO - return false; - } - - @Override - public final void setReadListener(ReadListener listener) { - if (listener == null) { - throw new NullPointerException( - sm.getString("upgrade.sis.readListener.null")); - } - this.readListener = listener; - } - - @Override - public final int read() throws IOException { - return doRead(); - } - - @Override - public final int read(byte[] b, int off, int len) throws IOException { - return doRead(b, off, len); - } - - protected abstract int doRead() throws IOException; - protected abstract int doRead(byte[] b, int off, int len) - throws IOException; - } - protected abstract static class UpgradeServletOutputStream extends ServletOutputStream { Added: tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeServletInputStream.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeServletInputStream.java?rev=1413221&view=auto ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeServletInputStream.java (added) +++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeServletInputStream.java Sat Nov 24 17:51:48 2012 @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.coyote.http11.upgrade; + +import java.io.IOException; + +import javax.servlet.ReadListener; +import javax.servlet.ServletInputStream; + +public abstract class UpgradeServletInputStream extends ServletInputStream { + + protected static final int EOF = -1; + protected static final int NO_DATA = -2; + + private volatile boolean finished = false; + private volatile boolean ready = true; + private volatile ReadListener listener = null; + + @Override + public final boolean isFinished() { + return finished; + } + + @Override + public boolean isReady() { + try { + ready = doIsReady(); + } catch (IOException e) { + listener.onError(e); + } + return ready; + } + + @Override + public void setReadListener(ReadListener listener) { + if (listener == null) { + // TODO i18n + throw new IllegalArgumentException(); + } + this.listener = listener; + + isReady(); + } + + @Override + public final int read() throws IOException { + if (!ready) { + // TODO i18n + throw new IllegalStateException(); + } + ReadListener readListener = this.listener; + int result = doRead(readListener == null); + if (result == EOF) { + finished = true; + if (readListener != null) { + readListener.onAllDataRead(); + } + return EOF; + } else if (result == NO_DATA) { + return EOF; + } + return result; + } + + protected void onDataAvailable() { + ready = true; + listener.onDataAvailable(); + } + + protected abstract int doRead(boolean block) throws IOException; + + protected abstract boolean doIsReady() throws IOException; +} Modified: 
tomcat/trunk/test/org/apache/coyote/http11/upgrade/TestUpgradeServletInputStream.java URL: http://svn.apache.org/viewvc/tomcat/trunk/test/org/apache/coyote/http11/upgrade/TestUpgradeServletInputStream.java?rev=1413221&r1=1413220&r2=1413221&view=diff ============================================================================== --- tomcat/trunk/test/org/apache/coyote/http11/upgrade/TestUpgradeServletInputStream.java (original) +++ tomcat/trunk/test/org/apache/coyote/http11/upgrade/TestUpgradeServletInputStream.java Sat Nov 24 17:51:48 2012 @@ -26,7 +26,11 @@ import java.io.Writer; import java.net.Socket; import javax.net.SocketFactory; +import javax.servlet.ReadListener; import javax.servlet.ServletException; +import javax.servlet.ServletInputStream; +import javax.servlet.ServletOutputStream; +import javax.servlet.WriteListener; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; @@ -45,8 +49,34 @@ import org.apache.catalina.startup.Tomca public class TestUpgradeServletInputStream extends TomcatBaseTest { + private static final String MESSAGE = "This is a test.\n"; + @Test public void testSimpleUpgrade() throws Exception { + doUpgrade(); + } + + @Test + public void testSingleMessage() throws Exception { + UpgradeConnection conn = doUpgrade(); + Writer writer = conn.getWriter(); + BufferedReader reader = conn.getReader(); + + writer.write(MESSAGE); + writer.flush(); + + Thread.sleep(500); + + writer.write(MESSAGE); + writer.flush(); + + String response = reader.readLine(); + + Assert.assertEquals(MESSAGE, response); + } + + + private UpgradeConnection doUpgrade() throws Exception { // Setup Tomcat instance Tomcat tomcat = getTomcatInstance(); @@ -65,6 +95,8 @@ public class TestUpgradeServletInputStre Socket socket = SocketFactory.getDefault().createSocket("localhost", getPort()); + socket.setSoTimeout(300000); + InputStream is = socket.getInputStream(); OutputStream os = 
socket.getOutputStream(); @@ -80,6 +112,13 @@ public class TestUpgradeServletInputStre Assert.assertEquals("HTTP/1.1 101 Switching Protocols", status.substring(0, 32)); + + // Skip the remaining response headers + while (reader.readLine().length() > 0) { + // Skip + } + + return new UpgradeConnection(writer, reader); } private static class UpgradeServlet extends HttpServlet { @@ -94,11 +133,94 @@ public class TestUpgradeServletInputStre } } + private static class UpgradeConnection { + private final Writer writer; + private final BufferedReader reader; + + public UpgradeConnection(Writer writer, BufferedReader reader) { + this.writer = writer; + this.reader = reader; + } + + public Writer getWriter() { + return writer; + } + + public BufferedReader getReader() { + return reader; + } + } + private static class Echo implements ProtocolHandler { + private ServletInputStream sis; + private ServletOutputStream sos; + @Override public void init(WebConnection connection) { - // TODO + + try { + sis = connection.getInputStream(); + sos = connection.getOutputStream(); + } catch (IOException ioe) { + throw new IllegalStateException(ioe); + } + + sis.setReadListener(new EchoReadListener()); + sos.setWriteListener(new EchoWriteListener()); + } + + private class EchoReadListener implements ReadListener { + + private byte[] buffer = new byte[8096]; + + @Override + public void onDataAvailable() { + try { + while (sis.isReady()) { + int read = sis.read(buffer); + if (read > 0) { + System.out.print(new String(buffer, 0, read)); + /* + if (sos.canWrite()) { + sos.write(buffer, 0, read); + } else { + throw new IOException("Unable to echo data. 
" + + "canWrite() returned false"); + } + */ + } + } + } catch (IOException ioe) { + throw new RuntimeException(ioe); + } + } + + @Override + public void onAllDataRead() { + System.out.println("All data read"); + } + + @Override + public void onError(Throwable throwable) { + // TODO Auto-generated method stub + + } + } + + private class EchoWriteListener implements WriteListener { + + @Override + public void onWritePossible() { + // TODO Auto-generated method stub + + } + + @Override + public void onError(Throwable throwable) { + // TODO Auto-generated method stub + + } } } } --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org