Author: fhanik Date: Wed Oct 18 16:24:52 2006 New Revision: 465417 URL: http://svn.apache.org/viewvc?view=rev&rev=465417 Log: Implement non blocking read on HTTP requests.
A common scalability problem with HTTP is that slow clients block server resources while sending an HTTP request, especially when the request headers are large. On FreeBSD the kernel has a built-in HTTP filter that does not wake up the application socket handle until the entire request has been received; however, this is not available on other platforms. With the Tomcat connectors there is an obvious problem when it comes to slow clients: if the client sends a partial request, Tomcat will block the thread until the client has finished sending the request. For example, if the client has 10 headers and sends the first 5 headers, then the next 5 in a subsequent batch, the Tomcat thread is locked in a blocking read. I've tried to fix that problem by making the NIO connector non-blocking. The only time the NIO connector will block now is when the servlet asks for data, usually the request body, as we don't have a way to suspend a thread, such as continuations. Once we have continuations (that can truly remember thread stack data), we can have a truly non-blocking server, but we are not there yet. I believe this code could be ported to the APR connector with very little effort. When you review this code, please note that I have not attempted to rewrite the header parse logic; I might do that at a later stage, as this got a little messy, but I wanted the proof of concept done first and to reuse as much code as possible.
Please feel free to review and even flame me if needed, at least that means this got some attention :) Modified: tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalNioInputBuffer.java Modified: tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java?view=diff&rev=465417&r1=465416&r2=465417 ============================================================================== --- tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java (original) +++ tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java Wed Oct 18 16:24:52 2006 @@ -820,7 +820,7 @@ boolean keptAlive = false; boolean openSocket = false; - + boolean recycle = true; while (!error && keepAlive && !comet) { // Parsing the request header @@ -829,8 +829,7 @@ socket.getIOChannel().socket().setSoTimeout((int)soTimeout); inputBuffer.readTimeout = soTimeout; } - if (!inputBuffer.parseRequestLine - (keptAlive && (endpoint.getCurrentThreadsBusy() > limit))) { + if (!inputBuffer.parseRequestLine(keptAlive && (endpoint.getCurrentThreadsBusy() > limit))) { // This means that no data is available right now // (long keepalive), so that the processor should be recycled // and the method should return true @@ -839,13 +838,18 @@ socket.getPoller().add(socket); break; } - request.setStartTime(System.currentTimeMillis()); keptAlive = true; - if (!disableUploadTimeout) { + if ( !inputBuffer.parseHeaders() ) { + openSocket = true; + socket.getPoller().add(socket); + recycle = false; + break; + } + request.setStartTime(System.currentTimeMillis()); + if (!disableUploadTimeout) { //only for body, not for request headers socket.getIOChannel().socket().setSoTimeout((int)timeout); inputBuffer.readTimeout = soTimeout; } - inputBuffer.parseHeaders(); } catch (IOException e) { error = true; 
break; @@ -934,7 +938,7 @@ return SocketState.LONG; } } else { - recycle(); + if ( recycle ) recycle(); return (openSocket) ? SocketState.OPEN : SocketState.CLOSED; } Modified: tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalNioInputBuffer.java URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalNioInputBuffer.java?view=diff&rev=465417&r1=465416&r2=465417 ============================================================================== --- tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalNioInputBuffer.java (original) +++ tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalNioInputBuffer.java Wed Oct 18 16:24:52 2006 @@ -45,9 +45,10 @@ // -------------------------------------------------------------- Constants - + enum HeaderParseStatus {DONE, HAVE_MORE_HEADERS, NEED_MORE_DATA} + enum HeaderParsePosition {HEADER_START, HEADER_NAME, HEADER_VALUE, HEADER_MULTI_LINE} // ----------------------------------------------------------- Constructors - + /** * Alternate constructor. @@ -72,6 +73,9 @@ lastActiveFilter = -1; parsingHeader = true; + parsingRequestLine = true; + headerParsePos = HeaderParsePosition.HEADER_START; + headerData.recycle(); swallowInput = true; if (readTimeout < 0) { @@ -112,6 +116,8 @@ * State. 
*/ protected boolean parsingHeader; + protected boolean parsingRequestLine; + protected HeaderParsePosition headerParsePos; /** @@ -286,6 +292,9 @@ pos = 0; lastActiveFilter = -1; parsingHeader = true; + headerParsePos = HeaderParsePosition.HEADER_START; + parsingRequestLine = true; + headerData.recycle(); swallowInput = true; } @@ -325,6 +334,9 @@ pos = 0; lastActiveFilter = -1; parsingHeader = true; + headerParsePos = HeaderParsePosition.HEADER_START; + parsingRequestLine = true; + headerData.recycle(); swallowInput = true; } @@ -360,6 +372,9 @@ public boolean parseRequestLine(boolean useAvailableData) throws IOException { + //check state + if ( !parsingRequestLine ) return true; + int start = 0; // @@ -375,7 +390,7 @@ return false; } if (readTimeout == -1) { - if (!fill()) //request line parsing + if (!fill(false,true)) //request line parsing throw new EOFException(sm.getString("iib.eof.error")); } else { // Do a simple read with a short timeout @@ -397,8 +412,8 @@ return false; } if (readTimeout == -1) { - if (!fill()) //request line parsing - throw new EOFException(sm.getString("iib.eof.error")); + if (!fill(false,false)) //request line parsing + return false; } else { // Do a simple read with a short timeout if ( !readSocket(true, false) ) return false; @@ -416,8 +431,8 @@ // Read new bytes if needed if (pos >= lastValid) { - if (!fill()) //request line parsing - throw new EOFException(sm.getString("iib.eof.error")); + if (!fill(true,false)) //request line parsing + return false; } if (buf[pos] == Constants.SP) { @@ -445,8 +460,8 @@ // Read new bytes if needed if (pos >= lastValid) { - if (!fill()) //request line parsing - throw new EOFException(sm.getString("iib.eof.error")); + if (!fill(true,false)) //request line parsing + return false; } if (buf[pos] == Constants.SP) { @@ -489,8 +504,8 @@ // Read new bytes if needed if (pos >= lastValid) { - if (!fill()) //reques line parsing - throw new EOFException(sm.getString("iib.eof.error")); + if 
(!fill(true,false)) //reques line parsing + return false; } if (buf[pos] == Constants.CR) { @@ -510,7 +525,7 @@ } else { request.protocol().setString(""); } - + parsingRequestLine = false; return true; } @@ -552,7 +567,7 @@ } else if ( !block ) { return false; } else { - timedOut = (readTimeout != -1) && ((System.currentTimeMillis()-start)>readTimeout); + timedOut = timeout && (readTimeout != -1) && ((System.currentTimeMillis()-start)>readTimeout); if ( !timedOut && nRead == 0 ) { try { final SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector()); @@ -604,15 +619,20 @@ /** * Parse the HTTP headers. */ - public void parseHeaders() + public boolean parseHeaders() throws IOException { - - while (parseHeader()) { + HeaderParseStatus status = HeaderParseStatus.HAVE_MORE_HEADERS; + + do { + status = parseHeader(); + } while ( status == HeaderParseStatus.HAVE_MORE_HEADERS ); + if (status == HeaderParseStatus.DONE) { + parsingHeader = false; + end = pos; + return true; + } else { + return false; } - - parsingHeader = false; - end = pos; - } @@ -622,7 +642,7 @@ * @return false after reading a blank line (which indicates that the * HTTP header parsing is done */ - public boolean parseHeader() + public HeaderParseStatus parseHeader() throws IOException { // @@ -630,12 +650,14 @@ // byte chr = 0; - while (true) { + while (headerParsePos == HeaderParsePosition.HEADER_START) { // Read new bytes if needed if (pos >= lastValid) { - if (!fill()) //parse header - throw new EOFException(sm.getString("iib.eof.error")); + if (!fill(true,false)) {//parse header + headerParsePos = HeaderParsePosition.HEADER_START; + return HeaderParseStatus.NEED_MORE_DATA; + } } chr = buf[pos]; @@ -643,7 +665,7 @@ if ((chr == Constants.CR) || (chr == Constants.LF)) { if (chr == Constants.LF) { pos++; - return false; + return HeaderParseStatus.DONE; } } else { break; @@ -653,28 +675,31 @@ } - // Mark the current buffer position - int start = pos; + if ( headerParsePos == 
HeaderParsePosition.HEADER_START ) { + // Mark the current buffer position + headerData.start = pos; + headerParsePos = HeaderParsePosition.HEADER_NAME; + } // // Reading the header name // Header name is always US-ASCII // + + - boolean colon = false; - MessageBytes headerValue = null; - - while (!colon) { + while (headerParsePos == HeaderParsePosition.HEADER_NAME) { // Read new bytes if needed if (pos >= lastValid) { - if (!fill()) //parse header - throw new EOFException(sm.getString("iib.eof.error")); + if (!fill(true,false)) { //parse header + return HeaderParseStatus.NEED_MORE_DATA; + } } if (buf[pos] == Constants.COLON) { - colon = true; - headerValue = headers.addValue(buf, start, pos - start); + headerParsePos = HeaderParsePosition.HEADER_VALUE; + headerData.headerValue = headers.addValue(buf, headerData.start, pos - headerData.start); } chr = buf[pos]; if ((chr >= Constants.A) && (chr <= Constants.Z)) { @@ -682,97 +707,121 @@ } pos++; - + if ( headerParsePos == HeaderParsePosition.HEADER_VALUE ) { + // Mark the current buffer position + headerData.start = pos; + headerData.realPos = pos; + } } - // Mark the current buffer position - start = pos; - int realPos = pos; - + // // Reading the header value (which can be spanned over multiple lines) // boolean eol = false; - boolean validLine = true; - while (validLine) { + while (headerParsePos == HeaderParsePosition.HEADER_VALUE || + headerParsePos == HeaderParsePosition.HEADER_MULTI_LINE) { + if ( headerParsePos == HeaderParsePosition.HEADER_VALUE ) { + + boolean space = true; - boolean space = true; + // Skipping spaces + while (space) { - // Skipping spaces - while (space) { + // Read new bytes if needed + if (pos >= lastValid) { + if (!fill(true,false)) {//parse header + //HEADER_VALUE, should already be set + return HeaderParseStatus.NEED_MORE_DATA; + } + } - // Read new bytes if needed - if (pos >= lastValid) { - if (!fill()) //parse header - throw new EOFException(sm.getString("iib.eof.error")); - } + if 
((buf[pos] == Constants.SP) || (buf[pos] == Constants.HT)) { + pos++; + } else { + space = false; + } - if ((buf[pos] == Constants.SP) || (buf[pos] == Constants.HT)) { - pos++; - } else { - space = false; } - } - - int lastSignificantChar = realPos; + headerData.lastSignificantChar = headerData.realPos; - // Reading bytes until the end of the line - while (!eol) { + // Reading bytes until the end of the line + while (!eol) { - // Read new bytes if needed - if (pos >= lastValid) { - if (!fill()) //parse header - throw new EOFException(sm.getString("iib.eof.error")); - } + // Read new bytes if needed + if (pos >= lastValid) { + if (!fill(true,false)) {//parse header + //HEADER_VALUE + return HeaderParseStatus.NEED_MORE_DATA; + } - if (buf[pos] == Constants.CR) { - } else if (buf[pos] == Constants.LF) { - eol = true; - } else if (buf[pos] == Constants.SP) { - buf[realPos] = buf[pos]; - realPos++; - } else { - buf[realPos] = buf[pos]; - realPos++; - lastSignificantChar = realPos; - } + } - pos++; + if (buf[pos] == Constants.CR) { + } else if (buf[pos] == Constants.LF) { + eol = true; + } else if (buf[pos] == Constants.SP) { + buf[headerData.realPos] = buf[pos]; + headerData.realPos++; + } else { + buf[headerData.realPos] = buf[pos]; + headerData.realPos++; + headerData.lastSignificantChar = headerData.realPos; + } - } + pos++; - realPos = lastSignificantChar; + } - // Checking the first character of the new line. If the character - // is a LWS, then it's a multiline header + headerData.realPos = headerData.lastSignificantChar; + // Checking the first character of the new line. 
If the character + // is a LWS, then it's a multiline header + headerParsePos = HeaderParsePosition.HEADER_MULTI_LINE; + } // Read new bytes if needed if (pos >= lastValid) { - if (!fill()) //parse header - throw new EOFException(sm.getString("iib.eof.error")); + if (!fill(true,false)) {//parse header + + //HEADER_MULTI_LINE + return HeaderParseStatus.NEED_MORE_DATA; + } } chr = buf[pos]; - if ((chr != Constants.SP) && (chr != Constants.HT)) { - validLine = false; - } else { - eol = false; - // Copying one extra space in the buffer (since there must - // be at least one space inserted between the lines) - buf[realPos] = chr; - realPos++; + if ( headerParsePos == HeaderParsePosition.HEADER_MULTI_LINE ) { + if ( (chr != Constants.SP) && (chr != Constants.HT)) { + headerParsePos = HeaderParsePosition.HEADER_START; + } else { + eol = false; + // Copying one extra space in the buffer (since there must + // be at least one space inserted between the lines) + buf[headerData.realPos] = chr; + headerData.realPos++; + } } - } - // Set the header value - headerValue.setBytes(buf, start, realPos - start); - - return true; - + headerData.headerValue.setBytes(buf, headerData.start, headerData.realPos - headerData.start); + headerData.recycle(); + return HeaderParseStatus.HAVE_MORE_HEADERS; + } + + protected HeaderParseData headerData = new HeaderParseData(); + public static class HeaderParseData { + int start = 0; + int realPos = 0; + int lastSignificantChar = 0; + MessageBytes headerValue = null; + public void recycle() { + start = 0; + realPos = 0; + lastSignificantChar = 0; + headerValue = null; + } } @@ -795,14 +844,13 @@ // ------------------------------------------------------ Protected Methods - /** * Fill the internal buffer using data from the undelying input stream. 
* * @return false if at end of stream */ - protected boolean fill() - throws IOException { + protected boolean fill(boolean timeout, boolean block) + throws IOException, EOFException { boolean read = false; @@ -814,7 +862,7 @@ } // Do a simple read with a short timeout - read = readSocket(true,true); + read = readSocket(timeout,block); } else { if (buf.length - end < 4500) { @@ -827,7 +875,7 @@ pos = end; lastValid = pos; // Do a simple read with a short timeout - read = readSocket(true, true); + read = readSocket(timeout, block); } return read; } @@ -851,7 +899,7 @@ throws IOException { if (pos >= lastValid) { - if (!fill()) //read body + if (!fill(true,true)) //read body, must be blocking, as the thread is inside the app return -1; } --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]