Author: markt
Date: Sat Nov 24 17:35:46 2012
New Revision: 1413212

URL: http://svn.apache.org/viewvc?rev=1413212&view=rev
Log:
First cut HTTP upgrade for NIO/APR
- Non-blocking not supported
- WebSocket broken

Modified: tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeAprProcessor.java tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeNioProcessor.java Modified: tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeAprProcessor.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeAprProcessor.java?rev=1413212&r1=1413211&r2=1413212&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeAprProcessor.java (original) +++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeAprProcessor.java Sat Nov 24 17:35:46 2012 @@ -26,76 +26,86 @@ import org.apache.tomcat.util.net.Socket public class UpgradeAprProcessor extends UpgradeProcessor<Long> { - private final long socket; - + private static final int INFINITE_TIMEOUT = -1; public UpgradeAprProcessor(SocketWrapper<Long> wrapper, ProtocolHandler httpUpgradeProcessor) { - super(upgradeInbound); - - Socket.timeoutSet(wrapper.getSocket().longValue(), - upgradeInbound.getReadTimeout()); + super(httpUpgradeProcessor, + new AprUpgradeServletInputStream(wrapper.getSocket().longValue()), + new AprUpgradeServletOutputStream(wrapper.getSocket().longValue())); - this.socket = wrapper.getSocket().longValue(); + Socket.timeoutSet(wrapper.getSocket().longValue(), INFINITE_TIMEOUT); } - /* - * Output methods - */ - @Override - public void flush() throws IOException { - // NOOP - } + // ----------------------------------------------------------- Inner classes + private static class AprUpgradeServletInputStream + extends UpgradeServletInputStream { - @Override - public void write(int b) throws IOException { - Socket.send(socket, new byte[] {(byte) b}, 0, 1); - } + private final long socket; + public AprUpgradeServletInputStream(long socket) { + this.socket = socket; + } - @Override - public void write(byte[]b, int off, int len) throws IOException { - Socket.send(socket, b, off, len); 
+ @Override + protected int doRead() throws IOException { + byte[] bytes = new byte[1]; + int result = Socket.recv(socket, bytes, 0, 1); + if (result == -1) { + return -1; + } else { + return bytes[0] & 0xFF; + } + } + + @Override + protected int doRead(byte[] b, int off, int len) throws IOException { + boolean block = true; + if (!block) { + Socket.optSet(socket, Socket.APR_SO_NONBLOCK, -1); + } + try { + int result = Socket.recv(socket, b, off, len); + if (result > 0) { + return result; + } else if (-result == Status.EAGAIN) { + return 0; + } else { + throw new IOException(sm.getString("apr.error", + Integer.valueOf(-result))); + } + } finally { + if (!block) { + Socket.optSet(socket, Socket.APR_SO_NONBLOCK, 0); + } + } + } } + private static class AprUpgradeServletOutputStream + extends UpgradeServletOutputStream { + + private final long socket; - /* - * Input methods - */ - @Override - public int read() throws IOException { - byte[] bytes = new byte[1]; - int result = Socket.recv(socket, bytes, 0, 1); - if (result == -1) { - return -1; - } else { - return bytes[0] & 0xFF; + public AprUpgradeServletOutputStream(long socket) { + this.socket = socket; } - } + @Override + protected void doWrite(int b) throws IOException { + Socket.send(socket, new byte[] {(byte) b}, 0, 1); + } - @Override - public int read(boolean block, byte[] bytes, int off, int len) - throws IOException { - if (!block) { - Socket.optSet(socket, Socket.APR_SO_NONBLOCK, -1); - } - try { - int result = Socket.recv(socket, bytes, off, len); - if (result > 0) { - return result; - } else if (-result == Status.EAGAIN) { - return 0; - } else { - throw new IOException(sm.getString("apr.error", - Integer.valueOf(-result))); - } - } finally { - if (!block) { - Socket.optSet(socket, Socket.APR_SO_NONBLOCK, 0); - } + @Override + protected void doWrite(byte[] b, int off, int len) throws IOException { + Socket.send(socket, b, off, len); + } + + @Override + protected void doFlush() throws IOException { + // 
NO-OP } } } Modified: tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeNioProcessor.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeNioProcessor.java?rev=1413212&r1=1413211&r2=1413212&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeNioProcessor.java (original) +++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeNioProcessor.java Sat Nov 24 17:35:46 2012 @@ -30,204 +30,219 @@ import org.apache.tomcat.util.net.Socket public class UpgradeNioProcessor extends UpgradeProcessor<NioChannel> { - private final NioChannel nioChannel; - private final NioSelectorPool pool; - private final int maxRead; - private final int maxWrite; + private static final int INFINITE_TIMEOUT = -1; public UpgradeNioProcessor(SocketWrapper<NioChannel> wrapper, ProtocolHandler httpUpgradeProcessor, NioSelectorPool pool) { - super(upgradeInbound); + super(httpUpgradeProcessor, + new NioUpgradeServletInputStream(wrapper, pool), + new NioUpgradeServletOutputStream(wrapper, pool)); - wrapper.setTimeout(upgradeInbound.getReadTimeout()); - - this.nioChannel = wrapper.getSocket(); - this.pool = pool; - this.maxRead = nioChannel.getBufHandler().getReadBuffer().capacity(); - this.maxWrite = nioChannel.getBufHandler().getWriteBuffer().capacity(); + wrapper.setTimeout(INFINITE_TIMEOUT); } - /* - * Output methods - */ - @Override - public void flush() throws IOException { - NioEndpoint.KeyAttachment att = - (NioEndpoint.KeyAttachment) nioChannel.getAttachment(false); - if (att == null) { - throw new IOException("Key must be cancelled"); - } - long writeTimeout = att.getTimeout(); - Selector selector = null; - try { - selector = pool.get(); - } catch ( IOException x ) { - //ignore - } - try { - do { - if (nioChannel.flush(true, selector, writeTimeout)) { - break; - } - } while (true); - } finally { - if (selector != null) { - 
pool.put(selector); + // ----------------------------------------------------------- Inner classes + + private static class NioUpgradeServletInputStream + extends UpgradeServletInputStream { + + private final NioChannel nioChannel; + private final NioSelectorPool pool; + private final int maxRead; + + public NioUpgradeServletInputStream(SocketWrapper<NioChannel> wrapper, + NioSelectorPool pool) { + nioChannel = wrapper.getSocket(); + this.pool = pool; + maxRead = nioChannel.getBufHandler().getReadBuffer().capacity(); + } + + @Override + protected int doRead() throws IOException { + byte[] bytes = new byte[1]; + int result = readSocket(true, bytes, 0, 1); + if (result == -1) { + return -1; + } else { + return bytes[0] & 0xFF; } } - } - @Override - public void write(int b) throws IOException { - writeToSocket(new byte[] {(byte) b}, 0, 1); - } + @Override + protected int doRead(byte[] b, int off, int len) throws IOException { + if (len > maxRead) { + return readSocket(true, b, off, maxRead); + } else { + return readSocket(true, b, off, len); + } + } + + private int readSocket(boolean block, byte[] b, int off, int len) + throws IOException { + + ByteBuffer readBuffer = nioChannel.getBufHandler().getReadBuffer(); + int remaining = readBuffer.remaining(); + + // Is there enough data in the read buffer to satisfy this request? 
+ if (remaining >= len) { + readBuffer.get(b, off, len); + return len; + } + + // Copy what data there is in the read buffer to the byte array + int leftToWrite = len; + int newOffset = off; + if (remaining > 0) { + readBuffer.get(b, off, remaining); + leftToWrite -= remaining; + newOffset += remaining; + } + + // Fill the read buffer as best we can + readBuffer.clear(); + int nRead = fillReadBuffer(block); + + // Full as much of the remaining byte array as possible with the data + // that was just read + if (nRead > 0) { + readBuffer.flip(); + readBuffer.limit(nRead); + if (nRead > leftToWrite) { + readBuffer.get(b, newOffset, leftToWrite); + leftToWrite = 0; + } else { + readBuffer.get(b, newOffset, nRead); + leftToWrite -= nRead; + } + } else if (nRead == 0) { + readBuffer.flip(); + readBuffer.limit(nRead); + } else if (nRead == -1) { + throw new EOFException(sm.getString("nio.eof.error")); + } - @Override - public void write(byte[]b, int off, int len) throws IOException { - int written = 0; - while (len - written > maxWrite) { - written += writeToSocket(b, off + written, maxWrite); + return len - leftToWrite; } - writeToSocket(b, off + written, len - written); - } - /* - * Input methods - */ - @Override - public int read() throws IOException { - byte[] bytes = new byte[1]; - int result = readSocket(true, bytes, 0, 1); - if (result == -1) { - return -1; - } else { - return bytes[0] & 0xFF; + private int fillReadBuffer(boolean block) throws IOException { + int nRead; + if (block) { + Selector selector = null; + try { + selector = pool.get(); + } catch ( IOException x ) { + // Ignore + } + try { + NioEndpoint.KeyAttachment att = + (NioEndpoint.KeyAttachment) nioChannel.getAttachment(false); + if (att == null) { + throw new IOException("Key must be cancelled."); + } + nRead = pool.read(nioChannel.getBufHandler().getReadBuffer(), + nioChannel, selector, att.getTimeout()); + } catch (EOFException eof) { + nRead = -1; + } finally { + if (selector != null) { + 
pool.put(selector); + } + } + } else { + nRead = nioChannel.read(nioChannel.getBufHandler().getReadBuffer()); + } + return nRead; } } - @Override - public int read(boolean block, byte[] bytes, int off, int len) - throws IOException { - if (len > maxRead) { - return readSocket(block, bytes, off, maxRead); - } else { - return readSocket(block, bytes, off, len); + private static class NioUpgradeServletOutputStream + extends UpgradeServletOutputStream { + + private final NioChannel nioChannel; + private final NioSelectorPool pool; + private final int maxWrite; + + public NioUpgradeServletOutputStream( + SocketWrapper<NioChannel> wrapper, NioSelectorPool pool) { + nioChannel = wrapper.getSocket(); + this.pool = pool; + maxWrite = nioChannel.getBufHandler().getWriteBuffer().capacity(); } - } + @Override + protected void doWrite(int b) throws IOException { + writeToSocket(new byte[] {(byte) b}, 0, 1); + } - /* - * Adapted from the NioInputBuffer. - */ - private int readSocket(boolean block, byte[] bytes, int offset, int len) - throws IOException { - - ByteBuffer readBuffer = nioChannel.getBufHandler().getReadBuffer(); - int remaining = readBuffer.remaining(); - - // Is there enough data in the read buffer to satisfy this request? 
- if (remaining >= len) { - readBuffer.get(bytes, offset, len); - return len; - } - - // Copy what data there is in the read buffer to the byte array - int leftToWrite = len; - int newOffset = offset; - if (remaining > 0) { - readBuffer.get(bytes, offset, remaining); - leftToWrite -= remaining; - newOffset += remaining; - } - - // Fill the read buffer as best we can - readBuffer.clear(); - int nRead = fillReadBuffer(block); - - // Full as much of the remaining byte array as possible with the data - // that was just read - if (nRead > 0) { - readBuffer.flip(); - readBuffer.limit(nRead); - if (nRead > leftToWrite) { - readBuffer.get(bytes, newOffset, leftToWrite); - leftToWrite = 0; - } else { - readBuffer.get(bytes, newOffset, nRead); - leftToWrite -= nRead; + @Override + protected void doWrite(byte[] b, int off, int len) throws IOException { + int written = 0; + while (len - written > maxWrite) { + written += writeToSocket(b, off + written, maxWrite); } - } else if (nRead == 0) { - readBuffer.flip(); - readBuffer.limit(nRead); - } else if (nRead == -1) { - throw new EOFException(sm.getString("nio.eof.error")); + writeToSocket(b, off + written, len - written); } - return len - leftToWrite; - } - - private int fillReadBuffer(boolean block) throws IOException { - int nRead; - if (block) { + @Override + protected void doFlush() throws IOException { + NioEndpoint.KeyAttachment att = + (NioEndpoint.KeyAttachment) nioChannel.getAttachment(false); + if (att == null) { + throw new IOException("Key must be cancelled"); + } + long writeTimeout = att.getTimeout(); Selector selector = null; try { selector = pool.get(); } catch ( IOException x ) { - // Ignore + //ignore } try { - NioEndpoint.KeyAttachment att = - (NioEndpoint.KeyAttachment) nioChannel.getAttachment(false); - if (att == null) { - throw new IOException("Key must be cancelled."); - } - nRead = pool.read(nioChannel.getBufHandler().getReadBuffer(), - nioChannel, selector, att.getTimeout()); - } catch (EOFException 
eof) { - nRead = -1; + do { + if (nioChannel.flush(true, selector, writeTimeout)) { + break; + } + } while (true); } finally { if (selector != null) { pool.put(selector); } } - } else { - nRead = nioChannel.read(nioChannel.getBufHandler().getReadBuffer()); } - return nRead; - } - /* - * Adapted from the NioOutputBuffer - */ - private synchronized int writeToSocket(byte[] bytes, int off, int len) - throws IOException { - - nioChannel.getBufHandler().getWriteBuffer().clear(); - nioChannel.getBufHandler().getWriteBuffer().put(bytes, off, len); - nioChannel.getBufHandler().getWriteBuffer().flip(); - - int written = 0; - NioEndpoint.KeyAttachment att = - (NioEndpoint.KeyAttachment) nioChannel.getAttachment(false); - if (att == null) { - throw new IOException("Key must be cancelled"); - } - long writeTimeout = att.getTimeout(); - Selector selector = null; - try { - selector = pool.get(); - } catch ( IOException x ) { - //ignore - } - try { - written = pool.write(nioChannel.getBufHandler().getWriteBuffer(), - nioChannel, selector, writeTimeout, true); - } finally { - if (selector != null) { - pool.put(selector); + /* + * Adapted from the NioOutputBuffer + */ + private synchronized int writeToSocket(byte[] bytes, int off, int len) + throws IOException { + + nioChannel.getBufHandler().getWriteBuffer().clear(); + nioChannel.getBufHandler().getWriteBuffer().put(bytes, off, len); + nioChannel.getBufHandler().getWriteBuffer().flip(); + + int written = 0; + NioEndpoint.KeyAttachment att = + (NioEndpoint.KeyAttachment) nioChannel.getAttachment(false); + if (att == null) { + throw new IOException("Key must be cancelled"); + } + long writeTimeout = att.getTimeout(); + Selector selector = null; + try { + selector = pool.get(); + } catch ( IOException x ) { + //ignore + } + try { + written = pool.write(nioChannel.getBufHandler().getWriteBuffer(), + nioChannel, selector, writeTimeout, true); + } finally { + if (selector != null) { + pool.put(selector); + } } + return written; } - 
return written; } } --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org