Author: markt
Date: Tue Sep  3 20:26:59 2013
New Revision: 1519838

URL: http://svn.apache.org/r1519838
Log:
When using Servlet 3.1 non-blocking reads with an AJP connector, make the reads 
between the AJP body messages from the server non-blocking. Once any part of a 
message is read, the remainder of the message will be read using blocking IO.

Added:
    
tomcat/trunk/test/org/apache/catalina/nonblocking/TesterAjpNonBlockingClient.java
   (with props)
Modified:
    tomcat/trunk/java/org/apache/coyote/AbstractProcessor.java
    tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProcessor.java
    tomcat/trunk/java/org/apache/coyote/ajp/AjpAprProcessor.java
    tomcat/trunk/java/org/apache/coyote/ajp/AjpNioProcessor.java
    tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java
    tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Processor.java
    tomcat/trunk/java/org/apache/coyote/spdy/SpdyProcessor.java

Modified: tomcat/trunk/java/org/apache/coyote/AbstractProcessor.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/AbstractProcessor.java?rev=1519838&r1=1519837&r2=1519838&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/AbstractProcessor.java (original)
+++ tomcat/trunk/java/org/apache/coyote/AbstractProcessor.java Tue Sep  3 
20:26:59 2013
@@ -173,4 +173,13 @@ public abstract class AbstractProcessor<
 
     @Override
     public abstract HttpUpgradeHandler getHttpUpgradeHandler();
+
+
+    /**
+     * Register the socket for the specified events.
+     *
+     * @param read  Register the socket for read events
+     * @param write Register the socket for write events
+     */
+    protected abstract void registerForEvent(boolean read, boolean write);
 }

Modified: tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProcessor.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProcessor.java?rev=1519838&r1=1519837&r2=1519838&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProcessor.java (original)
+++ tomcat/trunk/java/org/apache/coyote/ajp/AbstractAjpProcessor.java Tue Sep  
3 20:26:59 2013
@@ -212,6 +212,13 @@ public abstract class AbstractAjpProcess
 
 
     /**
+     * Indicates that a 'get body chunk' message has been sent but the body
+     * chunk has not yet been received.
+     */
+    private boolean waitingForBodyMessage = false;
+
+
+    /**
      * Replay read.
      */
     protected boolean replay = false;
@@ -542,23 +549,34 @@ public abstract class AbstractAjpProcess
                     sm.getString("ajpprocessor.comet.notsupported"));
 
         } else if (actionCode == ActionCode.AVAILABLE) {
-            // Web Server only sends data when asked so unless end of stream 
has
-            // been reached, there should be data available.
-            // TODO Figure out if a 'true' non-blocking approach is possible
-            //      for AJP and what changes would be required to support it.
             if (!endOfStream) {
-                request.setAvailable(1);
+                if (empty) {
+                    try {
+                        refillReadBuffer(false);
+                    } catch (IOException e) {
+                        error = true;
+                        return;
+                    }
+                }
+                if (empty) {
+                    request.setAvailable(0);
+                } else {
+                    request.setAvailable(1);
+                }
+            }
+
+        } else if (actionCode == ActionCode.NB_READ_INTEREST) {
+            if (!endOfStream) {
+                registerForEvent(true, false);
             }
 
         } else if (actionCode == ActionCode.NB_WRITE_INTEREST) {
+            // TODO
             // Until 'true' non-blocking IO is implemented, assume it is always
             // possible write data.
             AtomicBoolean isReady = (AtomicBoolean)param;
             isReady.set(true);
 
-        } else if (actionCode == ActionCode.NB_READ_INTEREST) {
-            // NO-OP. Not required until 'true' non-blocking IO is implemented.
-
         } else if (actionCode == ActionCode.REQUEST_BODY_FULLY_READ) {
             AtomicBoolean result = (AtomicBoolean) param;
             result.set(endOfStream);
@@ -814,6 +832,7 @@ public abstract class AbstractAjpProcess
         // Recycle Request object
         first = true;
         endOfStream = false;
+        waitingForBodyMessage = false;
         empty = true;
         replay = false;
         finished = false;
@@ -866,16 +885,26 @@ public abstract class AbstractAjpProcess
             throws IOException;
 
     // Methods used by SocketInputBuffer
-    /** Receive a chunk of data. Called to implement the
-     *  'special' packet in ajp13 and to receive the data
-     *  after we send a GET_BODY packet
+    /**
+     * Read an AJP body message. Used to read both the 'special' packet in 
ajp13
+     * and to receive the data after we send a GET_BODY packet.
+     *
+     * @param block If there is no data available to read when this method is
+     *              called, should this call block until data becomes 
available?
+     *
+     * @return <code>true</code> if at least one body byte was read, otherwise
+     *         <code>false</code>
      */
-    protected boolean receive() throws IOException {
+    protected boolean receive(boolean block) throws IOException {
 
-        first = false;
         bodyMessage.reset();
 
-        readMessage(bodyMessage, true);
+        if (!readMessage(bodyMessage, block)) {
+            return false;
+        }
+
+        waitingForBodyMessage = false;
+        first = false;
 
         // No data received.
         if (bodyMessage.getLen() == 0) {
@@ -960,7 +989,7 @@ public abstract class AbstractAjpProcess
      *
      * @return true if there is more data, false if not.
      */
-    protected boolean refillReadBuffer() throws IOException {
+    protected boolean refillReadBuffer(boolean block) throws IOException {
         // If the server returns an empty packet, assume that that end of
         // the stream has been reached (yuck -- fix protocol??).
         // FORM support
@@ -972,10 +1001,13 @@ public abstract class AbstractAjpProcess
         }
 
         // Request more data immediately
-        output(getBodyMessageArray, 0, getBodyMessageArray.length);
+        if (!first && !waitingForBodyMessage) {
+            output(getBodyMessageArray, 0, getBodyMessageArray.length);
+            waitingForBodyMessage = true;
+        }
 
-        boolean moreData = receive();
-        if( !moreData ) {
+        boolean moreData = receive(block);
+        if (!first && !waitingForBodyMessage && !moreData) {
             endOfStream = true;
         }
         return moreData;
@@ -1402,8 +1434,8 @@ public abstract class AbstractAjpProcess
         finished = true;
 
         // Swallow the unread body packet if present
-        if (first && request.getContentLengthLong() > 0) {
-            receive();
+        if (first && request.getContentLengthLong() > 0 || 
waitingForBodyMessage) {
+            receive(true);
         }
 
         // Add the end message
@@ -1424,24 +1456,22 @@ public abstract class AbstractAjpProcess
      */
     protected class SocketInputBuffer implements InputBuffer {
 
-
         /**
          * Read bytes into the specified chunk.
          */
         @Override
-        public int doRead(ByteChunk chunk, Request req)
-        throws IOException {
+        public int doRead(ByteChunk chunk, Request req) throws IOException {
 
             if (endOfStream) {
                 return -1;
             }
             if (first && req.getContentLengthLong() > 0) {
                 // Handle special first-body-chunk
-                if (!receive()) {
+                if (!receive(true)) {
                     return 0;
                 }
             } else if (empty) {
-                if (!refillReadBuffer()) {
+                if (!refillReadBuffer(true)) {
                     return -1;
                 }
             }
@@ -1449,9 +1479,7 @@ public abstract class AbstractAjpProcess
             chunk.setBytes(bc.getBuffer(), bc.getStart(), bc.getLength());
             empty = true;
             return chunk.getLength();
-
         }
-
     }
 
 

Modified: tomcat/trunk/java/org/apache/coyote/ajp/AjpAprProcessor.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/ajp/AjpAprProcessor.java?rev=1519838&r1=1519837&r2=1519838&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/ajp/AjpAprProcessor.java (original)
+++ tomcat/trunk/java/org/apache/coyote/ajp/AjpAprProcessor.java Tue Sep  3 
20:26:59 2013
@@ -73,6 +73,12 @@ public class AjpAprProcessor extends Abs
 
 
     @Override
+    protected void registerForEvent(boolean read, boolean write) {
+        ((AprEndpoint) endpoint).getPoller().add(
+                socketWrapper.getSocket().longValue(), -1, read, write);
+    }
+
+    @Override
     protected void resetTimeouts() {
         // NO-OP. The AJP APR/native connector only uses the timeout value on
         //        time SocketWrapper for async timeouts.

Modified: tomcat/trunk/java/org/apache/coyote/ajp/AjpNioProcessor.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/ajp/AjpNioProcessor.java?rev=1519838&r1=1519837&r2=1519838&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/ajp/AjpNioProcessor.java (original)
+++ tomcat/trunk/java/org/apache/coyote/ajp/AjpNioProcessor.java Tue Sep  3 
20:26:59 2013
@@ -19,6 +19,7 @@ package org.apache.coyote.ajp;
 import java.io.EOFException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 
 import org.apache.juli.logging.Log;
@@ -57,6 +58,23 @@ public class AjpNioProcessor extends Abs
 
 
     @Override
+    protected void registerForEvent(boolean read, boolean write) {
+        final NioEndpoint.KeyAttachment attach =
+                
(NioEndpoint.KeyAttachment)socketWrapper.getSocket().getAttachment(
+                        false);
+        if (attach == null) {
+            return;
+        }
+        if (read) {
+            attach.interestOps(attach.interestOps() | SelectionKey.OP_READ);
+        }
+        if (write) {
+            attach.interestOps(attach.interestOps() | SelectionKey.OP_WRITE);
+        }
+    }
+
+
+    @Override
     protected void resetTimeouts() {
         // The NIO connector uses the timeout configured on the wrapper in the
         // poller. Therefore, it needs to be reset once asycn processing has

Modified: tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java?rev=1519838&r1=1519837&r2=1519838&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java (original)
+++ tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java Tue Sep  3 
20:26:59 2013
@@ -70,6 +70,11 @@ public class AjpProcessor extends Abstra
 
 
     @Override
+    protected void registerForEvent(boolean read, boolean write) {
+        // NO-OP for BIO
+    }
+
+    @Override
     protected void resetTimeouts() {
         // NO-OP. The AJP BIO connector only uses the timeout value on the
         //        SocketWrapper for async timeouts.

Modified: 
tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Processor.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Processor.java?rev=1519838&r1=1519837&r2=1519838&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Processor.java 
(original)
+++ tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Processor.java Tue 
Sep  3 20:26:59 2013
@@ -1109,15 +1109,6 @@ public abstract class AbstractHttp11Proc
 
 
     /**
-     * Register the socket for the specified events.
-     *
-     * @param read  Register the socket for read events
-     * @param write Register the socket for write events
-     */
-    protected abstract void registerForEvent(boolean read, boolean write);
-
-
-    /**
      * After reading the request headers, we have to setup the request filters.
      */
     protected void prepareRequest() {

Modified: tomcat/trunk/java/org/apache/coyote/spdy/SpdyProcessor.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/spdy/SpdyProcessor.java?rev=1519838&r1=1519837&r2=1519838&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/spdy/SpdyProcessor.java (original)
+++ tomcat/trunk/java/org/apache/coyote/spdy/SpdyProcessor.java Tue Sep  3 
20:26:59 2013
@@ -496,6 +496,11 @@ public class SpdyProcessor<S> extends Ab
         return null;
     }
 
+    @Override
+    protected void registerForEvent(boolean read, boolean write) {
+        // NO-OP
+    }
+
     public void onSynStream(SpdyStream str) throws IOException {
         this.spdyStream = str;
         SpdyFrame frame = str.reqFrame;

Added: 
tomcat/trunk/test/org/apache/catalina/nonblocking/TesterAjpNonBlockingClient.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/test/org/apache/catalina/nonblocking/TesterAjpNonBlockingClient.java?rev=1519838&view=auto
==============================================================================
--- 
tomcat/trunk/test/org/apache/catalina/nonblocking/TesterAjpNonBlockingClient.java
 (added)
+++ 
tomcat/trunk/test/org/apache/catalina/nonblocking/TesterAjpNonBlockingClient.java
 Tue Sep  3 20:26:59 2013
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.catalina.nonblocking;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.servlet.http.HttpServletResponse;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.catalina.nonblocking.TestNonBlockingAPI.DataWriter;
+import org.apache.catalina.startup.TomcatBaseTest;
+import org.apache.tomcat.util.buf.ByteChunk;
+
+public class TesterAjpNonBlockingClient extends TomcatBaseTest {
+
+    /**
+     * This is not a standard unit test. This is a test client for AJP
+     * non-blocking reads. It assumes that there is an httpd instance listening
+     * on localhost:80 that is redirecting all traffic to a default Tomcat 8
+     * instance that includes the examples web application.
+     */
+    @Test
+    public void doTestAJPNonBlockingRead() throws Exception {
+
+        Map<String, List<String>> resHeaders = new HashMap<>();
+        ByteChunk out = new ByteChunk();
+        int rc = postUrl(true, new DataWriter(2000), "http://localhost"; +
+                "/examples/servlets/nonblocking/bytecounter",
+                out, resHeaders, null);
+
+        System.out.println(out.toString());
+
+        Assert.assertEquals(HttpServletResponse.SC_OK, rc);
+    }
+}

Propchange: 
tomcat/trunk/test/org/apache/catalina/nonblocking/TesterAjpNonBlockingClient.java
------------------------------------------------------------------------------
    svn:eol-style = native



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org
For additional commands, e-mail: dev-h...@tomcat.apache.org

Reply via email to