Author: fhanik
Date: Wed Oct 18 16:24:52 2006
New Revision: 465417

URL: http://svn.apache.org/viewvc?view=rev&rev=465417
Log:
Implement non blocking read on HTTP requests.

A common scalability problem when it comes to HTTP is the fact that there are 
slow clients, that will block a server resources while sending a HTTP request. 
Especially when you have larger request headers.

On FreeBSD the kernel has a built in http filter to not wake up the application 
socket handle until the entire request has been received, however on other 
platforms this is not available.

With the Tomcat connectors, there is an obvious problem when it comes to slow 
clients, if the client sends up a partial request, Tomcat will block the thread 
until the client has finished sending the request. For example, if the client 
has 10 headers it sends up the first 5 headers, then the next 5 in a sequential 
batch, the tomcat thread is locked in a blocking read
I've tried to fix that problem by making the NIO connector be non blocking. The 
only time the NIO connector will block now is when the servlet asks for data, 
usually the request body, as we don't have a way to suspend a thread, like 
continuations.
Once we have continuations(that can truly remember thread stack data), we can 
have a truly non blocking server, but we are not there yet.

I believe this code could be easily ported to APR connector with very little 
effort.
When you review this code, please note that I have not attemtped to rewrite the 
header parse logic, I might do that in a later stage as this got a little 
messy, but I wanted the proof of concept done first and reuse as much code as 
possible.

Please feel free to review and even flame me if needed, at least that means 
this got some attention :)


Modified:
    tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java
    
tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalNioInputBuffer.java

Modified: 
tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java
URL: 
http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java?view=diff&rev=465417&r1=465416&r2=465417
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java 
(original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java 
Wed Oct 18 16:24:52 2006
@@ -820,7 +820,7 @@
 
         boolean keptAlive = false;
         boolean openSocket = false;
-
+        boolean recycle = true;
         while (!error && keepAlive && !comet) {
 
             // Parsing the request header
@@ -829,8 +829,7 @@
                     
socket.getIOChannel().socket().setSoTimeout((int)soTimeout);
                     inputBuffer.readTimeout = soTimeout;
                 }
-                if (!inputBuffer.parseRequestLine
-                        (keptAlive && (endpoint.getCurrentThreadsBusy() > 
limit))) {
+                if (!inputBuffer.parseRequestLine(keptAlive && 
(endpoint.getCurrentThreadsBusy() > limit))) {
                     // This means that no data is available right now
                     // (long keepalive), so that the processor should be 
recycled
                     // and the method should return true
@@ -839,13 +838,18 @@
                     socket.getPoller().add(socket);
                     break;
                 }
-                request.setStartTime(System.currentTimeMillis());
                 keptAlive = true;
-                if (!disableUploadTimeout) {
+                if ( !inputBuffer.parseHeaders() ) {
+                    openSocket = true;
+                    socket.getPoller().add(socket);
+                    recycle = false;
+                    break;
+                }
+                request.setStartTime(System.currentTimeMillis());
+                if (!disableUploadTimeout) { //only for body, not for request 
headers
                     socket.getIOChannel().socket().setSoTimeout((int)timeout);
                     inputBuffer.readTimeout = soTimeout;
                 }
-                inputBuffer.parseHeaders();
             } catch (IOException e) {
                 error = true;
                 break;
@@ -934,7 +938,7 @@
                 return SocketState.LONG;
             }
         } else {
-            recycle();
+            if ( recycle ) recycle();
             return (openSocket) ? SocketState.OPEN : SocketState.CLOSED;
         }
 

Modified: 
tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalNioInputBuffer.java
URL: 
http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalNioInputBuffer.java?view=diff&rev=465417&r1=465416&r2=465417
==============================================================================
--- 
tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalNioInputBuffer.java 
(original)
+++ 
tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalNioInputBuffer.java 
Wed Oct 18 16:24:52 2006
@@ -45,9 +45,10 @@
 
     // -------------------------------------------------------------- Constants
 
-
+    enum HeaderParseStatus {DONE, HAVE_MORE_HEADERS, NEED_MORE_DATA}
+    enum HeaderParsePosition {HEADER_START, HEADER_NAME, HEADER_VALUE, 
HEADER_MULTI_LINE}
     // ----------------------------------------------------------- Constructors
-
+    
 
     /**
      * Alternate constructor.
@@ -72,6 +73,9 @@
         lastActiveFilter = -1;
 
         parsingHeader = true;
+        parsingRequestLine = true;
+        headerParsePos = HeaderParsePosition.HEADER_START;
+        headerData.recycle();
         swallowInput = true;
 
         if (readTimeout < 0) {
@@ -112,6 +116,8 @@
      * State.
      */
     protected boolean parsingHeader;
+    protected boolean parsingRequestLine;
+    protected HeaderParsePosition headerParsePos;
 
 
     /**
@@ -286,6 +292,9 @@
         pos = 0;
         lastActiveFilter = -1;
         parsingHeader = true;
+        headerParsePos = HeaderParsePosition.HEADER_START;
+        parsingRequestLine = true;
+        headerData.recycle();
         swallowInput = true;
 
     }
@@ -325,6 +334,9 @@
         pos = 0;
         lastActiveFilter = -1;
         parsingHeader = true;
+        headerParsePos = HeaderParsePosition.HEADER_START;
+        parsingRequestLine = true;
+        headerData.recycle();
         swallowInput = true;
 
     }
@@ -360,6 +372,9 @@
     public boolean parseRequestLine(boolean useAvailableData)
         throws IOException {
 
+        //check state
+        if ( !parsingRequestLine ) return true;
+        
         int start = 0;
 
         //
@@ -375,7 +390,7 @@
                     return false;
                 }
                 if (readTimeout == -1) {
-                    if (!fill()) //request line parsing
+                    if (!fill(false,true)) //request line parsing
                         throw new EOFException(sm.getString("iib.eof.error"));
                 } else {
                     // Do a simple read with a short timeout
@@ -397,8 +412,8 @@
                 return false;
             }
             if (readTimeout == -1) {
-                if (!fill()) //request line parsing
-                    throw new EOFException(sm.getString("iib.eof.error"));
+                if (!fill(false,false)) //request line parsing
+                    return false;
             } else {
                 // Do a simple read with a short timeout
                 if ( !readSocket(true, false) ) return false;
@@ -416,8 +431,8 @@
 
             // Read new bytes if needed
             if (pos >= lastValid) {
-                if (!fill()) //request line parsing
-                    throw new EOFException(sm.getString("iib.eof.error"));
+                if (!fill(true,false)) //request line parsing
+                    return false;
             }
 
             if (buf[pos] == Constants.SP) {
@@ -445,8 +460,8 @@
 
             // Read new bytes if needed
             if (pos >= lastValid) {
-                if (!fill()) //request line parsing
-                    throw new EOFException(sm.getString("iib.eof.error"));
+                if (!fill(true,false)) //request line parsing
+                    return false;
             }
 
             if (buf[pos] == Constants.SP) {
@@ -489,8 +504,8 @@
 
             // Read new bytes if needed
             if (pos >= lastValid) {
-                if (!fill()) //reques line parsing
-                    throw new EOFException(sm.getString("iib.eof.error"));
+                if (!fill(true,false)) //reques line parsing
+                    return false;
             }
 
             if (buf[pos] == Constants.CR) {
@@ -510,7 +525,7 @@
         } else {
             request.protocol().setString("");
         }
-
+        parsingRequestLine = false;
         return true;
 
     }
@@ -552,7 +567,7 @@
             } else if ( !block ) {
                 return false;
             } else {
-                timedOut = (readTimeout != -1) && 
((System.currentTimeMillis()-start)>readTimeout);
+                timedOut = timeout && (readTimeout != -1) && 
((System.currentTimeMillis()-start)>readTimeout);
                 if ( !timedOut && nRead == 0 )  {
                     try {
                         final SelectionKey key = 
socket.getIOChannel().keyFor(socket.getPoller().getSelector());
@@ -604,15 +619,20 @@
     /**
      * Parse the HTTP headers.
      */
-    public void parseHeaders()
+    public boolean parseHeaders()
         throws IOException {
-
-        while (parseHeader()) {
+        HeaderParseStatus status = HeaderParseStatus.HAVE_MORE_HEADERS;
+        
+        do {
+            status = parseHeader();
+        } while ( status == HeaderParseStatus.HAVE_MORE_HEADERS );
+        if (status == HeaderParseStatus.DONE) {
+            parsingHeader = false;
+            end = pos;
+            return true;
+        } else {
+            return false;
         }
-
-        parsingHeader = false;
-        end = pos;
-
     }
 
 
@@ -622,7 +642,7 @@
      * @return false after reading a blank line (which indicates that the
      * HTTP header parsing is done
      */
-    public boolean parseHeader()
+    public HeaderParseStatus parseHeader()
         throws IOException {
 
         //
@@ -630,12 +650,14 @@
         //
 
         byte chr = 0;
-        while (true) {
+        while (headerParsePos == HeaderParsePosition.HEADER_START) {
 
             // Read new bytes if needed
             if (pos >= lastValid) {
-                if (!fill()) //parse header
-                    throw new EOFException(sm.getString("iib.eof.error"));
+                if (!fill(true,false)) {//parse header 
+                    headerParsePos = HeaderParsePosition.HEADER_START;
+                    return HeaderParseStatus.NEED_MORE_DATA;
+                }
             }
 
             chr = buf[pos];
@@ -643,7 +665,7 @@
             if ((chr == Constants.CR) || (chr == Constants.LF)) {
                 if (chr == Constants.LF) {
                     pos++;
-                    return false;
+                    return HeaderParseStatus.DONE;
                 }
             } else {
                 break;
@@ -653,28 +675,31 @@
 
         }
 
-        // Mark the current buffer position
-        int start = pos;
+        if ( headerParsePos == HeaderParsePosition.HEADER_START ) {
+            // Mark the current buffer position
+            headerData.start = pos;
+            headerParsePos = HeaderParsePosition.HEADER_NAME;
+        }    
 
         //
         // Reading the header name
         // Header name is always US-ASCII
         //
+        
+        
 
-        boolean colon = false;
-        MessageBytes headerValue = null;
-
-        while (!colon) {
+        while (headerParsePos == HeaderParsePosition.HEADER_NAME) {
 
             // Read new bytes if needed
             if (pos >= lastValid) {
-                if (!fill()) //parse header
-                    throw new EOFException(sm.getString("iib.eof.error"));
+                if (!fill(true,false)) { //parse header 
+                    return HeaderParseStatus.NEED_MORE_DATA;
+                }
             }
 
             if (buf[pos] == Constants.COLON) {
-                colon = true;
-                headerValue = headers.addValue(buf, start, pos - start);
+                headerParsePos = HeaderParsePosition.HEADER_VALUE;
+                headerData.headerValue = headers.addValue(buf, 
headerData.start, pos - headerData.start);
             }
             chr = buf[pos];
             if ((chr >= Constants.A) && (chr <= Constants.Z)) {
@@ -682,97 +707,121 @@
             }
 
             pos++;
-
+            if ( headerParsePos == HeaderParsePosition.HEADER_VALUE ) { 
+                // Mark the current buffer position
+                headerData.start = pos;
+                headerData.realPos = pos;
+            }
         }
 
-        // Mark the current buffer position
-        start = pos;
-        int realPos = pos;
-
+        
         //
         // Reading the header value (which can be spanned over multiple lines)
         //
 
         boolean eol = false;
-        boolean validLine = true;
 
-        while (validLine) {
+        while (headerParsePos == HeaderParsePosition.HEADER_VALUE ||
+               headerParsePos == HeaderParsePosition.HEADER_MULTI_LINE) {
+            if ( headerParsePos == HeaderParsePosition.HEADER_VALUE ) {
+            
+                boolean space = true;
 
-            boolean space = true;
+                // Skipping spaces
+                while (space) {
 
-            // Skipping spaces
-            while (space) {
+                    // Read new bytes if needed
+                    if (pos >= lastValid) {
+                        if (!fill(true,false)) {//parse header 
+                            //HEADER_VALUE, should already be set
+                            return HeaderParseStatus.NEED_MORE_DATA;
+                        }
+                    }
 
-                // Read new bytes if needed
-                if (pos >= lastValid) {
-                    if (!fill()) //parse header
-                        throw new EOFException(sm.getString("iib.eof.error"));
-                }
+                    if ((buf[pos] == Constants.SP) || (buf[pos] == 
Constants.HT)) {
+                        pos++;
+                    } else {
+                        space = false;
+                    }
 
-                if ((buf[pos] == Constants.SP) || (buf[pos] == Constants.HT)) {
-                    pos++;
-                } else {
-                    space = false;
                 }
 
-            }
-
-            int lastSignificantChar = realPos;
+                headerData.lastSignificantChar = headerData.realPos;
 
-            // Reading bytes until the end of the line
-            while (!eol) {
+                // Reading bytes until the end of the line
+                while (!eol) {
 
-                // Read new bytes if needed
-                if (pos >= lastValid) {
-                    if (!fill()) //parse header
-                        throw new EOFException(sm.getString("iib.eof.error"));
-                }
+                    // Read new bytes if needed
+                    if (pos >= lastValid) {
+                        if (!fill(true,false)) {//parse header 
+                            //HEADER_VALUE
+                            return HeaderParseStatus.NEED_MORE_DATA;
+                        }
 
-                if (buf[pos] == Constants.CR) {
-                } else if (buf[pos] == Constants.LF) {
-                    eol = true;
-                } else if (buf[pos] == Constants.SP) {
-                    buf[realPos] = buf[pos];
-                    realPos++;
-                } else {
-                    buf[realPos] = buf[pos];
-                    realPos++;
-                    lastSignificantChar = realPos;
-                }
+                    }
 
-                pos++;
+                    if (buf[pos] == Constants.CR) {
+                    } else if (buf[pos] == Constants.LF) {
+                        eol = true;
+                    } else if (buf[pos] == Constants.SP) {
+                        buf[headerData.realPos] = buf[pos];
+                        headerData.realPos++;
+                    } else {
+                        buf[headerData.realPos] = buf[pos];
+                        headerData.realPos++;
+                        headerData.lastSignificantChar = headerData.realPos;
+                    }
 
-            }
+                    pos++;
 
-            realPos = lastSignificantChar;
+                }
 
-            // Checking the first character of the new line. If the character
-            // is a LWS, then it's a multiline header
+                headerData.realPos = headerData.lastSignificantChar;
 
+                // Checking the first character of the new line. If the 
character
+                // is a LWS, then it's a multiline header
+                headerParsePos = HeaderParsePosition.HEADER_MULTI_LINE;
+            }
             // Read new bytes if needed
             if (pos >= lastValid) {
-                if (!fill()) //parse header
-                    throw new EOFException(sm.getString("iib.eof.error"));
+                if (!fill(true,false)) {//parse header
+                    
+                    //HEADER_MULTI_LINE
+                    return HeaderParseStatus.NEED_MORE_DATA;
+                }
             }
 
             chr = buf[pos];
-            if ((chr != Constants.SP) && (chr != Constants.HT)) {
-                validLine = false;
-            } else {
-                eol = false;
-                // Copying one extra space in the buffer (since there must
-                // be at least one space inserted between the lines)
-                buf[realPos] = chr;
-                realPos++;
+            if ( headerParsePos == HeaderParsePosition.HEADER_MULTI_LINE ) {
+                if ( (chr != Constants.SP) && (chr != Constants.HT)) {
+                    headerParsePos = HeaderParsePosition.HEADER_START;
+                } else {
+                    eol = false;
+                    // Copying one extra space in the buffer (since there must
+                    // be at least one space inserted between the lines)
+                    buf[headerData.realPos] = chr;
+                    headerData.realPos++;
+                }
             }
-
         }
-
         // Set the header value
-        headerValue.setBytes(buf, start, realPos - start);
-
-        return true;
-
+        headerData.headerValue.setBytes(buf, headerData.start, 
headerData.realPos - headerData.start);
+        headerData.recycle();
+        return HeaderParseStatus.HAVE_MORE_HEADERS;
+    }
+    
+    protected HeaderParseData headerData = new HeaderParseData();
+    public static class HeaderParseData {
+        int start = 0;
+        int realPos = 0;
+        int lastSignificantChar = 0;
+        MessageBytes headerValue = null;
+        public void recycle() {
+            start = 0;
+            realPos = 0;
+            lastSignificantChar = 0;
+            headerValue = null;
+        }
     }
 
 
@@ -795,14 +844,13 @@
 
     // ------------------------------------------------------ Protected Methods
 
-
     /**
      * Fill the internal buffer using data from the undelying input stream.
      * 
      * @return false if at end of stream
      */
-    protected boolean fill()
-        throws IOException {
+    protected boolean fill(boolean timeout, boolean block)
+        throws IOException, EOFException {
 
         boolean read = false;
 
@@ -814,7 +862,7 @@
             }
 
             // Do a simple read with a short timeout
-            read = readSocket(true,true);
+            read = readSocket(timeout,block);
         } else {
 
             if (buf.length - end < 4500) {
@@ -827,7 +875,7 @@
             pos = end;
             lastValid = pos;
             // Do a simple read with a short timeout
-            read = readSocket(true, true);
+            read = readSocket(timeout, block);
         }
         return read;
     }
@@ -851,7 +899,7 @@
             throws IOException {
 
             if (pos >= lastValid) {
-                if (!fill()) //read body
+                if (!fill(true,true)) //read body, must be blocking, as the 
thread is inside the app
                     return -1;
             }
 



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to