Author: markt
Date: Sat Nov 24 17:51:48 2012
New Revision: 1413221
URL: http://svn.apache.org/viewvc?rev=1413221&view=rev
Log:
Re-write reading from upgraded connection to use non-blocking IO.
NIO tested for basic operations
BIO untested
APR not written
Added:
tomcat/trunk/java/org/apache/coyote/http11/upgrade/AprUpgradeServletInputStream.java
tomcat/trunk/java/org/apache/coyote/http11/upgrade/BioUpgradeServletInputStream.java
tomcat/trunk/java/org/apache/coyote/http11/upgrade/NioUpgradeServletInputStream.java
tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeServletInputStream.java
Modified:
tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeAprProcessor.java
tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeBioProcessor.java
tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeNioProcessor.java
tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessor.java
tomcat/trunk/test/org/apache/coyote/http11/upgrade/TestUpgradeServletInputStream.java
Added:
tomcat/trunk/java/org/apache/coyote/http11/upgrade/AprUpgradeServletInputStream.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/AprUpgradeServletInputStream.java?rev=1413221&view=auto
==============================================================================
---
tomcat/trunk/java/org/apache/coyote/http11/upgrade/AprUpgradeServletInputStream.java
(added)
+++
tomcat/trunk/java/org/apache/coyote/http11/upgrade/AprUpgradeServletInputStream.java
Sat Nov 24 17:51:48 2012
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.coyote.http11.upgrade;
+
+import java.io.IOException;
+
+import org.apache.tomcat.util.net.SocketWrapper;
+
+/**
+ * Input stream for an upgraded connection backed by an APR socket handle.
+ * <p>
+ * The read logic is not implemented in this revision: both overrides at the
+ * bottom of the class are placeholders, and the previous blocking
+ * implementation is retained only as commented-out reference code.
+ */
+public class AprUpgradeServletInputStream extends UpgradeServletInputStream {
+
+ // Socket handle taken from the wrapper. In this revision it is only
+ // referenced by the commented-out code below.
+ private final long socket;
+
+ /**
+  * Creates the stream for the given wrapped socket.
+  *
+  * @param wrapper wrapper whose socket value is unboxed and stored
+  */
+ public AprUpgradeServletInputStream(SocketWrapper<Long> wrapper) {
+ this.socket = wrapper.getSocket().longValue();
+ }
+// Disabled legacy implementation (old doRead() / doRead(byte[], int, int)
+// signatures). It references Socket, Status and sm, none of which are imported
+// or declared in this file, and it contains its own closing brace, so it cannot
+// be re-enabled by simply removing the comment markers.
+/*
+ @Override
+ protected int doRead() throws IOException {
+ byte[] bytes = new byte[1];
+ int result = Socket.recv(socket, bytes, 0, 1);
+ if (result == -1) {
+ return -1;
+ } else {
+ return bytes[0] & 0xFF;
+ }
+ }
+
+ @Override
+ protected int doRead(byte[] b, int off, int len) throws IOException {
+ boolean block = true;
+ if (!block) {
+ Socket.optSet(socket, Socket.APR_SO_NONBLOCK, -1);
+ }
+ try {
+ int result = Socket.recv(socket, b, off, len);
+ if (result > 0) {
+ return result;
+ } else if (-result == Status.EAGAIN) {
+ return 0;
+ } else {
+ throw new IOException(sm.getString("apr.error",
+ Integer.valueOf(-result)));
+ }
+ } finally {
+ if (!block) {
+ Socket.optSet(socket, Socket.APR_SO_NONBLOCK, 0);
+ }
+ }
+ }
+}
+*/
+
+ /**
+  * Placeholder: performs no I/O and always returns 0.
+  * <p>
+  * NOTE(review): 0 is neither EOF (-1) nor NO_DATA (-2), so the base class
+  * read() would pass it on to the caller as a data byte. Confirm the
+  * intended value before this connector is enabled.
+  *
+  * @param block ignored by this placeholder
+  * @return always 0
+  */
+ @Override
+ protected int doRead(boolean block) throws IOException {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ /**
+  * Placeholder: never reports data as available.
+  *
+  * @return always {@code false}
+  */
+ @Override
+ protected boolean doIsReady() {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+}
Added:
tomcat/trunk/java/org/apache/coyote/http11/upgrade/BioUpgradeServletInputStream.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/BioUpgradeServletInputStream.java?rev=1413221&view=auto
==============================================================================
---
tomcat/trunk/java/org/apache/coyote/http11/upgrade/BioUpgradeServletInputStream.java
(added)
+++
tomcat/trunk/java/org/apache/coyote/http11/upgrade/BioUpgradeServletInputStream.java
Sat Nov 24 17:51:48 2012
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.coyote.http11.upgrade;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.Socket;
+
+import org.apache.tomcat.util.net.SocketWrapper;
+
+public class BioUpgradeServletInputStream extends UpgradeServletInputStream {
+
+ private final InputStream inputStream;
+
+ public BioUpgradeServletInputStream(SocketWrapper<Socket> wrapper)
+ throws IOException {
+ inputStream = wrapper.getSocket().getInputStream();
+ }
+
+ @Override
+ protected int doRead(boolean block) throws IOException {
+ return inputStream.read();
+ }
+
+ @Override
+ protected boolean doIsReady() {
+ // Always returns true for BIO
+ return true;
+ }
+}
Added:
tomcat/trunk/java/org/apache/coyote/http11/upgrade/NioUpgradeServletInputStream.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/NioUpgradeServletInputStream.java?rev=1413221&view=auto
==============================================================================
---
tomcat/trunk/java/org/apache/coyote/http11/upgrade/NioUpgradeServletInputStream.java
(added)
+++
tomcat/trunk/java/org/apache/coyote/http11/upgrade/NioUpgradeServletInputStream.java
Sat Nov 24 17:51:48 2012
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.coyote.http11.upgrade;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Selector;
+
+import org.apache.tomcat.util.net.NioChannel;
+import org.apache.tomcat.util.net.NioEndpoint;
+import org.apache.tomcat.util.net.NioSelectorPool;
+import org.apache.tomcat.util.net.SocketWrapper;
+
+/**
+ * Input stream for an upgraded connection on the NIO connector.
+ * <p>
+ * Data is staged in the channel's read buffer (obtained via
+ * {@code channel.getBufHandler().getReadBuffer()}). Between calls that buffer
+ * is left in "drain" state: position..limit delimits bytes that were read from
+ * the socket but not yet handed to the caller.
+ */
+public class NioUpgradeServletInputStream extends UpgradeServletInputStream {
+
+    private final NioChannel channel;
+    private final NioSelectorPool pool;
+
+    /**
+     * @param wrapper wrapper providing the channel to read from
+     * @param pool    selector pool used for blocking reads
+     */
+    public NioUpgradeServletInputStream(SocketWrapper<NioChannel> wrapper,
+            NioSelectorPool pool) {
+        this.channel = wrapper.getSocket();
+        this.pool = pool;
+    }
+
+    /**
+     * Reads one byte.
+     *
+     * @param block {@code true} to wait for data, {@code false} to return
+     *              immediately if none is available
+     * @return the byte as 0-255, {@code NO_DATA} if nothing could be read, or
+     *         {@code EOF} at end of stream
+     * @throws IOException if the underlying read fails
+     */
+    @Override
+    protected int doRead(boolean block) throws IOException {
+        byte[] bytes = new byte[1];
+        int result = readSocket(block, bytes, 0, 1);
+        if (result == 0) {
+            return NO_DATA;
+        } else if (result == -1) {
+            return EOF;
+        } else {
+            return bytes[0] & 0xFF;
+        }
+    }
+
+    /**
+     * Reports whether at least one byte can be read without blocking.
+     * If the staged buffer is empty, one non-blocking fill is attempted.
+     *
+     * @return {@code true} if buffered data is available
+     * @throws IOException if the non-blocking fill fails
+     */
+    @Override
+    protected boolean doIsReady() throws IOException {
+        ByteBuffer readBuffer = channel.getBufHandler().getReadBuffer();
+
+        if (readBuffer.remaining() > 0) {
+            return true;
+        }
+
+        // Switch the buffer to "fill" state, try a non-blocking read, then
+        // flip back so position..limit covers exactly what was read.
+        readBuffer.clear();
+        fillReadBuffer(false);
+
+        boolean isReady = readBuffer.position() > 0;
+        readBuffer.flip();
+        return isReady;
+    }
+
+    /**
+     * Copies up to {@code len} bytes into {@code b}, draining the staged
+     * buffer first and then performing at most one fill from the socket.
+     *
+     * @return the number of bytes copied (possibly 0), or -1 if the end of
+     *         the stream was reached before any byte could be copied
+     */
+    private int readSocket(boolean block, byte[] b, int off, int len)
+            throws IOException {
+
+        ByteBuffer readBuffer = channel.getBufHandler().getReadBuffer();
+        int remaining = readBuffer.remaining();
+
+        // Is there enough data in the read buffer to satisfy this request?
+        if (remaining >= len) {
+            readBuffer.get(b, off, len);
+            return len;
+        }
+
+        // Copy what data there is in the read buffer to the byte array
+        int leftToWrite = len;
+        int newOffset = off;
+        if (remaining > 0) {
+            readBuffer.get(b, off, remaining);
+            leftToWrite -= remaining;
+            newOffset += remaining;
+        }
+
+        // Fill the read buffer as best we can
+        readBuffer.clear();
+        int nRead = fillReadBuffer(block);
+
+        // Fill as much of the remaining byte array as possible with the data
+        // that was just read
+        if (nRead > 0) {
+            readBuffer.flip();
+            readBuffer.limit(nRead);
+            int toCopy = Math.min(nRead, leftToWrite);
+            readBuffer.get(b, newOffset, toCopy);
+            leftToWrite -= toCopy;
+        } else {
+            // Nothing was read. Flip so the cleared buffer reports zero
+            // remaining bytes instead of exposing its whole capacity as data.
+            readBuffer.flip();
+            if (nRead < 0 && leftToWrite == len) {
+                // End of stream with nothing delivered: report it via the
+                // return value so doRead() can map it to EOF. If some bytes
+                // were already copied they are returned now and the next
+                // call reports the end of stream.
+                return -1;
+            }
+        }
+
+        return len - leftToWrite;
+    }
+
+    /**
+     * Reads from the channel into the (already cleared) read buffer.
+     *
+     * @param block {@code true} to read through the selector pool using the
+     *              timeout of the key attachment, {@code false} for a plain
+     *              channel read
+     * @return the value reported by the read, or -1 if the blocking read
+     *         signalled end of stream with an {@link EOFException}
+     */
+    private int fillReadBuffer(boolean block) throws IOException {
+        int nRead;
+        if (block) {
+            Selector selector = null;
+            try {
+                selector = pool.get();
+            } catch (IOException ignored) {
+                // Ignore: the read below is attempted with a null selector.
+                // NOTE(review): presumably pool.read() copes with that -
+                // confirm against NioSelectorPool.
+            }
+            try {
+                NioEndpoint.KeyAttachment att =
+                        (NioEndpoint.KeyAttachment) channel.getAttachment(false);
+                if (att == null) {
+                    throw new IOException("Key must be cancelled.");
+                }
+                nRead = pool.read(channel.getBufHandler().getReadBuffer(),
+                        channel, selector, att.getTimeout());
+            } catch (EOFException eof) {
+                nRead = -1;
+            } finally {
+                // Always hand a borrowed selector back to the pool.
+                if (selector != null) {
+                    pool.put(selector);
+                }
+            }
+        } else {
+            nRead = channel.read(channel.getBufHandler().getReadBuffer());
+        }
+        return nRead;
+    }
+}
Modified:
tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeAprProcessor.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeAprProcessor.java?rev=1413221&r1=1413220&r2=1413221&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeAprProcessor.java
(original)
+++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeAprProcessor.java
Sat Nov 24 17:51:48 2012
@@ -21,7 +21,6 @@ import java.io.IOException;
import javax.servlet.http.ProtocolHandler;
import org.apache.tomcat.jni.Socket;
-import org.apache.tomcat.jni.Status;
import org.apache.tomcat.util.net.SocketWrapper;
public class UpgradeAprProcessor extends UpgradeProcessor<Long> {
@@ -31,7 +30,7 @@ public class UpgradeAprProcessor extends
public UpgradeAprProcessor(SocketWrapper<Long> wrapper,
ProtocolHandler httpUpgradeProcessor) {
super(httpUpgradeProcessor,
- new
AprUpgradeServletInputStream(wrapper.getSocket().longValue()),
+ new AprUpgradeServletInputStream(wrapper),
new
AprUpgradeServletOutputStream(wrapper.getSocket().longValue()));
Socket.timeoutSet(wrapper.getSocket().longValue(), INFINITE_TIMEOUT);
@@ -40,50 +39,6 @@ public class UpgradeAprProcessor extends
// ----------------------------------------------------------- Inner
classes
- private static class AprUpgradeServletInputStream
- extends UpgradeServletInputStream {
-
- private final long socket;
-
- public AprUpgradeServletInputStream(long socket) {
- this.socket = socket;
- }
-
- @Override
- protected int doRead() throws IOException {
- byte[] bytes = new byte[1];
- int result = Socket.recv(socket, bytes, 0, 1);
- if (result == -1) {
- return -1;
- } else {
- return bytes[0] & 0xFF;
- }
- }
-
- @Override
- protected int doRead(byte[] b, int off, int len) throws IOException {
- boolean block = true;
- if (!block) {
- Socket.optSet(socket, Socket.APR_SO_NONBLOCK, -1);
- }
- try {
- int result = Socket.recv(socket, b, off, len);
- if (result > 0) {
- return result;
- } else if (-result == Status.EAGAIN) {
- return 0;
- } else {
- throw new IOException(sm.getString("apr.error",
- Integer.valueOf(-result)));
- }
- } finally {
- if (!block) {
- Socket.optSet(socket, Socket.APR_SO_NONBLOCK, 0);
- }
- }
- }
- }
-
private static class AprUpgradeServletOutputStream
extends UpgradeServletOutputStream {
Modified:
tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeBioProcessor.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeBioProcessor.java?rev=1413221&r1=1413220&r2=1413221&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeBioProcessor.java
(original)
+++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeBioProcessor.java
Sat Nov 24 17:51:48 2012
@@ -17,7 +17,6 @@
package org.apache.coyote.http11.upgrade;
import java.io.IOException;
-import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
@@ -40,27 +39,6 @@ public class UpgradeBioProcessor extends
// ----------------------------------------------------------- Inner
classes
- private static class BioUpgradeServletInputStream
- extends UpgradeServletInputStream {
-
- private final InputStream is;
-
- public BioUpgradeServletInputStream(SocketWrapper<Socket> wrapper)
- throws IOException {
- is = wrapper.getSocket().getInputStream();
- }
-
- @Override
- protected int doRead() throws IOException {
- return is.read();
- }
-
- @Override
- protected int doRead(byte[] b, int off, int len) throws IOException {
- return is.read(b, off, len);
- }
- }
-
private static class BioUpgradeServletOutputStream
extends UpgradeServletOutputStream {
Modified:
tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeNioProcessor.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeNioProcessor.java?rev=1413221&r1=1413220&r2=1413221&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeNioProcessor.java
(original)
+++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeNioProcessor.java
Sat Nov 24 17:51:48 2012
@@ -16,9 +16,7 @@
*/
package org.apache.coyote.http11.upgrade;
-import java.io.EOFException;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.nio.channels.Selector;
import javax.servlet.http.ProtocolHandler;
@@ -44,118 +42,6 @@ public class UpgradeNioProcessor extends
// ----------------------------------------------------------- Inner
classes
- private static class NioUpgradeServletInputStream
- extends UpgradeServletInputStream {
-
- private final NioChannel nioChannel;
- private final NioSelectorPool pool;
- private final int maxRead;
-
- public NioUpgradeServletInputStream(SocketWrapper<NioChannel> wrapper,
- NioSelectorPool pool) {
- nioChannel = wrapper.getSocket();
- this.pool = pool;
- maxRead = nioChannel.getBufHandler().getReadBuffer().capacity();
- }
-
- @Override
- protected int doRead() throws IOException {
- byte[] bytes = new byte[1];
- int result = readSocket(true, bytes, 0, 1);
- if (result == -1) {
- return -1;
- } else {
- return bytes[0] & 0xFF;
- }
- }
-
- @Override
- protected int doRead(byte[] b, int off, int len) throws IOException {
- if (len > maxRead) {
- return readSocket(true, b, off, maxRead);
- } else {
- return readSocket(true, b, off, len);
- }
- }
-
- private int readSocket(boolean block, byte[] b, int off, int len)
- throws IOException {
-
- ByteBuffer readBuffer = nioChannel.getBufHandler().getReadBuffer();
- int remaining = readBuffer.remaining();
-
- // Is there enough data in the read buffer to satisfy this request?
- if (remaining >= len) {
- readBuffer.get(b, off, len);
- return len;
- }
-
- // Copy what data there is in the read buffer to the byte array
- int leftToWrite = len;
- int newOffset = off;
- if (remaining > 0) {
- readBuffer.get(b, off, remaining);
- leftToWrite -= remaining;
- newOffset += remaining;
- }
-
- // Fill the read buffer as best we can
- readBuffer.clear();
- int nRead = fillReadBuffer(block);
-
- // Full as much of the remaining byte array as possible with the
data
- // that was just read
- if (nRead > 0) {
- readBuffer.flip();
- readBuffer.limit(nRead);
- if (nRead > leftToWrite) {
- readBuffer.get(b, newOffset, leftToWrite);
- leftToWrite = 0;
- } else {
- readBuffer.get(b, newOffset, nRead);
- leftToWrite -= nRead;
- }
- } else if (nRead == 0) {
- readBuffer.flip();
- readBuffer.limit(nRead);
- } else if (nRead == -1) {
- throw new EOFException(sm.getString("nio.eof.error"));
- }
-
- return len - leftToWrite;
- }
-
- private int fillReadBuffer(boolean block) throws IOException {
- int nRead;
- if (block) {
- Selector selector = null;
- try {
- selector = pool.get();
- } catch ( IOException x ) {
- // Ignore
- }
- try {
- NioEndpoint.KeyAttachment att =
- (NioEndpoint.KeyAttachment)
nioChannel.getAttachment(false);
- if (att == null) {
- throw new IOException("Key must be cancelled.");
- }
- nRead =
pool.read(nioChannel.getBufHandler().getReadBuffer(),
- nioChannel, selector, att.getTimeout());
- } catch (EOFException eof) {
- nRead = -1;
- } finally {
- if (selector != null) {
- pool.put(selector);
- }
- }
- } else {
- nRead =
nioChannel.read(nioChannel.getBufHandler().getReadBuffer());
- }
- return nRead;
- }
- }
-
private static class NioUpgradeServletOutputStream
extends UpgradeServletOutputStream {
Modified:
tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessor.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessor.java?rev=1413221&r1=1413220&r2=1413221&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessor.java
(original)
+++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessor.java
Sat Nov 24 17:51:48 2012
@@ -19,7 +19,6 @@ package org.apache.coyote.http11.upgrade
import java.io.IOException;
import java.util.concurrent.Executor;
-import javax.servlet.ReadListener;
import javax.servlet.ServletInputStream;
import javax.servlet.ServletOutputStream;
import javax.servlet.WriteListener;
@@ -41,8 +40,8 @@ public abstract class UpgradeProcessor<S
StringManager.getManager(Constants.Package);
private final ProtocolHandler httpUpgradeHandler;
- private final ServletInputStream upgradeServletInputStream;
- private final ServletOutputStream upgradeServletOutputStream;
+ private final UpgradeServletInputStream upgradeServletInputStream;
+ private final UpgradeServletOutputStream upgradeServletOutputStream;
protected UpgradeProcessor (ProtocolHandler httpUpgradeHandler,
UpgradeServletInputStream upgradeServletInputStream,
@@ -83,7 +82,14 @@ public abstract class UpgradeProcessor<S
public final SocketState upgradeDispatch(SocketStatus status)
throws IOException {
- // TODO Handle read/write ready for non-blocking IO
+ if (status == SocketStatus.OPEN_READ) {
+ upgradeServletInputStream.onDataAvailable();
+ } else if (status == SocketStatus.OPEN_WRITE) {
+ upgradeServletOutputStream.writeListener.onWritePossible();
+ } else {
+ // Unexpected state
+ return SocketState.CLOSED;
+ }
return SocketState.UPGRADED;
}
@@ -144,57 +150,6 @@ public abstract class UpgradeProcessor<S
// ----------------------------------------------------------- Inner
classes
- protected abstract static class UpgradeServletInputStream extends
- ServletInputStream {
-
- private volatile ReadListener readListener = null;
-
- @Override
- public final boolean isFinished() {
- if (readListener == null) {
- throw new IllegalStateException(
- sm.getString("upgrade.sis.isFinished.ise"));
- }
-
- // TODO Support non-blocking IO
- return false;
- }
-
- @Override
- public final boolean isReady() {
- if (readListener == null) {
- throw new IllegalStateException(
- sm.getString("upgrade.sis.isReady.ise"));
- }
-
- // TODO Support non-blocking IO
- return false;
- }
-
- @Override
- public final void setReadListener(ReadListener listener) {
- if (listener == null) {
- throw new NullPointerException(
- sm.getString("upgrade.sis.readListener.null"));
- }
- this.readListener = listener;
- }
-
- @Override
- public final int read() throws IOException {
- return doRead();
- }
-
- @Override
- public final int read(byte[] b, int off, int len) throws IOException {
- return doRead(b, off, len);
- }
-
- protected abstract int doRead() throws IOException;
- protected abstract int doRead(byte[] b, int off, int len)
- throws IOException;
- }
-
protected abstract static class UpgradeServletOutputStream extends
ServletOutputStream {
Added:
tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeServletInputStream.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeServletInputStream.java?rev=1413221&view=auto
==============================================================================
---
tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeServletInputStream.java
(added)
+++
tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeServletInputStream.java
Sat Nov 24 17:51:48 2012
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.coyote.http11.upgrade;
+
+import java.io.IOException;
+
+import javax.servlet.ReadListener;
+import javax.servlet.ServletInputStream;
+
+/**
+ * Base class for the input stream of an upgraded connection.
+ * <p>
+ * Connector specific subclasses supply the actual I/O through
+ * {@link #doRead(boolean)} and {@link #doIsReady()}. When no
+ * {@link ReadListener} has been registered, reads are requested in blocking
+ * mode; once a listener is registered, reads are requested in non-blocking
+ * mode and the listener is notified of data, end of stream and errors.
+ */
+public abstract class UpgradeServletInputStream extends ServletInputStream {
+
+    /** Returned by {@link #doRead(boolean)} at end of stream. */
+    protected static final int EOF = -1;
+    /** Returned by {@link #doRead(boolean)} when no byte could be read. */
+    protected static final int NO_DATA = -2;
+
+    private volatile boolean finished = false;
+    private volatile boolean ready = true;
+    private volatile ReadListener listener = null;
+
+    /**
+     * @return {@code true} once a read has hit the end of the stream
+     */
+    @Override
+    public final boolean isFinished() {
+        return finished;
+    }
+
+    /**
+     * Asks the subclass whether data can be read and caches the answer.
+     * <p>
+     * If the check fails with an {@link IOException}, the registered listener
+     * is notified and the previously cached state is returned.
+     *
+     * @return {@code true} if data can be read
+     * @throws IllegalStateException if the check fails and no listener is
+     *         registered to receive the error (the I/O failure is the cause)
+     */
+    @Override
+    public boolean isReady() {
+        try {
+            ready = doIsReady();
+        } catch (IOException e) {
+            // Snapshot the volatile field: this method may be called before
+            // any listener has been registered.
+            ReadListener readListener = this.listener;
+            if (readListener == null) {
+                throw new IllegalStateException(e);
+            }
+            readListener.onError(e);
+        }
+        return ready;
+    }
+
+    /**
+     * Registers the listener and immediately refreshes the ready state.
+     *
+     * @param listener the listener to notify; must not be {@code null}
+     * @throws IllegalArgumentException if {@code listener} is {@code null}
+     */
+    @Override
+    public void setReadListener(ReadListener listener) {
+        if (listener == null) {
+            // TODO i18n
+            throw new IllegalArgumentException();
+        }
+        this.listener = listener;
+
+        isReady();
+    }
+
+    /**
+     * Reads one byte, blocking only when no listener has been registered.
+     *
+     * @return the byte as 0-255, or -1 at end of stream or when the
+     *         subclass reported {@code NO_DATA}
+     * @throws IllegalStateException if the stream is not currently ready
+     * @throws IOException if the underlying read fails
+     */
+    @Override
+    public final int read() throws IOException {
+        if (!ready) {
+            // TODO i18n
+            throw new IllegalStateException();
+        }
+        ReadListener readListener = this.listener;
+        int result = doRead(readListener == null);
+        if (result == EOF) {
+            finished = true;
+            if (readListener != null) {
+                readListener.onAllDataRead();
+            }
+            return EOF;
+        } else if (result == NO_DATA) {
+            // Callers see "no data right now" as -1, which also ends a bulk
+            // read(byte[]) loop early; isFinished() stays false in this case.
+            return EOF;
+        }
+        return result;
+    }
+
+    /**
+     * Marks the stream as ready and notifies the listener, if any, that data
+     * is available. Does nothing beyond updating the ready flag when no
+     * listener has been registered.
+     */
+    protected void onDataAvailable() {
+        ready = true;
+        ReadListener readListener = this.listener;
+        if (readListener != null) {
+            readListener.onDataAvailable();
+        }
+    }
+
+    /**
+     * Reads a single byte from the connection.
+     *
+     * @param block {@code true} if the read may wait for data
+     * @return the byte as 0-255, {@link #EOF} or {@link #NO_DATA}
+     * @throws IOException if the read fails
+     */
+    protected abstract int doRead(boolean block) throws IOException;
+
+    /**
+     * @return {@code true} if data can be read
+     * @throws IOException if the check fails
+     */
+    protected abstract boolean doIsReady() throws IOException;
+}
Modified:
tomcat/trunk/test/org/apache/coyote/http11/upgrade/TestUpgradeServletInputStream.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/test/org/apache/coyote/http11/upgrade/TestUpgradeServletInputStream.java?rev=1413221&r1=1413220&r2=1413221&view=diff
==============================================================================
---
tomcat/trunk/test/org/apache/coyote/http11/upgrade/TestUpgradeServletInputStream.java
(original)
+++
tomcat/trunk/test/org/apache/coyote/http11/upgrade/TestUpgradeServletInputStream.java
Sat Nov 24 17:51:48 2012
@@ -26,7 +26,11 @@ import java.io.Writer;
import java.net.Socket;
import javax.net.SocketFactory;
+import javax.servlet.ReadListener;
import javax.servlet.ServletException;
+import javax.servlet.ServletInputStream;
+import javax.servlet.ServletOutputStream;
+import javax.servlet.WriteListener;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@@ -45,8 +49,34 @@ import org.apache.catalina.startup.Tomca
public class TestUpgradeServletInputStream extends TomcatBaseTest {
+ private static final String MESSAGE = "This is a test.\n";
+
@Test
public void testSimpleUpgrade() throws Exception {
+ doUpgrade();
+ }
+
+ @Test
+ public void testSingleMessage() throws Exception {
+ UpgradeConnection conn = doUpgrade();
+ Writer writer = conn.getWriter();
+ BufferedReader reader = conn.getReader();
+
+ writer.write(MESSAGE);
+ writer.flush();
+
+ Thread.sleep(500);
+
+ writer.write(MESSAGE);
+ writer.flush();
+
+ String response = reader.readLine();
+
+ Assert.assertEquals(MESSAGE, response);
+ }
+
+
+ private UpgradeConnection doUpgrade() throws Exception {
// Setup Tomcat instance
Tomcat tomcat = getTomcatInstance();
@@ -65,6 +95,8 @@ public class TestUpgradeServletInputStre
Socket socket =
SocketFactory.getDefault().createSocket("localhost",
getPort());
+ socket.setSoTimeout(300000);
+
InputStream is = socket.getInputStream();
OutputStream os = socket.getOutputStream();
@@ -80,6 +112,13 @@ public class TestUpgradeServletInputStre
Assert.assertEquals("HTTP/1.1 101 Switching Protocols",
status.substring(0, 32));
+
+ // Skip the remaining response headers
+ while (reader.readLine().length() > 0) {
+ // Skip
+ }
+
+ return new UpgradeConnection(writer, reader);
}
private static class UpgradeServlet extends HttpServlet {
@@ -94,11 +133,94 @@ public class TestUpgradeServletInputStre
}
}
+ private static class UpgradeConnection {
+ private final Writer writer;
+ private final BufferedReader reader;
+
+ public UpgradeConnection(Writer writer, BufferedReader reader) {
+ this.writer = writer;
+ this.reader = reader;
+ }
+
+ public Writer getWriter() {
+ return writer;
+ }
+
+ public BufferedReader getReader() {
+ return reader;
+ }
+ }
+
private static class Echo implements ProtocolHandler {
+ private ServletInputStream sis;
+ private ServletOutputStream sos;
+
@Override
public void init(WebConnection connection) {
- // TODO
+
+ try {
+ sis = connection.getInputStream();
+ sos = connection.getOutputStream();
+ } catch (IOException ioe) {
+ throw new IllegalStateException(ioe);
+ }
+
+ sis.setReadListener(new EchoReadListener());
+ sos.setWriteListener(new EchoWriteListener());
+ }
+
+ private class EchoReadListener implements ReadListener {
+
+ private byte[] buffer = new byte[8096];
+
+ @Override
+ public void onDataAvailable() {
+ try {
+ while (sis.isReady()) {
+ int read = sis.read(buffer);
+ if (read > 0) {
+ System.out.print(new String(buffer, 0, read));
+ /*
+ if (sos.canWrite()) {
+ sos.write(buffer, 0, read);
+ } else {
+ throw new IOException("Unable to echo data. " +
+ "canWrite() returned false");
+ }
+ */
+ }
+ }
+ } catch (IOException ioe) {
+ throw new RuntimeException(ioe);
+ }
+ }
+
+ @Override
+ public void onAllDataRead() {
+ System.out.println("All data read");
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ // TODO Auto-generated method stub
+
+ }
+ }
+
+ private class EchoWriteListener implements WriteListener {
+
+ @Override
+ public void onWritePossible() {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ // TODO Auto-generated method stub
+
+ }
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]