Author: markt
Date: Sat Nov 24 17:26:51 2012
New Revision: 1413209

URL: http://svn.apache.org/viewvc?rev=1413209&view=rev
Log:
First cut of HTTP Upgrade for BIO with blocking
- NIO, APR still broken
- Non-blocking IO not implemented for Upgrade
- WebSocket still broken

Removed: tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeInbound.java tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeOutbound.java Modified: tomcat/trunk/java/org/apache/coyote/http11/upgrade/LocalStrings.properties tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeBioProcessor.java tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessor.java Modified: tomcat/trunk/java/org/apache/coyote/http11/upgrade/LocalStrings.properties URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/LocalStrings.properties?rev=1413209&r1=1413208&r2=1413209&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http11/upgrade/LocalStrings.properties (original) +++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/LocalStrings.properties Sat Nov 24 17:26:51 2012 @@ -13,6 +13,12 @@ # See the License for the specific language governing permissions and # limitations under the License. +upgrade.sis.isFinished.ise=It is illegal to call isFinished() when the ServletInputStream is not in non-blocking mode (i.e. setReadListener() must be called first) +upgrade.sis.isReady.ise=It is illegal to call isReady() when the ServletInputStream is not in non-blocking mode (i.e. setReadListener() must be called first) +upgrade.sis.readListener.null=It is illegal to pass null to setReadListener() +upgrade.sos.canWrite.ise=It is illegal to call canWrite() when the ServletOutputStream is not in non-blocking mode (i.e. setWriteListener() must be called first) +upgrade.sos.writeListener.null=It is illegal to pass null to setWriteListener() + apr.error=Unexpected error [{0}] reading data from the APR/native socket. 
nio.eof.error=Unexpected EOF read on the socket Modified: tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeBioProcessor.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeBioProcessor.java?rev=1413209&r1=1413208&r2=1413209&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeBioProcessor.java (original) +++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeBioProcessor.java Sat Nov 24 17:26:51 2012 @@ -27,59 +27,63 @@ import org.apache.tomcat.util.net.Socket public class UpgradeBioProcessor extends UpgradeProcessor<Socket> { - private final InputStream inputStream; - private final OutputStream outputStream; + private static final int INFINITE_TIMEOUT = 0; public UpgradeBioProcessor(SocketWrapper<Socket> wrapper, ProtocolHandler httpUpgradeProcessor) throws IOException { - super(upgradeInbound); + super(httpUpgradeProcessor, new BioUpgradeServletInputStream(wrapper), + new BioUpgradeServletOutputStream(wrapper)); - int timeout = upgradeInbound.getReadTimeout(); - if (timeout < 0) { - timeout = 0; - } - wrapper.getSocket().setSoTimeout(timeout); - - this.inputStream = wrapper.getSocket().getInputStream(); - this.outputStream = wrapper.getSocket().getOutputStream(); + wrapper.getSocket().setSoTimeout(INFINITE_TIMEOUT); } - /* - * Output methods - */ - @Override - public void flush() throws IOException { - outputStream.flush(); - } + // ----------------------------------------------------------- Inner classes + private static class BioUpgradeServletInputStream + extends UpgradeServletInputStream { - @Override - public void write(int b) throws IOException { - outputStream.write(b); - } + private final InputStream is; + public BioUpgradeServletInputStream(SocketWrapper<Socket> wrapper) + throws IOException { + is = wrapper.getSocket().getInputStream(); + } + + @Override + protected int doRead() throws IOException 
{ + return is.read(); + } - @Override - public void write(byte[]b, int off, int len) throws IOException { - outputStream.write(b, off, len); + @Override + protected int doRead(byte[] b, int off, int len) throws IOException { + return is.read(b, off, len); + } } + private static class BioUpgradeServletOutputStream + extends UpgradeServletOutputStream { - /* - * Input methods - */ - @Override - public int read() throws IOException { - return inputStream.read(); - } + private final OutputStream os; + + public BioUpgradeServletOutputStream(SocketWrapper<Socket> wrapper) + throws IOException { + os = wrapper.getSocket().getOutputStream(); + } + @Override + protected void doWrite(int b) throws IOException { + os.write(b); + } - @Override - public int read(boolean block, byte[] bytes, int off, int len) - throws IOException { - // The BIO endpoint always uses blocking IO so the block parameter is - // ignored and a blocking read is performed. - return inputStream.read(bytes, off, len); + @Override + protected void doWrite(byte[] b, int off, int len) throws IOException { + os.write(b, off, len); + } + + @Override + protected void doFlush() throws IOException { + os.flush(); + } } } Modified: tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessor.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessor.java?rev=1413209&r1=1413208&r2=1413209&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessor.java (original) +++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessor.java Sat Nov 24 17:26:51 2012 @@ -19,6 +19,13 @@ package org.apache.coyote.http11.upgrade import java.io.IOException; import java.util.concurrent.Executor; +import javax.servlet.ReadListener; +import javax.servlet.ServletInputStream; +import javax.servlet.ServletOutputStream; +import javax.servlet.WriteListener; +import 
javax.servlet.http.ProtocolHandler; +import javax.servlet.http.WebConnection; + import org.apache.coyote.Processor; import org.apache.coyote.Request; import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState; @@ -27,73 +34,66 @@ import org.apache.tomcat.util.net.Socket import org.apache.tomcat.util.net.SocketWrapper; import org.apache.tomcat.util.res.StringManager; -public abstract class UpgradeProcessor<S> implements Processor<S> { +public abstract class UpgradeProcessor<S> + implements Processor<S>, WebConnection { protected static final StringManager sm = StringManager.getManager(Constants.Package); - private final UpgradeInbound upgradeInbound; + private final ProtocolHandler httpUpgradeHandler; + private final ServletInputStream upgradeServletInputStream; + private final ServletOutputStream upgradeServletOutputStream; + + protected UpgradeProcessor (ProtocolHandler httpUpgradeHandler, + UpgradeServletInputStream upgradeServletInputStream, + UpgradeServletOutputStream upgradeServletOutputStream) { + this.httpUpgradeHandler = httpUpgradeHandler; + this.upgradeServletInputStream = upgradeServletInputStream; + this.upgradeServletOutputStream = upgradeServletOutputStream; + this.httpUpgradeHandler.init(this); + } + - protected UpgradeProcessor (UpgradeInbound upgradeInbound) { - this.upgradeInbound = upgradeInbound; - upgradeInbound.setUpgradeProcessor(this); - upgradeInbound.setUpgradeOutbound(new UpgradeOutbound(this)); - } - - // Output methods - public abstract void flush() throws IOException; - public abstract void write(int b) throws IOException; - public abstract void write(byte[] b, int off, int len) throws IOException; - - // Input methods - /** - * This is always a blocking read of a single byte. - * - * @return The next byte or -1 if the end of the input is reached. 
- * - * @throws IOException If a problem occurs trying to read from the input - */ - public abstract int read() throws IOException; - - /** - * Read up to len bytes from the input in either blocking or non-blocking - * mode (where non-blocking is supported). If the input does not support - * non-blocking reads, a blcoking read will be performed. - * - * @param block - * @param bytes - * @param off - * @param len - * @return The number of bytes read or -1 if the end of the input is - * reached. Non-blocking reads may return zero if no data is - * available. Blocking reads never return zero. - * - * @throws IOException If a problem occurs trying to read from the input - */ - public abstract int read(boolean block, byte[] bytes, int off, int len) - throws IOException; + // --------------------------------------------------- WebConnection methods @Override - public final UpgradeInbound getUpgradeInbound() { - return upgradeInbound; + public ServletInputStream getInputStream() throws IOException { + return upgradeServletInputStream; } @Override - public final SocketState upgradeDispatch() throws IOException { - return upgradeInbound.onData(); + public ServletOutputStream getOutputStream() throws IOException { + return upgradeServletOutputStream; } + + // ------------------------------------------- Implemented Processor methods + @Override public final boolean isUpgrade() { return true; } @Override + public ProtocolHandler getHttpUpgradeHandler() { + return httpUpgradeHandler; + } + + @Override + public final SocketState upgradeDispatch() throws IOException { + + // TODO Handle read/write ready for non-blocking IO + return SocketState.UPGRADED; + } + + @Override public final void recycle(boolean socketClosing) { // Currently a NO-OP as upgrade processors are not recycled. 
} - // NO-OP methods for upgrade + + // ---------------------------- Processor methods that are NO-OP for upgrade + @Override public final Executor getExecutor() { return null; @@ -139,4 +139,104 @@ public abstract class UpgradeProcessor<S public final void setSslSupport(SSLSupport sslSupport) { // NOOP } + + + // ----------------------------------------------------------- Inner classes + + protected abstract static class UpgradeServletInputStream extends + ServletInputStream { + + private volatile ReadListener readListener = null; + + @Override + public final boolean isFinished() { + if (readListener == null) { + throw new IllegalStateException( + sm.getString("upgrade.sis.isFinished.ise")); + } + + // TODO Support non-blocking IO + return false; + } + + @Override + public final boolean isReady() { + if (readListener == null) { + throw new IllegalStateException( + sm.getString("upgrade.sis.isReady.ise")); + } + + // TODO Support non-blocking IO + return false; + } + + @Override + public final void setReadListener(ReadListener listener) { + if (listener == null) { + throw new NullPointerException( + sm.getString("upgrade.sis.readListener.null")); + } + this.readListener = listener; + } + + @Override + public final int read() throws IOException { + return doRead(); + } + + @Override + public final int read(byte[] b, int off, int len) throws IOException { + return doRead(b, off, len); + } + + protected abstract int doRead() throws IOException; + protected abstract int doRead(byte[] b, int off, int len) + throws IOException; + } + + protected abstract static class UpgradeServletOutputStream extends + ServletOutputStream { + + private volatile WriteListener writeListener = null; + + @Override + public boolean canWrite() { + if (writeListener == null) { + throw new IllegalStateException( + sm.getString("upgrade.sos.canWrite.ise")); + } + + // TODO Support non-blocking IO + return false; + } + + @Override + public void setWriteListener(WriteListener listener) { + if 
(listener == null) { + throw new NullPointerException( + sm.getString("upgrade.sos.writeListener.null")); + } + this.writeListener = listener; + } + + @Override + public void write(int b) throws IOException { + doWrite(b); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + doWrite(b, off, len); + } + + @Override + public void flush() throws IOException { + doFlush(); + } + + protected abstract void doWrite(int b) throws IOException; + protected abstract void doWrite(byte[] b, int off, int len) + throws IOException; + protected abstract void doFlush() throws IOException; + } } --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org