Author: markt
Date: Thu Sep  5 23:42:00 2013
New Revision: 1520458

URL: http://svn.apache.org/r1520458
Log:
Fix some issues with Servlet 3.1 non-blocking writes and APR identified by the 
'unit' test.

Modified:
    tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProcessor.java
    
tomcat/trunk/test/org/apache/catalina/nonblocking/TesterAjpNonBlockingClient.java

Modified: tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProcessor.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProcessor.java?rev=1520458&r1=1520457&r2=1520458&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProcessor.java (original)
+++ tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProcessor.java Thu Sep  
5 23:42:00 2013
@@ -28,6 +28,7 @@ import java.util.Iterator;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import javax.servlet.RequestDispatcher;
 import javax.servlet.http.HttpUpgradeHandler;
 
 import org.apache.coyote.AbstractProcessor;
@@ -587,20 +588,10 @@ public abstract class AbstractAjpProcess
                     sm.getString("ajpprocessor.comet.notsupported"));
 
         } else if (actionCode == ActionCode.AVAILABLE) {
-            if (!endOfStream) {
-                if (empty) {
-                    try {
-                        refillReadBuffer(false);
-                    } catch (IOException e) {
-                        error = true;
-                        return;
-                    }
-                }
-                if (empty) {
-                    request.setAvailable(0);
-                } else {
-                    request.setAvailable(1);
-                }
+            if (available()) {
+                request.setAvailable(1);
+            } else {
+                request.setAvailable(0);
             }
 
         } else if (actionCode == ActionCode.NB_READ_INTEREST) {
@@ -609,11 +600,12 @@ public abstract class AbstractAjpProcess
             }
 
         } else if (actionCode == ActionCode.NB_WRITE_INTEREST) {
-            // TODO
-            // Until 'true' non-blocking IO is implemented, assume it is always
-            // possible write data.
             AtomicBoolean isReady = (AtomicBoolean)param;
-            isReady.set(true);
+            boolean result = bufferedWrites.size() == 0 && responseMsgPos == 
-1;
+            isReady.set(result);
+            if (!result) {
+                registerForEvent(false, true);
+            }
 
         } else if (actionCode == ActionCode.REQUEST_BODY_FULLY_READ) {
             AtomicBoolean result = (AtomicBoolean) param;
@@ -631,6 +623,40 @@ public abstract class AbstractAjpProcess
     @Override
     public SocketState asyncDispatch(SocketStatus status) {
 
+        if (status == SocketStatus.OPEN_WRITE) {
+            try {
+                asyncStateMachine.asyncOperation();
+                try {
+                    if (hasDataToWrite()) {
+                        flushBufferedData();
+                        if (hasDataToWrite()) {
+                            // There is data to write but go via Response to
+                            // maintain a consistent view of non-blocking state
+                            response.checkRegisterForWrite(true);
+                            return SocketState.LONG;
+                        }
+                    }
+                } catch (IOException x) {
+                    if (getLog().isDebugEnabled()) {
+                        getLog().debug("Unable to write async data.",x);
+                    }
+                    status = SocketStatus.ASYNC_WRITE_ERROR;
+                    request.setAttribute(RequestDispatcher.ERROR_EXCEPTION, x);
+                }
+            } catch (IllegalStateException x) {
+                registerForEvent(false, true);
+            }
+        } else if (status == SocketStatus.OPEN_READ &&
+                request.getReadListener() != null) {
+            try {
+                if (available()) {
+                    asyncStateMachine.asyncOperation();
+                }
+            } catch (IllegalStateException x) {
+                registerForEvent(true, false);
+            }
+        }
+
         RequestInfo rp = request.getRequestProcessor();
         try {
             rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);
@@ -1491,6 +1517,25 @@ public abstract class AbstractAjpProcess
         }
     }
 
+
+    private boolean available() {
+        if (endOfStream) {
+            return false;
+        }
+        if (empty) {
+            try {
+                refillReadBuffer(false);
+            } catch (IOException e) {
+                // Not ideal. This will indicate that data is available
+                // which should trigger a read which in turn will trigger
+                // another IOException and that one can be thrown.
+                return true;
+            }
+        }
+        return !empty;
+    }
+
+
     private void writeData(ByteChunk chunk) throws IOException {
         // Prevent timeout
         socketWrapper.access();
@@ -1521,7 +1566,7 @@ public abstract class AbstractAjpProcess
 
         bytesWritten += off;
 
-        if (chunk.getLength() > 0) {
+        if (len > 0) {
             // Add this chunk to the buffer
             addToBuffers(chunk.getBuffer(), off, len);
         }
@@ -1539,6 +1584,11 @@ public abstract class AbstractAjpProcess
     }
 
 
+    private boolean hasDataToWrite() {
+        return responseMsgPos != -1 || bufferedWrites.size() > 0;
+    }
+
+
     private void flushBufferedData() throws IOException {
 
         if (responseMsgPos > -1) {

Modified: 
tomcat/trunk/test/org/apache/catalina/nonblocking/TesterAjpNonBlockingClient.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/test/org/apache/catalina/nonblocking/TesterAjpNonBlockingClient.java?rev=1520458&r1=1520457&r2=1520458&view=diff
==============================================================================
--- 
tomcat/trunk/test/org/apache/catalina/nonblocking/TesterAjpNonBlockingClient.java
 (original)
+++ 
tomcat/trunk/test/org/apache/catalina/nonblocking/TesterAjpNonBlockingClient.java
 Thu Sep  5 23:42:00 2013
@@ -96,6 +96,8 @@ public class TesterAjpNonBlockingClient 
         String resultString = result.toString();
         log.info("Client read " + resultString.length() + " bytes");
 
+        System.out.println(resultString);
+
         Assert.assertTrue(resultString.contains("00000000000000010000"));
     }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org
For additional commands, e-mail: dev-h...@tomcat.apache.org

Reply via email to