Author: markt Date: Thu Jan 15 09:21:08 2015 New Revision: 1652002 URL: http://svn.apache.org/r1652002 Log: NIO refactoring - Use read from socketWrapper rather than HttpNio2InputBuffer - Various API tweaks to support the above
Modified: tomcat/trunk/java/org/apache/coyote/http11/Http11Nio2Processor.java tomcat/trunk/java/org/apache/coyote/http11/InternalAprInputBuffer.java tomcat/trunk/java/org/apache/coyote/http11/InternalNio2InputBuffer.java tomcat/trunk/java/org/apache/coyote/http11/InternalNioInputBuffer.java tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java Modified: tomcat/trunk/java/org/apache/coyote/http11/Http11Nio2Processor.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/Http11Nio2Processor.java?rev=1652002&r1=1652001&r2=1652002&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http11/Http11Nio2Processor.java (original) +++ tomcat/trunk/java/org/apache/coyote/http11/Http11Nio2Processor.java Thu Jan 15 09:21:08 2015 @@ -85,7 +85,7 @@ public class Http11Nio2Processor extends @Override public SocketState asyncDispatch(SocketStatus status) { SocketState state = super.asyncDispatch(status); - if (state == SocketState.OPEN && ((InternalNio2InputBuffer) getInputBuffer()).isPending()) { + if (state == SocketState.OPEN && socketWrapper.isReadPending()) { // Following async processing, a read is still pending, so // keep the processor associated return SocketState.LONG; @@ -97,7 +97,7 @@ public class Http11Nio2Processor extends @Override protected void registerForEvent(boolean read, boolean write) { if (read) { - ((InternalNio2InputBuffer) getInputBuffer()).registerReadInterest(); + socketWrapper.registerReadInterest(); } if (write) { socketWrapper.registerWriteInterest(); Modified: tomcat/trunk/java/org/apache/coyote/http11/InternalAprInputBuffer.java URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/InternalAprInputBuffer.java?rev=1652002&r1=1652001&r2=1652002&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http11/InternalAprInputBuffer.java (original) +++ tomcat/trunk/java/org/apache/coyote/http11/InternalAprInputBuffer.java Thu Jan 15 09:21:08 2015 @@ -37,12 +37,9 @@ public class InternalAprInputBuffer exte private static final Log log = LogFactory.getLog(InternalAprInputBuffer.class); - // ----------------------------------------------------------- Constructors + // ----------------------------------------------------------- Constructors - /** - * Alternate constructor. - */ public InternalAprInputBuffer(Request request, int headerBufferSize) { super(request, headerBufferSize); inputStreamInputBuffer = new SocketInputBuffer(); @@ -81,7 +78,7 @@ public class InternalAprInputBuffer exte wrapper = socketWrapper; - int bufLength = Math.max(headerBufferSize, 8192); + int bufLength = Math.max(headerBufferSize * 2, 8192); if (buf == null || buf.length < bufLength) { buf = new byte[bufLength]; } @@ -116,9 +113,7 @@ public class InternalAprInputBuffer exte * This class is an input buffer which will read its data from an input * stream. */ - protected class SocketInputBuffer - implements InputBuffer { - + protected class SocketInputBuffer implements InputBuffer { /** * Read bytes into the specified chunk. 
@@ -136,7 +131,7 @@ public class InternalAprInputBuffer exte chunk.setBytes(buf, pos, length); pos = lastValid; - return (length); + return length; } } } Modified: tomcat/trunk/java/org/apache/coyote/http11/InternalNio2InputBuffer.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/InternalNio2InputBuffer.java?rev=1652002&r1=1652001&r2=1652002&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http11/InternalNio2InputBuffer.java (original) +++ tomcat/trunk/java/org/apache/coyote/http11/InternalNio2InputBuffer.java Thu Jan 15 09:21:08 2015 @@ -18,14 +18,6 @@ package org.apache.coyote.http11; import java.io.EOFException; import java.io.IOException; -import java.net.SocketTimeoutException; -import java.nio.ByteBuffer; -import java.nio.channels.CompletionHandler; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import javax.servlet.RequestDispatcher; import org.apache.coyote.InputBuffer; import org.apache.coyote.Request; @@ -34,8 +26,6 @@ import org.apache.juli.logging.LogFactor import org.apache.tomcat.util.buf.ByteChunk; import org.apache.tomcat.util.net.AbstractEndpoint; import org.apache.tomcat.util.net.Nio2Channel; -import org.apache.tomcat.util.net.Nio2Endpoint; -import org.apache.tomcat.util.net.SocketStatus; import org.apache.tomcat.util.net.SocketWrapperBase; /** @@ -46,87 +36,39 @@ public class InternalNio2InputBuffer ext private static final Log log = LogFactory.getLog(InternalNio2InputBuffer.class); - // ----------------------------------------------------------- Constructors + // ----------------------------------------------------------- Constructors public InternalNio2InputBuffer(Request request, int headerBufferSize) { super(request, headerBufferSize); inputStreamInputBuffer = new SocketInputBuffer(); } - /** - * Underlying socket. 
- */ - private SocketWrapperBase<Nio2Channel> socket; - - /** - * Track write interest - */ - protected volatile boolean interest = false; - - /** - * The completion handler used for asynchronous read operations - */ - private CompletionHandler<Integer, SocketWrapperBase<Nio2Channel>> completionHandler; - - /** - * The associated endpoint. - */ - protected AbstractEndpoint<Nio2Channel> endpoint = null; - - /** - * Read pending flag. - */ - protected volatile boolean readPending = false; + // ----------------------------------------------------- Instance Variables - /** - * Exception that occurred during writing. - */ - protected IOException e = null; + private SocketWrapperBase<Nio2Channel> wrapper; - /** - * Track if the byte buffer is flipped - */ - protected volatile boolean flipped = false; // --------------------------------------------------------- Public Methods - @Override - protected final Log getLog() { - return log; - } - - /** * Recycle the input buffer. This should be called when closing the * connection. */ @Override public void recycle() { + wrapper = null; super.recycle(); - socket = null; - readPending = false; - flipped = false; - interest = false; - e = null; } - /** - * End processing of current HTTP request. - * Note: All bytes of the current request should have been already - * consumed. This method only resets all the pointers so that we are ready - * to parse the next HTTP request. 
- */ + // ------------------------------------------------------ Protected Methods + @Override - public void nextRequest() { - super.nextRequest(); - interest = false; + protected final Log getLog() { + return log; } - public boolean isPending() { - return readPending; - } // ------------------------------------------------------ Protected Methods @@ -134,62 +76,17 @@ public class InternalNio2InputBuffer ext protected void init(SocketWrapperBase<Nio2Channel> socketWrapper, AbstractEndpoint<Nio2Channel> associatedEndpoint) throws IOException { - endpoint = associatedEndpoint; - socket = socketWrapper; - if (socket == null) { - // Socket has been closed in another thread - throw new IOException(sm.getString("iib.socketClosed")); - } - socketReadBufferSize = - socket.getSocket().getBufHandler().getReadBuffer().capacity(); + wrapper = socketWrapper; - int bufLength = headerBufferSize + socketReadBufferSize; + int bufLength = headerBufferSize + wrapper.getSocket().getBufHandler().getReadBuffer().capacity(); if (buf == null || buf.length < bufLength) { buf = new byte[bufLength]; } - - // Initialize the completion handler - this.completionHandler = new CompletionHandler<Integer, SocketWrapperBase<Nio2Channel>>() { - - @Override - public void completed(Integer nBytes, SocketWrapperBase<Nio2Channel> attachment) { - boolean notify = false; - synchronized (completionHandler) { - if (nBytes.intValue() < 0) { - failed(new EOFException(sm.getString("iib.eof.error")), attachment); - } else { - readPending = false; - if ((request.getReadListener() == null || interest) && !Nio2Endpoint.isInline()) { - interest = false; - notify = true; - } - } - } - if (notify) { - endpoint.processSocket(attachment, SocketStatus.OPEN_READ, false); - } - } - - @Override - public void failed(Throwable exc, SocketWrapperBase<Nio2Channel> attachment) { - if (exc instanceof IOException) { - e = (IOException) exc; - } else { - e = new IOException(exc); - } - attachment.setError(e); - 
request.setAttribute(RequestDispatcher.ERROR_EXCEPTION, e); - readPending = false; - endpoint.processSocket(attachment, SocketStatus.OPEN_READ, true); - } - }; } @Override protected boolean fill(boolean block) throws IOException, EOFException { - if (e != null) { - throw e; - } + if (parsingHeader) { if (lastValid > headerBufferSize) { throw new IllegalArgumentException @@ -198,127 +95,24 @@ public class InternalNio2InputBuffer ext } else { lastValid = pos = end; } - // Now fill the internal buffer - int nRead = 0; - ByteBuffer byteBuffer = socket.getSocket().getBufHandler().getReadBuffer(); - if (block) { - if (!flipped) { - byteBuffer.flip(); - flipped = true; - } - int nBytes = byteBuffer.remaining(); - // This case can happen when a blocking read follows a non blocking - // fill that completed asynchronously - if (nBytes > 0) { - expand(nBytes + pos); - byteBuffer.get(buf, pos, nBytes); - lastValid = pos + nBytes; - byteBuffer.clear(); - flipped = false; - return true; - } else { - byteBuffer.clear(); - flipped = false; - try { - nRead = socket.getSocket().read(byteBuffer) - .get(socket.getTimeout(), TimeUnit.MILLISECONDS).intValue(); - } catch (ExecutionException e) { - if (e.getCause() instanceof IOException) { - throw (IOException) e.getCause(); - } else { - throw new IOException(e); - } - } catch (InterruptedException e) { - throw new IOException(e); - } catch (TimeoutException e) { - throw new SocketTimeoutException(); - } - if (nRead > 0) { - if (!flipped) { - byteBuffer.flip(); - flipped = true; - } - expand(nRead + pos); - byteBuffer.get(buf, pos, nRead); - lastValid = pos + nRead; - return true; - } else if (nRead == -1) { - //return false; - throw new EOFException(sm.getString("iib.eof.error")); - } else { - return false; - } - } - } else { - synchronized (completionHandler) { - if (!readPending) { - if (!flipped) { - byteBuffer.flip(); - flipped = true; - } - int nBytes = byteBuffer.remaining(); - if (nBytes > 0) { - expand(nBytes + pos); - 
byteBuffer.get(buf, pos, nBytes); - lastValid = pos + nBytes; - byteBuffer.clear(); - flipped = false; - } else { - byteBuffer.clear(); - flipped = false; - readPending = true; - Nio2Endpoint.startInline(); - socket.getSocket().read(byteBuffer, socket.getTimeout(), - TimeUnit.MILLISECONDS, socket, completionHandler); - Nio2Endpoint.endInline(); - // Return the number of bytes that have been placed into the buffer - if (!readPending) { - // If the completion handler completed immediately - if (!flipped) { - byteBuffer.flip(); - flipped = true; - } - nBytes = byteBuffer.remaining(); - if (nBytes > 0) { - expand(nBytes + pos); - byteBuffer.get(buf, pos, nBytes); - lastValid = pos + nBytes; - } - byteBuffer.clear(); - flipped = false; - } - } - return (lastValid - pos) > 0; - } else { - return false; - } - } - } - } - - public void registerReadInterest() { - synchronized (completionHandler) { - if (readPending) { - interest = true; - } else { - // If no read is pending, notify - endpoint.processSocket(socket, SocketStatus.OPEN_READ, true); - } + int nRead = wrapper.read(block, buf, pos, buf.length - pos); + if (nRead > 0) { + lastValid = pos + nRead; + return true; } + + return false; } // ------------------------------------- InputStreamInputBuffer Inner Class - /** * This class is an input buffer which will read its data from an input * stream. */ - protected class SocketInputBuffer - implements InputBuffer { - + protected class SocketInputBuffer implements InputBuffer { /** * Read bytes into the specified chunk. 
@@ -331,19 +125,12 @@ public class InternalNio2InputBuffer ext if (!fill(true)) //read body, must be blocking, as the thread is inside the app return -1; } - if (isBlocking()) { - int length = lastValid - pos; - chunk.setBytes(buf, pos, length); - pos = lastValid; - return (length); - } else { - synchronized (completionHandler) { - int length = lastValid - pos; - chunk.setBytes(buf, pos, length); - pos = lastValid; - return (length); - } - } + + int length = lastValid - pos; + chunk.setBytes(buf, pos, length); + pos = lastValid; + + return length; } } } Modified: tomcat/trunk/java/org/apache/coyote/http11/InternalNioInputBuffer.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/InternalNioInputBuffer.java?rev=1652002&r1=1652001&r2=1652002&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http11/InternalNioInputBuffer.java (original) +++ tomcat/trunk/java/org/apache/coyote/http11/InternalNioInputBuffer.java Thu Jan 15 09:21:08 2015 @@ -80,7 +80,7 @@ public class InternalNioInputBuffer exte wrapper = socketWrapper; - int bufLength = Math.max(headerBufferSize, 8192); + int bufLength = headerBufferSize + wrapper.getSocket().getBufHandler().getReadBuffer().capacity(); if (buf == null || buf.length < bufLength) { buf = new byte[bufLength]; } Modified: tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java?rev=1652002&r1=1652001&r2=1652002&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java (original) +++ tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java Thu Jan 15 09:21:08 2015 @@ -2629,8 +2629,20 @@ public class AprEndpoint extends Abstrac @Override + public boolean isReadPending() { + return false; + } + + + @Override + public void 
registerReadInterest() { + regsiterForEvent(true, false); + } + + + @Override public void registerWriteInterest() { - ((AprEndpoint) getEndpoint()).getPoller().add(getSocket().longValue(), -1, false, true); + regsiterForEvent(false, true); } Modified: tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java?rev=1652002&r1=1652001&r2=1652002&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java (original) +++ tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java Thu Jan 15 09:21:08 2015 @@ -977,6 +977,9 @@ public class Nio2Endpoint extends Abstra @Override public int read(boolean block, byte[] b, int off, int len) throws IOException { + if (getError() != null) { + throw getError(); + } if (log.isDebugEnabled()) { log.debug("Socket: [" + this + "], block: [" + block + "], length: [" + len + "]"); @@ -1242,6 +1245,27 @@ public class Nio2Endpoint extends Abstra } } + + @Override + public boolean isReadPending() { + synchronized (readCompletionHandler) { + return readPending; + } + } + + + @Override + public void registerReadInterest() { + synchronized (readCompletionHandler) { + if (readPending) { + readInterest = true; + } else { + // If no read is pending, notify + getEndpoint().processSocket(this, SocketStatus.OPEN_READ, true); + } + } + } + @Override public void registerWriteInterest() { Modified: tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java?rev=1652002&r1=1652001&r2=1652002&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java (original) +++ tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java Thu Jan 15 09:21:08 2015 @@ -1548,6 
+1548,18 @@ public class NioEndpoint extends Abstrac @Override + public boolean isReadPending() { + return false; + } + + + @Override + public void registerReadInterest() { + getPoller().add(getSocket(), SelectionKey.OP_READ); + } + + + @Override public void registerWriteInterest() { getPoller().add(getSocket(), SelectionKey.OP_WRITE); } Modified: tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java?rev=1652002&r1=1652001&r2=1652002&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java (original) +++ tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java Thu Jan 15 09:21:08 2015 @@ -178,6 +178,8 @@ public abstract class SocketWrapperBase< } public Object getWriteThreadLock() { return writeThreadLock; } + public abstract boolean isReadPending(); + protected boolean hasMoreDataToFlush() { return (writeBufferFlipped && socketWriteBuffer.remaining() > 0) || (!writeBufferFlipped && socketWriteBuffer.position() > 0); @@ -498,6 +500,8 @@ public abstract class SocketWrapperBase< holder.getBuf().put(buf,offset,length); } + public abstract void registerReadInterest(); + public abstract void registerWriteInterest(); public abstract void regsiterForEvent(boolean read, boolean write); --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org